개요
클라이언트가 수백 만개의 요청을 만들 수 있고, 최대한 빠르게 처리하고 싶다면, 다중 스레드를 지원해야한다.
그러면 여러개의 스레드가 진짜로 동시에 여러 요청을 처리할 수 있다.
물론, 클라이언트가 실행되는 컴퓨터에 프로세서가 여러 개 있어야 한다.
만약, 컴퓨터에 설치된 프로세서의 수보다 더 많은 스레드를 동시에 실행시키면 스레드 교환에 따른 부하 때문에 어플리케이션의 속도가 오히려 떨어질 수도 있다.
단일 스레드와의 차이는 클래스 인스턴스를 생성할 때 생성자에 생성할 스레드의 개수를 전달한다.
해당 정수를 통해 전달한 크기 만큼의 스레드를 생성하여 작업을 병렬처리 할 수 있다.
I/O 연산을 하는 부분은 단일 스레드와 큰 차이가 없으므로 하기 글을 참고하길 바란다.
Boost.asio 비동기 단일 스레드 TCP 클라이언트
하기에서는 단일 스레드와 멀티 스레드의 차이점을 찾아보자
비동기 멀티 스레드 TCP 클라이언트
#include <boost/predef.h>
#ifdef BOOST_OS_WINDOWS
#define _WIN32_WINNT 0x0501
#if _WIN32_WINNT <= 0x0502
#define BOOST_ASIO_ENABLE_CANCELIO
#endif
#endif
#include <boost/asio.hpp>
#include <thread>
#include <mutex>
#include <memory>
#include <list>
#include <iostream>
#include <map>
using namespace boost;
typedef void(*Callback) (unsigned int request_id,
const std::string& response,
const system::error_code& ec);
struct Session {
Session(asio::io_context& ios,
const std::string& raw_ip_address,
unsigned short port_num,
const std::string& request,
unsigned int id,
Callback callback) :
m_sock(ios),
m_ep(asio::ip::address::from_string(raw_ip_address),
port_num),
m_request(request),
m_id(id),
m_callback(callback),
m_was_cancelled(false) {}
asio::ip::tcp::socket m_sock;
asio::ip::tcp::endpoint m_ep;
std::string m_request;
asio::streambuf m_response_buf;
std::string m_response;
system::error_code m_ec;
unsigned int m_id;
Callback m_callback;
bool m_was_cancelled;
std::mutex m_cancel_guard;
};
class AsyncTCPClient {
public:
AsyncTCPClient(unsigned char num_of_threads) {
m_work.reset(new boost::asio::io_context::work(m_ios));
for (unsigned char i = 1; i <= num_of_threads; i++) {
std::unique_ptr<std::thread> th(
new std::thread([this]() {
m_ios.run();
}));
m_threads.push_back(std::move(th));
}
}
void emulateLongComputationOp(
unsigned int duration_sec,
const std::string& raw_ip_address,
unsigned short port_num,
Callback callback,
unsigned int request_id) {
std::string request = "EMULATE_LONG_CALC_OP "
+ std::to_string(duration_sec)
+ "\n";
std::shared_ptr<Session> session =
std::shared_ptr<Session>(new Session(m_ios,
raw_ip_address,
port_num,
request,
request_id,
callback));
session->m_sock.open(session->m_ep.protocol());
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
m_active_sessions[request_id] = session;
lock.unlock();
session->m_sock.async_connect(session->m_ep,
[this, session](const system::error_code& ec) {
if (!ec) {
session->m_ec = ec;
onRequestComplete(session);
return;
}
std::unique_lock<std::mutex>
cancel_lock(session->m_cancel_guard);
if (session->m_was_cancelled) {
onRequestComplete(session);
return;
}
asio::async_write(session->m_sock,
asio::buffer(session->m_request),
[this, session](const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (!ec) {
session->m_ec = ec;
onRequestComplete(session);
return;
}
std::unique_lock<std::mutex>
cancel_lock(session->m_cancel_guard);
if (session->m_was_cancelled) {
onRequestComplete(session);
return;
}
asio::async_read_until(session->m_sock,
session->m_response_buf,
'\n',
[this, session](const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (!ec) {
session->m_ec = ec;
}
else {
std::istream strm(&session->m_response_buf);
std::getline(strm, session->m_response);
}
onRequestComplete(session);
});
});
});
}
void cancelRequest(unsigned int request_id) {
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(request_id);
if (it != m_active_sessions.end()) {
std::unique_lock<std::mutex>
cancel_lock(it->second->m_cancel_guard);
it->second->m_was_cancelled = true;
it->second->m_sock.cancel();
}
}
void close() {
m_work.reset(NULL);
for (auto& thread : m_threads) {
thread->join();
}
}
private:
void onRequestComplete(std::shared_ptr<Session> session) {
boost::system::error_code ignored_ec;
session->m_sock.shutdown(
asio::ip::tcp::socket::shutdown_both,
ignored_ec);
std::unique_lock<std::mutex>
lock(m_active_sessions_guard);
auto it = m_active_sessions.find(session->m_id);
if (it != m_active_sessions.end())
m_active_sessions.erase(it);
lock.unlock();
boost::system::error_code ec;
if (session->m_ec.value() == 0 && session->m_was_cancelled)
ec = asio::error::operation_aborted;
else
ec = session->m_ec;
session->m_callback(session->m_id,
session->m_response, ec);
}
private:
asio::io_context m_ios;
std::map<int, std::shared_ptr<Session>> m_active_sessions;
std::mutex m_active_sessions_guard;
std::unique_ptr<boost::asio::io_context::work> m_work;
std::list<std::unique_ptr<std::thread>> m_threads;
};
void handler(unsigned int request_id,
const std::string& response,
const system::error_code& ec)
{
if (!ec) {
std::cout << "Request #" << request_id
<< "가 완료되었습니다. 응답: "
<< response << std::endl;
}
else if (ec == asio::error::operation_aborted) {
std::cout << "Request #" << request_id
<< "가 사용자가 취소했습니다."
<< std::endl;
}
else {
std::cout << "Request #" << request_id
<< " 실패! 오류 코드 = " << ec.value()
<< ". 오류 메시지 = " << ec.message()
<< std::endl;
}
return;
}
int main()
{
try {
AsyncTCPClient client(4);
client.emulateLongComputationOp(10, "127.0.0.1", 3333,
handler, 1);
std::this_thread::sleep_for(std::chrono::seconds(5));
client.emulateLongComputationOp(11, "127.0.0.1", 3334,
handler, 2);
client.cancelRequest(1);
std::this_thread::sleep_for(std::chrono::seconds(6));
client.emulateLongComputationOp(12, "127.0.0.1", 3335,
handler, 3);
std::this_thread::sleep_for(std::chrono::seconds(15));
client.close();
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}
단일 스레드 환경과 가장 큰 차이는 AsyncTCPClient 클래스의 인스턴스를 생성할 때 인자를 전달하는 점이다.
해당 인자는 AsyncTCPClient 클래스의 생성자에서 여러개의 스레드를 생성하는 작업을 하는데 쓰인다.
예제 처럼 4를 전달할 경우 클래스의 생성자에서는 4개의 스레드를 생성하여 각각 m_ios.run() 메서드를 실행한다.
따라서 각 스레드들은 m_ios에 할당된 비동기 작업들을 각각 대기 큐에서 꺼내어 처리하게 된다.
std::list<std::unique_ptr<std::thread>> m_threads;
m_threads가 list형태로 되어 있는 것을 볼 수 있다.
for (unsigned char i = 1; i <= num_of_threads; i++) {
std::unique_ptr<std::thread> th(
new std::thread([this]() {
m_ios.run();
}));
m_threads.push_back(std::move(th));
}
생성자의 인자로 받은 num_of_threads의 개수 만큼 스레드를 생성하고 run()을 하게 된다.
하지만 비동기 작업을 멀티 스레드 환경에서 처리한다고 해도 한 가지 작업에 모든 스레드가 달라붙어 일을 하는 것은 아니다.
예를 들어 async_write 메서드가 호출되어 비동기 작업이 필요할 때 마침 쉬고 있던 스레드1이 해당 작업을 도맡아 하고 있다면 하던 작업을 끝낸 스레드2가 해당 작업에 붙어서 스레드1을 도와 일을 할 수 있진 않다.
스레드2는 다른 네트워크 관련 처리라던지 또 다른 async_write와 같은 작업을 맡아 진행하게 된다.
현재 예제는 한개의 AsyncTCPClient 클래스의 인스턴스 안에서 4개의 스레드를 생성한 반면, 서버에 요청이 필요할 때 각각 스레드를 생성하고 AsyncTCPClient 클래스의 인스턴스를 독립적으로 생성하면 더 효율적으로 일을 처리할 수 있다.
'네트워크 통신 > Boost' 카테고리의 다른 글
Boost.asio 반복 동기 TCP 서버 (0) | 2024.11.18 |
---|---|
Boost.asio 서버 개요 (1) | 2024.11.18 |
Boost.asio 비동기 단일 스레드 TCP 클라이언트 (1) | 2024.11.18 |
Boost.asio 동기 UDP 클라이언트 (0) | 2024.11.18 |
Boost.asio 동기 TCP 클라이언트 (0) | 2024.11.18 |