네트워크 통신/Boost

Boost.asio 비동기 멀티 스레드 TCP 클라이언트

마달랭 2024. 11. 18. 14:22
반응형

개요

클라이언트가 수백 만개의 요청을 만들 수 있고, 최대한 빠르게 처리하고 싶다면, 다중 스레드를 지원해야한다.

그러면 여러개의 스레드가 진짜로 동시에 여러 요청을 처리할 수 있다.

물론, 클라이언트가 실행되는 컴퓨터에 프로세서가 여러 개 있어야 한다.

만약, 컴퓨터에 설치된 프로세서의 수보다 더 많은 스레드를 동시에 실행시키면 스레드 교환에 따른 부하 때문에 어플리케이션의 속도가 오히려 떨어질 수도 있다.

 

단일 스레드와의 차이는 클래스 인스턴스를 생성할 때 생성자에 생성할 스레드의 개수를 전달한다.

해당 정수를 통해 전달한 크기 만큼의 스레드를 생성하여 작업을 병렬처리 할 수 있다.

I/O 연산을 하는 부분은 단일 스레드와 큰 차이가 없으므로 하기 글을 참고하길 바란다.

 

Boost.asio 비동기 단일 스레드 TCP 클라이언트

 

Boost.asio 비동기 단일 스레드 TCP 클라이언트

개요가장 간단한 비동기 클라이언트라 하더라도 동기 클라이언트에 비해 구조적으로 복잡하다.비동기 클라이언트에 요청 취소와 같은 기능을 추가하려면 좀 더 복잡해진다.비동기 TCP 클라이언

zzzz955.tistory.com

 

하기에서는 단일 스레드와 멀티 스레드의 차이점을 찾아보자

 

 

비동기 멀티 스레드 TCP 클라이언트

#include <boost/predef.h> 
#ifdef BOOST_OS_WINDOWS
#define _WIN32_WINNT 0x0501

#if _WIN32_WINNT <= 0x0502 
#define BOOST_ASIO_ENABLE_CANCELIO
#endif
#endif

#include <boost/asio.hpp>

#include <thread>
#include <mutex>
#include <memory>
#include <list>
#include <iostream>
#include <map>

using namespace boost;

typedef void(*Callback) (unsigned int request_id,
    const std::string& response,
    const system::error_code& ec);

struct Session {
    Session(asio::io_context& ios,
        const std::string& raw_ip_address,
        unsigned short port_num,
        const std::string& request,
        unsigned int id,
        Callback callback) :
        m_sock(ios),
        m_ep(asio::ip::address::from_string(raw_ip_address),
            port_num),
        m_request(request),
        m_id(id),
        m_callback(callback),
        m_was_cancelled(false) {}

    asio::ip::tcp::socket m_sock; 
    asio::ip::tcp::endpoint m_ep; 
    std::string m_request;        
    asio::streambuf m_response_buf;
    std::string m_response; 
    system::error_code m_ec;

    unsigned int m_id; 
    Callback m_callback;

    bool m_was_cancelled;
    std::mutex m_cancel_guard;
};

class AsyncTCPClient {
public:
    AsyncTCPClient(unsigned char num_of_threads) {

        m_work.reset(new boost::asio::io_context::work(m_ios));

        for (unsigned char i = 1; i <= num_of_threads; i++) {
            std::unique_ptr<std::thread> th(
                new std::thread([this]() {
                    m_ios.run();
                    }));

            m_threads.push_back(std::move(th));
        }
    }

    void emulateLongComputationOp(
        unsigned int duration_sec,
        const std::string& raw_ip_address,
        unsigned short port_num,
        Callback callback,
        unsigned int request_id) {

        std::string request = "EMULATE_LONG_CALC_OP "
            + std::to_string(duration_sec)
            + "\n";

        std::shared_ptr<Session> session =
            std::shared_ptr<Session>(new Session(m_ios,
                raw_ip_address,
                port_num,
                request,
                request_id,
                callback));

        session->m_sock.open(session->m_ep.protocol());

        std::unique_lock<std::mutex>
            lock(m_active_sessions_guard);
        m_active_sessions[request_id] = session;
        lock.unlock();

        session->m_sock.async_connect(session->m_ep,
            [this, session](const system::error_code& ec) {
                if (!ec) {
                    session->m_ec = ec;
                    onRequestComplete(session);
                    return;
                }

                std::unique_lock<std::mutex>
                    cancel_lock(session->m_cancel_guard);

                if (session->m_was_cancelled) {
                    onRequestComplete(session);
                    return;
                }

                asio::async_write(session->m_sock,
                    asio::buffer(session->m_request),
                    [this, session](const boost::system::error_code& ec,
                        std::size_t bytes_transferred) {
                            if (!ec) {
                                session->m_ec = ec;
                                onRequestComplete(session);
                                return;
                            }

                            std::unique_lock<std::mutex>
                                cancel_lock(session->m_cancel_guard);

                            if (session->m_was_cancelled) {
                                onRequestComplete(session);
                                return;
                            }

                            asio::async_read_until(session->m_sock,
                                session->m_response_buf,
                                '\n',
                                [this, session](const boost::system::error_code& ec,
                                    std::size_t bytes_transferred) {
                                        if (!ec) {
                                            session->m_ec = ec;
                                        }
                                        else {
                                            std::istream strm(&session->m_response_buf);
                                            std::getline(strm, session->m_response);
                                        }

                                        onRequestComplete(session);
                                });
                    });
            });
    }

    void cancelRequest(unsigned int request_id) {
        std::unique_lock<std::mutex>
            lock(m_active_sessions_guard);

        auto it = m_active_sessions.find(request_id);
        if (it != m_active_sessions.end()) {
            std::unique_lock<std::mutex>
                cancel_lock(it->second->m_cancel_guard);

            it->second->m_was_cancelled = true;
            it->second->m_sock.cancel();
        }
    }

    void close() {
        m_work.reset(NULL);

        for (auto& thread : m_threads) {
            thread->join();
        }
    }

private:
    void onRequestComplete(std::shared_ptr<Session> session) {
        boost::system::error_code ignored_ec;

        session->m_sock.shutdown(
            asio::ip::tcp::socket::shutdown_both,
            ignored_ec);

        std::unique_lock<std::mutex>
            lock(m_active_sessions_guard);

        auto it = m_active_sessions.find(session->m_id);
        if (it != m_active_sessions.end())
            m_active_sessions.erase(it);

        lock.unlock();

        boost::system::error_code ec;

        if (session->m_ec.value() == 0 && session->m_was_cancelled)
            ec = asio::error::operation_aborted;
        else
            ec = session->m_ec;

        session->m_callback(session->m_id,
            session->m_response, ec);
    }

private:
    asio::io_context m_ios;
    std::map<int, std::shared_ptr<Session>> m_active_sessions;
    std::mutex m_active_sessions_guard;
    std::unique_ptr<boost::asio::io_context::work> m_work;
    std::list<std::unique_ptr<std::thread>> m_threads;
};

void handler(unsigned int request_id,
    const std::string& response,
    const system::error_code& ec)
{
    if (!ec) {
        std::cout << "Request #" << request_id
            << "가 완료되었습니다. 응답: "
            << response << std::endl;
    }
    else if (ec == asio::error::operation_aborted) {
        std::cout << "Request #" << request_id
            << "가 사용자가 취소했습니다."
            << std::endl;
    }
    else {
        std::cout << "Request #" << request_id
            << " 실패! 오류 코드 = " << ec.value()
            << ". 오류 메시지 = " << ec.message()
            << std::endl;
    }

    return;
}

int main()
{
    try {
        AsyncTCPClient client(4);

        client.emulateLongComputationOp(10, "127.0.0.1", 3333,
            handler, 1);
        std::this_thread::sleep_for(std::chrono::seconds(5));

        client.emulateLongComputationOp(11, "127.0.0.1", 3334,
            handler, 2);

        client.cancelRequest(1);
        std::this_thread::sleep_for(std::chrono::seconds(6));

        client.emulateLongComputationOp(12, "127.0.0.1", 3335,
            handler, 3);

        std::this_thread::sleep_for(std::chrono::seconds(15));

        client.close();
    }
    catch (system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();

        return e.code().value();
    }

    return 0;
}

 

단일 스레드 환경과 가장 큰 차이는 AsyncTCPClient 클래스의 인스턴스를 생성할 때 인자를 전달하는 점이다.

해당 인자는 AsyncTCPClient 클래스의 생성자에서 여러개의 스레드를 생성하는 작업을 하는데 쓰인다.

예제 처럼 4를 전달할 경우 클래스의 생성자에서는 4개의 스레드를 생성하여 각각 m_ios.run() 메서드를 실행한다.

따라서 각 스레드들은 m_ios에 할당된 비동기 작업들을 각각 대기 큐에서 꺼내어 처리하게 된다.

 

std::list<std::unique_ptr<std::thread>> m_threads;

 

m_threads가 list형태로 되어 있는 것을 볼 수 있다.

 

for (unsigned char i = 1; i <= num_of_threads; i++) {
    std::unique_ptr<std::thread> th(
        new std::thread([this]() {
            m_ios.run();
            }));

    m_threads.push_back(std::move(th));
}

 

생성자의 인자로 받은 num_of_threads의 개수 만큼 스레드를 생성하고 run()을 하게 된다.

 

하지만 비동기 작업을 멀티 스레드 환경에서 처리한다고 해도 한 가지 작업에 모든 스레드가 달라붙어 일을 하는 것은 아니다.

예를 들어 async_write 메서드가 호출되어 비동기 작업이 필요할 때 마침 쉬고 있던 스레드1이 해당 작업을 도맡아 하고 있다면 하던 작업을 끝낸 스레드2가 해당 작업에 붙어서 스레드1을 도와 일을 할 수 있진 않다.

스레드2는 다른 네트워크 관련 처리라던지 또 다른 async_write와 같은 작업을 맡아 진행하게 된다.

 

현재 예제는 한개의 AsyncTCPClient 클래스의 인스턴스 안에서 4개의 스레드를 생성한 반면, 서버에 요청이 필요할 때 각각 스레드를 생성하고 AsyncTCPClient 클래스의 인스턴스를 독립적으로 생성하면 더 효율적으로 일을 처리할 수 있다.

 

728x90
반응형