네트워크 통신/Boost

Boost.asio 비동기 TCP 서버

마달랭 2024. 11. 20. 22:51
반응형

개요

비동기 TCP 서버는 다음과 같은 사항을 만족시키는 분산 어플리케이션이다.

  • 클라이언트 서버 통신 모델에서 서버로 동작한다.
  • TCP 프로토콜을 사용해 클라이언트 프로그램과 통신한다.
  • 비동기 I/O 및 제어 연산을 한다.
  • 한 번에 여러 클라이언트를 처리할 수 있다.

일반적인 비동기 TCP 서버는 다음과 같은 알고리즘에 따라 동작한다.

  1. 수용자 소켓을 할당하고 특정 TCP 포트에 묶는다.
  2. 비동기 수용 연산을 시작하도록 한다.
  3. 하나 이상의 스레드를 만들어 Boost.asio 이벤트 루프를 실행시키는 스레드 풀에 추가한다.
  4. 비동기 수용 연산이 끝나면, 다음 연결 요청을 받아들이는 새로운 연산을 시작하도록 한다.
  5. 비동기 읽기 연산을 시작시켜 연결된 클라이언트로부터 들어오는 요청을 읽는다.
  6. 비동기 읽기 연산이 끝나면, 요청을 처리하고 응답 메시지를 준비한다.
  7. 응답 메시지를 클라이언트로 보낼 비동기 쓰기 연산을 시작하도록 한다.
  8. 비동기 쓰기 연산이 끝나면, 연결을 닫고 소켓을 할당 해지한다.

시작부터 4번 까지는 실제 비동기 연산이 실행되는 시간에 따라 순서가 뒤바뀔 수 있다.

서버의 컴퓨터에 프로세서가 하나뿐이라 해도 서버가 비동기 모델을 사용하기 때문에 실행되는 순서는 위와 같지 않을 수 있다.

 

 

비동기 TCP 서버 구현

#include <boost/asio.hpp>
#include <thread>
#include <memory>
#include <iostream>
#include <vector>

using namespace boost;

class Service {
public:
    Service(std::shared_ptr<asio::ip::tcp::socket> sock)
        : m_sock(sock) {}

    // 요청을 비동기적으로 읽어들이고 처리 시작
    void StartHandling() {
        // 'shared_ptr'을 비동기 콜백에 전달
        auto self = shared_ptr<Service>(this);  // 'this'를 shared_ptr로 감싸서 전달

        asio::async_read_until(*m_sock, m_request, '\n',
            [self](const system::error_code& ec, std::size_t bytes_transferred) {
                self->onRequestReceived(ec, bytes_transferred);
            });
    }

private:
    // 요청을 받은 후 처리하는 함수
    void onRequestReceived(const system::error_code& ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cerr << "Error occurred! Error code = "
                << ec.value() << ". Message: " << ec.message() << "\n";
            return;
        }

        // 요청 처리
        m_response = ProcessRequest(m_request);

        // 비동기적으로 응답 전송
        auto self = shared_ptr<Service>(this);  // 응답도 동일하게 shared_ptr로 감싸서 전달
        asio::async_write(*m_sock, asio::buffer(m_response),
            [self](const system::error_code& ec, std::size_t bytes_transferred) {
                self->onResponseSent(ec, bytes_transferred);
            });
    }

    // 응답을 전송한 후의 처리
    void onResponseSent(const system::error_code& ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cerr << "Error occurred! Error code = "
                << ec.value() << ". Message: " << ec.message() << "\n";
        }

        // 자동으로 shared_ptr가 cleanup을 처리함
    }

    // 요청 처리 (여기서는 단순한 작업을 시뮬레이션)
    std::string ProcessRequest(asio::streambuf& request) {
        int i = 0;
        while (i != 1000000)
            ++i;

        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        return "Response\n";  // 간단한 응답 반환
    }

private:
    std::shared_ptr<asio::ip::tcp::socket> m_sock;  // 클라이언트 소켓
    asio::streambuf m_request;  // 요청을 담을 버퍼
    std::string m_response;  // 응답 문자열
};

class Acceptor {
public:
    Acceptor(asio::io_context& ios, unsigned short port_num)
        : m_ios(ios),
        m_acceptor(m_ios, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), port_num)) {}

    void Start() {
        doAccept();
    }

    void Stop() {
        m_acceptor.close();
    }

private:
    void doAccept() {
        auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

        m_acceptor.async_accept(*sock,
            [this, sock](const system::error_code& ec) {
                if (!ec) {
                    std::make_shared<Service>(sock)->StartHandling();
                }
                else {
                    std::cerr << "Error occurred! Error code = "
                        << ec.value() << ". Message: " << ec.message() << "\n";
                }

                if (m_acceptor.is_open()) {
                    doAccept();
                }
            });
    }

private:
    asio::io_context& m_ios;
    asio::ip::tcp::acceptor m_acceptor;
};

class Server {
public:
    void Start(unsigned short port_num, unsigned int thread_pool_size) {
        assert(thread_pool_size > 0);

        acc = std::make_unique<Acceptor>(m_ios, port_num);
        acc->Start();

        for (unsigned int i = 0; i < thread_pool_size; ++i) {
            thread_pool.emplace_back([this]() { m_ios.run(); });
        }
    }

    void Stop() {
        acc->Stop();
        m_ios.stop();

        for (auto& thread : thread_pool) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

private:
    asio::io_context m_ios;
    std::unique_ptr<Acceptor> acc;
    std::vector<std::thread> thread_pool;
};

const unsigned int DEFAULT_THREAD_POOL_SIZE = 2;

int main() {
    const unsigned short port_num = 3333;

    try {
        Server srv;

        unsigned int thread_pool_size = std::thread::hardware_concurrency() * 2;
        if (thread_pool_size == 0) {
            thread_pool_size = DEFAULT_THREAD_POOL_SIZE;
        }

        srv.Start(port_num, thread_pool_size);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    }
    catch (const std::exception& e) {
        std::cerr << "Error occurred! Message: " << e.what() << "\n";
    }

    return 0;
}

 

 

Service 클래스

소켓을 생성자의 인자로 받아 쓰기와 읽기 연산이 진행되는 클래스

  1. 클래스의 인스턴스를 생성할 때 인자로 소켓의 주소를 받는다.
  2. 해당 주소를 통해 클래스 내부 멤버인 m_sock에 전달하여 초기화를 해준다.
  3. public형식의 멤버 함수 StartHandling 메서드가 호출되면 클래스 내부 함수에서 본격적인 연산 작업이 수행된다.
  4. async_read_until 메서드를 통해 소켓을 구분자('\n')까지 읽고 onRequestReceived 함수를 호출한다.
  5. 콜백 함수로 onRequestReceived 함수에선 읽은 요청을 ProcessRequest 함수의 인자로 전달한다.
  6. ProcessRequest 함수에선 클라이언트의 요청에 해당하는 작업 후 클라이언트에 반환할 응답을 리턴한다.
  7. 다시 onRequestReceived 함수에서 해당 응답을 버퍼로 비동기 쓰기 작업을 시작한다.
  8. 콜백 함수로 onResponseSent 함수를 호출한다. 여기가 로직의 끝이므로  shared_ptr가 cleanup을 진행한다.

 

Acceptor 클래스

io_context와 포트 번호를 인자로 받아 리스닝 모드로 진입하기 위한 클래스

  1. 생성자의 인자로 받은 ios를 활용하여 m_ios를 초기화 해준다.
  2. IPv4 any주소와 포트번호를 결합해 종료점을 만들고, m_ios와 결합해 m_acceptor를 초기화 해준다.
  3. Start메서드가 실행되면 doAccept 멤버 함수를 호출해 준다.
  4. doAccept 함수가 호출되면 m_ios를 활용해 소켓 sock를 초기화 해준다.
  5. m_acceptor에 비동기 연결 async_accept 메서드를 사용해 소켓과 콜백 함수를 인자로 전달한다.
  6. 오류가 발생하지 않았다면 sock을 Service 클래스 타입의 unique_ptr 의 인자로 전달한다.
  7. Service클래스의 멤버 함수인 StartHandling을 호출하여 연산 작업을 시작한다.
  8. 연산을 완료하여 새로운 클라이언트를 맞을 준비가 되었다면 재귀적으로 doAccept 모드로 진입한다.

 

Server 클래스

서버의 구동과 종료를 진행하는 클래스

  1. Start 메서드에 포트 번호와 쓰레드 풀의 사이즈를 인자로 입력 받아준다.
  2. assert를 통해 사용할 수 있는 스레드가 있다면 로직을 계속 진행하고 아니면 오류를 반환한다.
  3. m_ios와 포트 번호를 인자로 전달해 Acceptor클래스 타입의 unique_ptr acc를 생성한다.
  4. acc를 통해 Acceptor클래스의 멤버 함수 Start를 호출하여 리스닝 상태로 변환해준다.
  5. 쓰레드 풀의 크기만큼 반복문을 실행하여 thread타입의 벡터 thread_pool내 모든 스레드를 run상태로 만들어 준다.
  6. Stop 메서드가 호출되면 acc를 통해 Acceptor클래스의 멤버 함수 Stop을 호출하여 종료해준다.
  7. 또 m_ios를 run상태에서 stop상태로 변환하여 run상태를 해제해 준다.
  8. 쓰레드 풀을 순회하며 현재 스레드가 종료할 수 있는 상태라면 종료를 해준다.

 

main()

  1. 서버가 사용할 포트 번호를 지정해 준다.
  2. Server 클래스의 인스턴스 srv를 초기화 해준다.
  3. hardware_concurrency 메서드를 통해 CPU 코어의 개수의 두배 만큼 thread_pool_size를 초기화 해준다.
  4. 만약 thread_pool_size의 값이 0이라면 DEFAULT_THREAD_POOL_SIZE를 통해 기본값으로 세팅해준다.
  5. srv를 통해 Server 클래스의 Start 멤버 함수에 포트번호와 스레드 풀의 크기를 인자로 전달해 준다.
  6. 메인 스레드에서 60초를 대기한다.
  7. srv를 통해 Server 클래스의 Stop 멤버 함수를 호출한다.

 

728x90
반응형