개요
가장 간단한 비동기 클라이언트라 하더라도 동기 클라이언트에 비해 구조적으로 복잡하다.
비동기 클라이언트에 요청 취소와 같은 기능을 추가하려면 좀 더 복잡해진다.
비동기 TCP 클라이언트가 갖춰야 할 사항은 다음과 같다.
- 서버에서 오는 입력을 처리하는 스레드를 따로 둔다.(사용자 인터페이스 스레드)
- 위 스레드는 눈에 띄게 긴 시간 동안 멈추는 일이 없어야 한다.
- 사용자가 다양한 서버로 여러 요청을 보낼 수 있어야 한다.
- 사용자가 요청을 보낸 후 완료되기 전에 취소할 수 있어야 한다.
비동기 TCP 클라이언트 구현
#include <boost/predef.h> // OS 식별 도구
// Windows XP, Windows Server 2003 이하에서 I/O 작업 취소 기능을 활성화하기 위해 필요
// 자세한 내용은 "http://www.boost.org/doc/libs/1_58_0/
// doc/html/boost_asio/reference/basic_stream_socket/"
// cancel/overload1.html" 참조
#ifdef BOOST_OS_WINDOWS
#define _WIN32_WINNT 0x0501
#if _WIN32_WINNT <= 0x0502 // Windows Server 2003 이하
#define BOOST_ASIO_DISABLE_IOCP
#define BOOST_ASIO_ENABLE_CANCELIO
#endif
#endif
#include <boost/asio.hpp>
#include <thread>
#include <mutex>
#include <memory>
#include <iostream>
#include <map>
using namespace boost;
// 요청이 완료되었을 때 호출되는 콜백 함수 포인터 타입
typedef void(*Callback) (unsigned int request_id,
const std::string& response,
const system::error_code& ec);
// 하나의 요청에 대한 컨텍스트를 나타내는 구조체
struct Session {
Session(asio::io_context& ios,
const std::string& raw_ip_address,
unsigned short port_num,
const std::string& request,
unsigned int id,
Callback callback) :
m_sock(ios),
m_ep(asio::ip::address::from_string(raw_ip_address),
port_num),
m_request(request),
m_id(id),
m_callback(callback),
m_was_cancelled(false) {}
asio::ip::tcp::socket m_sock; // 통신에 사용되는 소켓
asio::ip::tcp::endpoint m_ep; // 원격 엔드포인트
std::string m_request; // 요청 문자열
// 응답을 저장할 streambuf
asio::streambuf m_response_buf;
std::string m_response; // 응답 문자열
// 요청 생애주기 중 오류가 발생하면 그 설명이 저장됨
system::error_code m_ec;
unsigned int m_id; // 요청에 할당된 고유 ID
// 요청이 완료되었을 때 호출되는 콜백 함수 포인터
Callback m_callback;
bool m_was_cancelled; // 요청 취소 여부
std::mutex m_cancel_guard; // 취소 시 동기화를 위한 mutex
};
class AsyncTCPClient {
public:
AsyncTCPClient() {
m_work.reset(new boost::asio::io_context::work(m_ios));
m_thread.reset(new std::thread([this]() {
m_ios.run();
}));
}
void emulateLongComputationOp(
unsigned int duration_sec,
const std::string& raw_ip_address,
unsigned short port_num,
Callback callback,
unsigned int request_id) {
// 요청 문자열 준비
std::string request = "EMULATE_LONG_CALC_OP "
+ std::to_string(duration_sec)
+ "\n";
std::shared_ptr<Session> session =
std::shared_ptr<Session>(new Session(m_ios,
raw_ip_address,
port_num,
request,
request_id,
callback));
session->m_sock.open(session->m_ep.protocol());
// 활성 세션 목록에 새 세션 추가
// 다중 스레드에서 접근할 수 있기 때문에 동기화 필요
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
m_active_sessions[request_id] = session;
lock.unlock();
session->m_sock.async_connect(session->m_ep,
[this, session](const system::error_code& ec)
{
if (!ec) {
session->m_ec = ec;
onRequestComplete(session);
return;
}
std::unique_lock<std::mutex>
cancel_lock(session->m_cancel_guard);
if (session->m_was_cancelled) {
onRequestComplete(session);
return;
}
asio::async_write(session->m_sock,
asio::buffer(session->m_request),
[this, session](const boost::system::error_code& ec,
std::size_t bytes_transferred)
{
if (!ec) {
session->m_ec = ec;
onRequestComplete(session);
return;
}
std::unique_lock<std::mutex>
cancel_lock(session->m_cancel_guard);
if (session->m_was_cancelled) {
onRequestComplete(session);
return;
}
asio::async_read_until(session->m_sock,
session->m_response_buf,
'\n',
[this, session](const boost::system::error_code& ec,
std::size_t bytes_transferred)
{
if (!ec) {
session->m_ec = ec;
}
else {
std::istream strm(&session->m_response_buf);
std::getline(strm, session->m_response);
}
onRequestComplete(session);
}); }); });
};
// 요청 취소
void cancelRequest(unsigned int request_id) {
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(request_id);
if (it != m_active_sessions.end()) {
std::unique_lock<std::mutex>
cancel_lock(it->second->m_cancel_guard);
it->second->m_was_cancelled = true;
it->second->m_sock.cancel();
}
}
void close() {
// 작업 객체를 파괴하여 I/O 스레드가 대기 상태에서 빠져나갈 수 있도록 함
m_work.reset(NULL);
// I/O 스레드 종료 대기
m_thread->join();
}
private:
void onRequestComplete(std::shared_ptr<Session> session) {
// 연결 종료 시도. 실패해도 오류 코드에 신경쓰지 않음
boost::system::error_code ignored_ec;
session->m_sock.shutdown(
asio::ip::tcp::socket::shutdown_both,
ignored_ec);
// 활성 세션 목록에서 세션 제거
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(session->m_id);
if (it != m_active_sessions.end())
m_active_sessions.erase(it);
lock.unlock();
boost::system::error_code ec;
if (session->m_ec.value() == 0 && session->m_was_cancelled)
ec = asio::error::operation_aborted;
else
ec = session->m_ec;
// 사용자 제공 콜백 호출
session->m_callback(session->m_id,
session->m_response, ec);
};
private:
asio::io_context m_ios;
std::map<int, std::shared_ptr<Session>> m_active_sessions;
std::mutex m_active_sessions_guard;
std::unique_ptr<boost::asio::io_context::work> m_work;
std::unique_ptr<std::thread> m_thread;
};
void handler(unsigned int request_id,
const std::string& response,
const system::error_code& ec)
{
if (!ec) {
std::cout << "Request #" << request_id
<< "가 완료되었습니다. 응답: "
<< response << std::endl;
}
else if (ec == asio::error::operation_aborted) {
std::cout << "Request #" << request_id
<< "가 사용자가 취소했습니다."
<< std::endl;
}
else {
std::cout << "Request #" << request_id
<< " 실패! 오류 코드 = " << ec.value()
<< ". 오류 메시지 = " << ec.message()
<< std::endl;
}
return;
}
int main()
{
try {
AsyncTCPClient client;
// 사용자가 요청을 시작함
// ID 1인 요청 시작
client.emulateLongComputationOp(10, "127.0.0.1", 3333,
handler, 1);
// 5초 동안 아무 작업도 하지 않음
std::this_thread::sleep_for(std::chrono::seconds(5));
// ID 2인 요청 시작
client.emulateLongComputationOp(11, "127.0.0.1", 3334,
handler, 2);
// ID 1인 요청을 취소
client.cancelRequest(1);
// 6초 동안 아무 작업도 하지 않음
std::this_thread::sleep_for(std::chrono::seconds(6));
// ID 3인 요청 시작
client.emulateLongComputationOp(9, "127.0.0.1", 3335,
handler, 3);
// 모든 요청이 완료될 때까지 대기
std::this_thread::sleep_for(std::chrono::seconds(30));
client.close();
}
catch (std::exception& e) {
std::cerr << "예외: " << e.what() << std::endl;
}
return 0;
}
코드의 라인만 봐도 동기 TCP 클라이언트 코드에 비해 확연히 차이가 나는 것을 알 수 있다.
모든 코드 블럭에 대해 세세히 알아보도록 하자
include, define
#include <boost/predef.h> // OS 식별 도구
// Windows XP, Windows Server 2003 이하에서 I/O 작업 취소 기능을 활성화하기 위해 필요
// 자세한 내용은 "http://www.boost.org/doc/libs/1_58_0/
// doc/html/boost_asio/reference/basic_stream_socket/"
// cancel/overload1.html" 참조
#ifdef BOOST_OS_WINDOWS
#define _WIN32_WINNT 0x0501
#if _WIN32_WINNT <= 0x0502 // Windows Server 2003 이하
#define BOOST_ASIO_DISABLE_IOCP
#define BOOST_ASIO_ENABLE_CANCELIO
#endif
#endif
#include <boost/asio.hpp>
#include <thread>
#include <mutex>
#include <memory>
#include <iostream>
#include <map>
using namespace boost;
이 블록은 윗 부분은 Windows OS에서 특정 기능을 활성화하기 위한 설정이다.
이후 Boost.Asio를 포함하여 네트워크 통신을 위한 필수 헤더들을 가져온다.
또한, std::thread, std::mutex, std::memory와 같은 C++ 표준 라이브러리 헤더도 포함하여 멀티스레딩과 메모리 관리를 할 수 있게 설정한다.
Callback
typedef void(*Callback) (unsigned int request_id,
const std::string& response,
const system::error_code& ec);
typedef를 통해 Callback이라는 타입을 정의한다.
Callback은 비동기 작업이 완료되었을 때 호출되는 함수의 포인터로, 요청 ID, 응답 문자열, 그리고 오류 코드를 인자로 받는다.
Session
struct Session {
Session(asio::io_context& ios,
const std::string& raw_ip_address,
unsigned short port_num,
const std::string& request,
unsigned int id,
Callback callback) :
m_sock(ios),
m_ep(asio::ip::address::from_string(raw_ip_address),
port_num),
m_request(request),
m_id(id),
m_callback(callback),
m_was_cancelled(false) {}
세선 구조체는 각 요청에 대한 정보를 포함한다.
이 구조체는 요청에 대한 TCP 연결 정보, 요청 문자열, 요청 ID, 콜백 함수, 오류 코드 등을 저장한다.
asio::io_context를 통해 비동기 I/O 작업을 수행할 소켓을 설정한다.
asio::ip::tcp::socket m_sock; // 통신에 사용되는 소켓
asio::ip::tcp::endpoint m_ep; // 원격 엔드포인트
std::string m_request; // 요청 문자열
// 응답을 저장할 streambuf
asio::streambuf m_response_buf;
std::string m_response; // 응답 문자열
// 요청 생애주기 중 오류가 발생하면 그 설명이 저장됨
system::error_code m_ec;
unsigned int m_id; // 요청에 할당된 고유 ID
// 요청이 완료되었을 때 호출되는 콜백 함수 포인터
Callback m_callback;
bool m_was_cancelled; // 요청 취소 여부
std::mutex m_cancel_guard; // 취소 시 동기화를 위한 mutex
};
주석에 명시되어 있는 것 처럼 각 세션 마다 통신에 사용할 소켓, 종료점, 요청 버퍼, 응답 버퍼, 에러 코드, 요청을 구분하기 위한 ID, 콜백 함수 매핑, 요청 취소 여부와 스레드 관련 mutex를 초기화 한다.
AsyncTCPClient 클래스 생성자
class AsyncTCPClient {
public:
AsyncTCPClient() {
m_work.reset(new boost::asio::io_context::work(m_ios));
m_thread.reset(new std::thread([this]() {
m_ios.run();
}));
}
AsyncTCPClient 클래스의 인스턴스가 생성되었을 때 초기 생성자를 실행한다.
private:
asio::io_context m_ios;
std::map<int, std::shared_ptr<Session>> m_active_sessions;
std::mutex m_active_sessions_guard;
std::unique_ptr<boost::asio::io_context::work> m_work;
std::unique_ptr<std::thread> m_thread;
};
인스턴스 내에서 사용할 목록은 다음과 같다.
하나의 인스턴스 내에서 고유한 ID의 작업이 실행되니 이를 맵으로 관리해 준다.
또한 다른 작업이 진행되는 중일때 서로의 영역을 침범하지 않도록 mutex를 초기화 해준다.
m_work를 통해 io_context가 종료되지 않도록 유지하는 역할을 한다, io_context는 비동기 작업이 끝날 때까지 계속 실행되어야 하므로, m_work가 존재하는 동안에는 io_context가 종료되지 않는다.
m_thread는 io_context의 run 메서드를 실행하는 별도의 스레드를 관리한다.
m_ios.run()은 비동기 작업을 처리하는 주요 루프이므로, 이를 별도의 스레드에서 실행하여 프로그램이 계속해서 비동기 작업을 처리하도록한다.
AsyncTCPClient 클래스 emulateLongComputationOp 메서드
void emulateLongComputationOp(
unsigned int duration_sec,
const std::string& raw_ip_address,
unsigned short port_num,
Callback callback,
unsigned int request_id) {
// 요청 문자열 준비
std::string request = "EMULATE_LONG_CALC_OP "
+ std::to_string(duration_sec)
+ "\n";
std::shared_ptr<Session> session =
std::shared_ptr<Session>(new Session(m_ios,
raw_ip_address,
port_num,
request,
request_id,
callback));
session->m_sock.open(session->m_ep.protocol());
// 활성 세션 목록에 새 세션 추가
// 다중 스레드에서 접근할 수 있기 때문에 동기화 필요
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
m_active_sessions[request_id] = session;
lock.unlock();
session->m_sock.async_connect(session->m_ep,
[this, session](const system::error_code& ec)
{
if (!ec) {
session->m_ec = ec;
onRequestComplete(session);
return;
}
emulateLongComputationOp 함수는 작업이 실행될 기간, ip주소, 포트 번호, 콜백 함수, 요청 번호를 매개변수로 받는다.
서버로 요청할 버퍼를 문자열 형식으로 준비하고, ip주소와 포트 번호, 요청 문자열, 요청 id와 콜백을 Session타입의 구조체 session변수에 전달하여 소켓, 종료점 등 나머지 정보를 new를 통해 동적 할당 한다.
소켓을 열고 mutex를 통해 다른 스레드에서 접근하지 못하도록 동기화를 진행한다.
map타입의 자료구조 m_active_sessions의 key로 요청id를 사용하고 value로 session을 할당한다.
뮤텍스 락을 풀어주고 주어진 IP 주소와 포트로 비동기 연결을 시도하며, 연결이 성공하면 비동기 작업을 시작할 수 있다.
std::unique_lock<std::mutex>
cancel_lock(session->m_cancel_guard);
if (session->m_was_cancelled) {
onRequestComplete(session);
return;
}
asio::async_write(session->m_sock,
asio::buffer(session->m_request),
[this, session](const boost::system::error_code& ec,
std::size_t bytes_transferred)
{
if (!ec) {
session->m_ec = ec;
onRequestComplete(session);
return;
}
std::unique_lock<std::mutex>
cancel_lock(session->m_cancel_guard);
if (session->m_was_cancelled) {
onRequestComplete(session);
return;
}
asio::async_read_until(session->m_sock,
session->m_response_buf,
'\n',
[this, session](const boost::system::error_code& ec,
std::size_t bytes_transferred)
{
if (!ec) {
session->m_ec = ec;
}
else {
std::istream strm(&session->m_response_buf);
std::getline(strm, session->m_response);
}
onRequestComplete(session);
}); }); });
};
비동기 연결이 성공한 이후에 대한 로직이다.
cancel_lock 메서드를 통해 현재 세션의 뮤텍스에 락을 건다.
이후 현재 세션의 m_was_cancelled를 참조하여 true라면 onRequestComplete함수를 호출하고 리턴한다.
이는 작업 중에 해당 작업에 대한 취소 요청이 왔는지 확인하는 과정이다.
뮤텍스 락을 거는 이유는 다중 스레드 환경에서 다른 스레드가 현재 세션에 개입하지 못하도록 하기 위함이다.
async_wirte 메서드를 통해 현재 세션의 소켓에 요청 내용을 버퍼로 변환하여 비동기 쓰기 작업을 시작한다.
3번째 인자로 콜백함수가 필요한데 해당 부분은 람다 형식의 콜백함수가 지정되었다.
람다 함수의 매개변수는 에러코드와 쓰기에 성공한 바이트의 수를 입력받는다.
만약 에러 코드가 존재하지 않다면 버퍼의 모든 데이터를 쓰기 완료하였으므로 onRequestComplete 메서드를 실행한다.
아니라면 아직 버퍼에 모든 데이터를 쓰지 못했는데 콜백 함수가 호출된 경우이다.
뮤텍스 락을 걸고 쓰기 작업 중 취소 요청이 있었는지를 체크해 준다. 요청이 왔다면 작업 도중에 빠져나가면 된다.
아니라면 버퍼의 데이터를 소켓에 모두 쓸 때까지 쓰기 연산을 계속 진행해 준다.
모든 쓰기작업이 완료되었다면 이제 읽기 작업을 비동기 형식으로 실행해 준다.
매개변수로 현재 세션의 소켓과 응답을 받기 위한 가변 버퍼를, read_until을 사용해 줄 것이므로 구분 문자와 마지막으로 콜백 함수를 람다 형식으로 받는다.
람다 함수의 매개변수는 write와 동일하게 에러코드와 읽은 바이트 수를 나타낼 변수이다.
에러가 발생했다면 현재 세션의 에러 코드 m_ec에 에러 정보를 지정해 준다.
만약 에러가 나지 않았다면 입력 받은 버퍼를 인풋 스트림 형식으로 저장해 준다.
이후 getline을 통해 현재 세션의 응답 문자열인 m_response에 응답 내용을 저장해 준다.
현재는 read_until을 사용해 주었고, 구분자가 '\n'이므로 getline의 3번째 인자를 생략한 것이다.
모든 읽기 작업이 완료되었다면 onRequestComplete 함수를 호출해 준다.
void cancelRequest(unsigned int request_id) {
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(request_id);
if (it != m_active_sessions.end()) {
std::unique_lock<std::mutex>
cancel_lock(it->second->m_cancel_guard);
it->second->m_was_cancelled = true;
it->second->m_sock.cancel();
}
}
AsyncTCPClient 클래스 cancelRequest메서드
void cancelRequest(unsigned int request_id) {
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(request_id);
if (it != m_active_sessions.end()) {
std::unique_lock<std::mutex>
cancel_lock(it->second->m_cancel_guard);
it->second->m_was_cancelled = true;
it->second->m_sock.cancel();
}
}
진행 중이던 통신에 대한 요청 취소가 요청되었을 때 실행되는 함수이다.
매개변수로 연산을 진행 중이던 id를 입력받는다.
진행 되고 있는 스레드에 대하여 락을 건다.
맵 자료구조 m_active_sessions에서 취소 요청을 받은 작업 id의 이터레이터를 찾아준다.
만약 해당 작업을 찾았다면 해당 작업의 스레드에 락을 건다.
해당 작업의 m_was_cancelled가 true라면 해당 작업의 소켓에 cancel메서드를 실행한다.
AsyncTCPClient 클래스 close메서드
void close() {
// 작업 객체를 파괴하여 I/O 스레드가 대기 상태에서 빠져나갈 수 있도록 함
m_work.reset(NULL);
// I/O 스레드 종료 대기
m_thread->join();
}
모든 비동기 연산을 마치고 통신을 닫기 위한 함수
클래스의 생성자에서 초기화 했던 m_work, m_thread의 상태를 사용하지 않도록 지정한다.
AsyncTCPClient 클래스 onRequestComplete메서드
void onRequestComplete(std::shared_ptr<Session> session) {
// 연결 종료 시도. 실패해도 오류 코드에 신경쓰지 않음
boost::system::error_code ignored_ec;
session->m_sock.shutdown(
asio::ip::tcp::socket::shutdown_both,
ignored_ec);
// 활성 세션 목록에서 세션 제거
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(session->m_id);
if (it != m_active_sessions.end())
m_active_sessions.erase(it);
lock.unlock();
boost::system::error_code ec;
if (session->m_ec.value() == 0 && session->m_was_cancelled)
ec = asio::error::operation_aborted;
else
ec = session->m_ec;
// 사용자 제공 콜백 호출
session->m_callback(session->m_id,
session->m_response, ec);
};
작업이 완료되었음을 전달 받으면 실행되는 함수
매개변수로 세션의 정보를 입력 받는다.
해당 세션의 소켓을 shutdown 처리하여 소켓을 종료 상태로 만든다.
스레드에 락을 걸고 맵 상에서 해당 작업이 존재하는지를 체크하고, erase처리해 준다.
스레드 락을 풀고 만약 세션에 지정된 에러 코드가 존재하거나 취소 요청이 와서 종료된 작업이라면 에러 코드를 abort로 지정해 준다, 만약 앞의 조건에 해당하지 않는다면 그대로 세션의 에러코드 정보를 저장한다.
콜백 함수를 통해 현재 요청 id와 응답 정보, 에러코드를 사용자에게 제공한다.
handler
void handler(unsigned int request_id,
const std::string& response,
const system::error_code& ec)
{
if (!ec) {
std::cout << "Request #" << request_id
<< "가 완료되었습니다. 응답: "
<< response << std::endl;
}
else if (ec == asio::error::operation_aborted) {
std::cout << "Request #" << request_id
<< "가 사용자가 취소했습니다."
<< std::endl;
}
else {
std::cout << "Request #" << request_id
<< " 실패! 오류 코드 = " << ec.value()
<< ". 오류 메시지 = " << ec.message()
<< std::endl;
}
return;
}
비동기 통신 중 콜백 함수로 사용하기 위한 핸들러 함수
세션의 콜백 함수를 통해 전달받은 인자로 사용자에게 작업의 결과를 알리기 위한 함수이다.
emulateLongComputationOp 메서드에서 전달받은 세션의 콜백 인자를 받는다.
각 에러코드에 맞춰 통신의 결과를 사용자에게 출력해준다.
main
int main()
{
try {
AsyncTCPClient client;
// 사용자가 요청을 시작함
// ID 1인 요청 시작
client.emulateLongComputationOp(10, "127.0.0.1", 3333,
handler, 1);
// 5초 동안 아무 작업도 하지 않음
std::this_thread::sleep_for(std::chrono::seconds(5));
// ID 2인 요청 시작
client.emulateLongComputationOp(11, "127.0.0.1", 3334,
handler, 2);
// ID 1인 요청을 취소
client.cancelRequest(1);
// 6초 동안 아무 작업도 하지 않음
std::this_thread::sleep_for(std::chrono::seconds(6));
// ID 3인 요청 시작
client.emulateLongComputationOp(9, "127.0.0.1", 3335,
handler, 3);
// 모든 요청이 완료될 때까지 대기
std::this_thread::sleep_for(std::chrono::seconds(30));
client.close();
}
catch (std::exception& e) {
std::cerr << "예외: " << e.what() << std::endl;
}
return 0;
}
AsyncTCPClient 클래스의 인스턴스 client를 초기화 한다.
해당 인스턴스를 통해 emulateLongComputationOp 메서드를 호출한다.
메서드의 인자로는 작업을 진행할 시간, ip주소, 포트 번호, 콜백 함수(핸들러), 요청id를 전달한다.
각 요청에 대한 결과는 handler함수를 통해 출력하여 사용자에게 제공된다.
해당 함수에서는 마치 사용자가 세 가지 요청을 시작하도록 한 후, 그 중 하나를 취소하는 것처럼 동작한다.
해당 예제는 하나의 클라이언트 객체에서 스레드를 생성하고 작업을 비동기로 처리하기에 결국 단일 스레드로 실행한다.
만약 멀티 스레드 환경에서 각 작업에 대해 처리를 하고 싶다면 멀티 스레드 기능을 사용해야 한다.
'네트워크 통신 > Boost' 카테고리의 다른 글
Boost.asio 서버 개요 (1) | 2024.11.18 |
---|---|
Boost.asio 비동기 멀티 스레드 TCP 클라이언트 (0) | 2024.11.18 |
Boost.asio 동기 UDP 클라이언트 (0) | 2024.11.18 |
Boost.asio 동기 TCP 클라이언트 (0) | 2024.11.18 |
Boost.asio 클라이언트 개요 (1) | 2024.11.14 |