소켓 통신/Boost

Boost.asio I/O TCP 소켓 비동기적 읽기

마달랭 2024. 11. 14. 14:45

개요

Boost.asio 라이브러리가 제공하는 가장 기본적인 비동기 데이터 읽기 함수는 async_read_some() 메서드다.

이전 글들을 참고한다면 알 수 있겠지만 해당 메서드는 여타 some함수와 동일하다.

 

쓰기와 마찬가지로 버퍼, 콜백 함수 두 가지 인자를 받는다.

또한 데이터 읽기 작업 중 예외가 발생할 경우 콜백 함수를 호출한다.

만약 데이터를 모두 읽지 못했는데 TCP통신 중 모종의 이유로 중단될 경우 추가적으로 함수를 호출해 주어야 한다.

 

 

async_read_some()

이 메서드는 소켓의 데이터 중 일부를 읽어 버퍼에 쓰는 연산을 시작한다.

해당 비동기 연산이 오류 없이 끝난다면 적어도 한 바이트는 읽는다는 보장이 있다.

일반적으로 소켓의 데이터를 모두 읽으려면 이 비동기 메서드를 여러 번 호출해야 한다.

콜백 함수가 호출되었을 때 데이터의 크기와 읽은 바이트 수를 비교하여 동일 할때까지 콜백함수 내에서 재귀적으로 해당 비동기 메서드를 호출해 주는 방식으로 진행하면 된다.

 

async_read_some() 메서드의 과정

  1. 소켓 객체와 버퍼 및 데이터를 읽은 바이트 수를 저장할 변수를 포함하는 구조체를 정의한다.
  2. 비동기 읽기 연산 시 사용될 콜백 함수를 정의한다.
  3. 클라이언트에서 능동 TCP 소켓을 할당하고 서버에 연결한다.
  4. 서버가 연결 요청을 Accept하면 연결된 능동 TCP소켓을 공유할 수 있다.
  5. 읽을 데이터의 양을 담을 만큼 큰 버퍼를 할당한다.
  6. async_read_some()  메서드를 호출하여 비동기 읽기 연산을 시작한다.
  7. io_service 클래스 객체의 run() 메서드를 호출한다.
  8. 버퍼에 소켓의 데이터를 모두 읽어올 때 까지 반복적으로 비동기 메서드를 호출한다.
#include <boost/asio.hpp>
#include <iostream>

using namespace boost;

// 소켓 객체의 포인터, 버퍼, 읽은 바이트 수, 버퍼의 크기 변수 구조체 정의
struct Session {
	std::shared_ptr<asio::ip::tcp::socket> sock;
	std::unique_ptr<char[]> buf;
	std::size_t total_bytes_read;
	unsigned int buf_size;
};

// 비동기 읽기 연산의 콜백 함수, 첫번째와 두번째 인자는 고정
void callback(const boost::system::error_code& ec,
	std::size_t bytes_transferred,
	std::shared_ptr<Session> s)
{
	if (ec.value() != 0) {
		std::cout << "Error occured! Error code = "
			<< ec.value()
			<< ". Message: " << ec.message();

		return;
	}

	s->total_bytes_read += bytes_transferred;

	if (s->total_bytes_read == s->buf_size) {
		return;
	}

	s->sock->async_read_some(
		asio::buffer(
			s->buf.get() +
			s->total_bytes_read,
			s->buf_size -
			s->total_bytes_read),
		std::bind(callback, 
			std::placeholders::_1,
			std::placeholders::_2, 
			s));
}

void readFromSocket(std::shared_ptr<asio::ip::tcp::socket> sock) {
	std::shared_ptr<Session> s(new Session);

	const unsigned int MESSAGE_SIZE = 7;

	// Step 4. 버퍼 할당
	s->buf.reset(new char[MESSAGE_SIZE]);

	s->total_bytes_read = 0;
	s->sock = sock;
	s->buf_size = MESSAGE_SIZE;

	// Step 5. 비동기 읽기 연산 시작
	s->sock->async_read_some(
		asio::buffer(s->buf.get(), s->buf_size),
		std::bind(callback,
			std::placeholders::_1,
			std::placeholders::_2,
			s));
}

int main()
{
	std::string raw_ip_address = "127.0.0.1";
	unsigned short port_num = 3333;

	try {
		asio::ip::tcp::endpoint
			ep(asio::ip::address::from_string(raw_ip_address),
				port_num);

		asio::io_service ios;

		// Step 3. 소켓을 할당하고 연결시킨다.
		std::shared_ptr<asio::ip::tcp::socket> sock(
			new asio::ip::tcp::socket(ios, ep.protocol()));

		sock->connect(ep);

		readFromSocket(sock);

		// Step 6. 비동기 연산 및 콜백 함수 호출
		ios.run();
	}
	catch (system::system_error& e) {
		std::cout << "Error occured! Error code = " << e.code()
			<< ". Message: " << e.what();

		return e.code().value();
	}

	return 0;
}

 

main 함수에서 서버의 IP 주소와 포트 번호를 갖고 종료점을 만들어 준다.

ios_service와 종료점을 통해 소켓을 생성한 후에 서버에 연결 요청을 진행한다.

연결이 완료 되었을 경우 비동기 데이터 연산 작업을 호출한 후 ios.run()을 진행해 준다.

콜백함수 내부에 모든 데이터를 읽은 케이스가 아니라면 재귀적으로 async_read_some() 메서드를 호출한다.

 

여태 동기, 비동기 작업에서 사용한 some관련 메서드는 코드가 길고 명료하지 못하다.

비동기 읽기 작업 역시 더 간단하게 작업을 수행하는 메서드가 존재한다.

 

 

async_read()

asio::async_read() 함수는 모든 데이터를 읽을 때 까지 내부적으로 some함수를 계속 호출해 준다.

따라서 콜백 함수 내부에서 재귀적으로 계속 읽기 작업을 호출해 줄 필요가 없다.

그럼 당연하게도 몇 바이트 까지 데이터를 읽었는지 체크할 필요가 없다.

그냥 작업이 완료되었을 때와 예외가 발생했을 때의 작업을 정의해 주면 된다.

 

write때와 마찬가지로 some함수와 비교했을 때 1개 인자를 더 받게 된다.

소켓 객체를 참조하는 것인데 이는 첫번째 인자로 전달해 주면 된다, 나머지 인자는 동일하며 한칸씩 뒤로 미룬다.

 

#include <boost/asio.hpp>
#include <iostream>

using namespace boost;

// 몇 바이트까지 읽었는지 체크하는 변수를 생략할 수 있다.
struct Session {
	std::shared_ptr<asio::ip::tcp::socket> sock;
	std::unique_ptr<char[]> buf;
	unsigned int buf_size;
};

// 비동기 읽기 연산을 재귀적으로 실행할 필요가 없어졌다.
void callback(const boost::system::error_code& ec,
	std::size_t bytes_transferred,
	std::shared_ptr<Session> s)
{
	if (ec.value() != 0) {
		std::cout << "Error occured! Error code = "
			<< ec.value()
			<< ". Message: " << ec.message();

		return;
	}
}

void readFromSocket(std::shared_ptr<asio::ip::tcp::socket> sock) {
	std::shared_ptr<Session> s(new Session);

	const unsigned int MESSAGE_SIZE = 7;

	// Step 1. 버퍼 할당
	s->buf.reset(new char[MESSAGE_SIZE]);

	s->sock = sock;
	s->buf_size = MESSAGE_SIZE;

	// Step 2. 비동기 읽기 연산 시작
	asio::async_read(*s->sock,
		asio::buffer(s->buf.get(), s->buf_size),
		std::bind(callback,
			std::placeholders::_1,
			std::placeholders::_2,
			s));
}

int main()
{
	std::string raw_ip_address = "127.0.0.1";
	unsigned short port_num = 3333;

	try {
		asio::ip::tcp::endpoint
			ep(asio::ip::address::from_string(raw_ip_address),
				port_num);

		asio::io_service ios;

		std::shared_ptr<asio::ip::tcp::socket> sock(
			new asio::ip::tcp::socket(ios, ep.protocol()));

		sock->connect(ep);

		readFromSocket(sock);

		ios.run();
	}
	catch (system::system_error& e) {
		std::cout << "Error occured! Error code = " << e.code()
			<< ". Message: " << e.what();

		return e.code().value();
	}

	return 0;
}

 

콜백 함수 내부 로직의 코드가 훨신 간결해졌다!

 

728x90