네트워크 통신/Boost

Boost.asio I/O TCP 소켓 비동기적 쓰기

마달랭 2024. 11. 14. 14:12
반응형

개요

비동기적 쓰기는 원격 프로그램으로 데이터를 보내는 유연하면서도 효율적인 방법이다.

Boost.asio 라이브러리가 제공하는 가장 기본적인 비동기 데이터 쓰기 함수는 async_write_some() 메서드이다.

동기적 쓰기에 비해 앞에 async_가 붙은 것을 확인할 수 있다.

 

동기적 I/O와 유사하게 some()이 붙지 않은 더 편하게 사용할 수 있는 함수가 asio에 있지만, 해당 함수들 모두 결국에는 내부적으로 some()이 붙은 함수를 사용하기에 함수 원형을 먼저 공부하는게 도움이 될 것이다.

 

 

async_write_some()

Boost.asio 라이브러리가 제공하는 가장 기본적인 비동기 데이터 쓰기 함수이다.

이 메서드는 쓰기 연산을 시작한 후 곧바로 반환한다.

  • 첫 번째로 소켓에 쓸 데이터를 가진 버퍼를 받는다.
  • 두 번째로 콜백 함수를 받는다. (함수 포인터, 함수 객체, 그 밖의 WriteHandler 개념을 만족시키는 개체)

콜백 함수는 boost::systemp::error_code와 비동기 연산 중 몇 바이트를 소켓에 썼는지를 알리는 인자를 포함해야 한다.

write_some()과 마찬가지로 버퍼의 데이터 일부를 소켓에 쓰는 연산을 시작한다.

오류가 발생하지 않는다면 적어도 한 바이트는 씌여지며 일반적으로 버퍼에 있는 데이터 전체를 소켓에 쓰기 위해서는 해당 메서드를 반복적으로 호출해 주어야 한다.

 

  1. 소켓 객체, 버퍼, 몇 바이트나 썼는지 알릴 카운터 변수를 포함하는 구조체를 정의한다.
  2. 비동기 쓰기 연산이 끝났을 때 호출될 콜백 함수를 정의한다.
  3. 클라이언트 프로그램에서는 능동 TCP 소켓을 할당하고 서버에 연결 요청을 보낸다.
  4. 서버 프로그램에서는 연결 요청을 받아들여 연결된 능동 TCP 소켓을 얻는다.
  5. 버퍼를 할당하고 소켓에 쓸 데이터로 채운다.
  6. 소켓의 async_write_some() 메서드를 호출해 비동기 쓰기 연산을 시작한다.
  7. 이때 2번에서 정의한 콜백 함수를 인자로 전달한다.
  8. asio::io_service 클래스 객체의 run() 메서드를 호출한다.
  9. 콜백에서 쓰인 바이트 수에 대한 카운터를 증가시킨다.
  10. 만약 모든 데이터를 쓰지 못했다면 비동기 연산을 계속 시작하도록 함수를 추가 호출해 준다.
#include <boost/asio.hpp>
#include <iostream>

using namespace boost;

// 소켓 객체에 대한 포인터와 버퍼, 바이트 수를 저장하는 카운터 변수를 구조체로 정의
struct Session {
	std::shared_ptr<asio::ip::tcp::socket> sock;
	std::string buf;
	std::size_t total_bytes_written = 0;
};

// 비동기 쓰기 연산의 콜백함수
// 소켓에서 모든 데이터를 썼는지 확인하고, 필요하다면 다음 비동기 쓰기 연산 수행
void callback(const boost::system::error_code& ec,
	std::size_t bytes_transferred,
	std::shared_ptr<Session> s)
{
	if (ec.value() != 0) {
		std::cout << "Error occured! Error code = "
			<< ec.value() << ". Message: " << ec.message();
		return;
	}
	// 아직 소켓에 버퍼의 모든 데이터가 써진 상태가 아니라면?
	// 마지막으로 쓰기 연산이 완료된 위치에서 비동기 쓰기 연산을 다시 시작한다.
	s->total_bytes_written += bytes_transferred;

	if (s->total_bytes_written < s->buf.size()) {
		s->sock->async_write_some(
			asio::buffer(s->buf.c_str() + s->total_bytes_written,
				s->buf.size() - s->total_bytes_written),
			std::bind(callback,
				std::placeholders::_1,
				std::placeholders::_2,
				s));
	}
}

void writeToSocket(std::shared_ptr<asio::ip::tcp::socket> sock) {

	std::shared_ptr<Session> s(new Session);

	// Step 4. Session의 데이터를 초기화 한다.(버퍼, 소켓, 입력한 바이트 수)
	s->buf = std::string("Hello");
	s->sock = sock;

	// Step 5. 비동기 쓰기 연산을 시작한다.
	s->sock->async_write_some(
		asio::buffer(s->buf),
		std::bind(callback,
			std::placeholders::_1,
			std::placeholders::_2,
			s));
}

int main()
{
	std::string raw_ip_address = "127.0.0.1";
	unsigned short port_num = 3333;

	try {
		asio::ip::tcp::endpoint
			ep(asio::ip::address::from_string(raw_ip_address),
				port_num);

		asio::io_service ios;

		// Step 3. 소켓을 초기화 하고 소켓의 포인터를 초기화 한다.
		std::shared_ptr<asio::ip::tcp::socket> sock(
			new asio::ip::tcp::socket(ios, ep.protocol()));

		sock->connect(ep);

		writeToSocket(sock);

		// Step 6. 런!
		ios.run();
	}
	catch (system::system_error& e) {
		std::cout << "Error occured! Error code = " << e.code()
			<< ". Message: " << e.what();

		return e.code().value();
	}

	return 0;
}

 

해당 코드에서 async_write_some 메서드가 callback함수를 반환할때 placeholders::_1, _2가 있는 것을 볼 수 있다.

위에서 언급했듯 콜백 함수가 호출될때 첫번째 인자로 에러 코드를, 두번째 인자로 쓴 바이트 수를 무조건 매개변수로 포함하고 있어야 한다, 따라서 placeholders를 통해 명시해 주는 것으로 이해하면 된다.

 

 

async_write()

async_write_some() 메서드를 사용하면 소켓에 비동기적으로 데이터를 쓸 수 있지만, 코드 길이만 봐도 엄청 길고 각 로직이 매우 복잡해 진다.

좀 더 간단하게 비동기적으로 데이터를 쓸 수 있는 async_write()에 대하여 알아보자

해당 메서드는 버퍼에 있는 모든 데이터를 쓰는 연산 작업을 제공한다.

그리고 버퍼에 있는 모든 데이터를 소켓에 썼거나 오류가 발생했을 때에만 콜백이 호출된다.

그래서 소켓에 쓰는 작업이 간단해지고 코드도 짧고 명료해진다.

 

해당 메서드는 세가지 인자를 받는다.

기존의 some메서드는 socket클래스의 메서드였으나 write는 asio의 메서드이다.

따라서 소켓 정보를 함수의 첫번째 인자로 추가해 주어야 한다.

나머지 버퍼 및 콜백의 경우에는 기존과 동일하게 넣어주면 된다.

 

#include <boost/asio.hpp>
#include <iostream>

using namespace boost;

// 몇 바이트까지 썼는지 체크를 진행할 필요가 없다.
struct Session {
    std::shared_ptr<asio::ip::tcp::socket> sock;
    std::string buf;
};

// 콜백은 동일하다.
void callback(const boost::system::error_code& ec,
    std::size_t bytes_transferred,
    std::shared_ptr<Session> s)
{
    if (ec.value() != 0) {
        std::cout << "Error occurred! Error code = "
            << ec.value()
            << ". Message: " << ec.message();
        return;
    }

    // 쓰기 작업이 완료가 되었다면 출력
    std::cout << "Successfully written " << bytes_transferred << " bytes to the socket.\n";
}

void writeToSocket(std::shared_ptr<asio::ip::tcp::socket> sock) {

    std::shared_ptr<Session> s(new Session);

    s->buf = std::string("Hello");
    s->sock = sock;

    // Step 5. 비동기 쓰기 작업을 시작한다.
    asio::async_write(
        *s->sock,  // 소켓을 인자로 넣어주어야 한다.
        asio::buffer(s->buf),
        std::bind(callback,
            std::placeholders::_1,
            std::placeholders::_2,
            s));
}

int main()
{
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;

    try {
        asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
        asio::io_service ios;

        std::shared_ptr<asio::ip::tcp::socket> sock(
            new asio::ip::tcp::socket(ios, ep.protocol()));

        sock->connect(ep);

        writeToSocket(sock);

        // Step 6. 비동기 처리 완료시 런
        ios.run();
    }
    catch (system::system_error& e) {
        std::cout << "Error occurred! Error code = " << e.code()
            << ". Message: " << e.what();
        return e.code().value();
    }

    return 0;
}

 

 

728x90
반응형