반응형
개요
비동기 TCP 서버는 다음과 같은 사항을 만족시키는 분산 어플리케이션이다.
- 클라이언트 서버 통신 모델에서 서버로 동작한다.
- TCP 프로토콜을 사용해 클라이언트 프로그램과 통신한다.
- 비동기 I/O 및 제어 연산을 한다.
- 한 번에 여러 클라이언트를 처리할 수 있다.
일반적인 비동기 TCP 서버는 다음과 같은 알고리즘에 따라 동작한다.
- 수용자 소켓을 할당하고 특정 TCP 포트에 묶는다.
- 비동기 수용 연산을 시작하도록 한다.
- 하나 이상의 스레드를 만들어 Boost.asio 이벤트 루프를 실행시키는 스레드 풀에 추가한다.
- 비동기 수용 연산이 끝나면, 다음 연결 요청을 받아들이는 새로운 연산을 시작하도록 한다.
- 비동기 읽기 연산을 시작시켜 연결된 클라이언트로부터 들어오는 요청을 읽는다.
- 비동기 읽기 연산이 끝나면, 요청을 처리하고 응답 메시지를 준비한다.
- 응답 메시지를 클라이언트로 보낼 비동기 쓰기 연산을 시작하도록 한다.
- 비동기 쓰기 연산이 끝나면, 연결을 닫고 소켓을 할당 해지한다.
시작부터 4번 까지는 실제 비동기 연산이 실행되는 시간에 따라 순서가 뒤바뀔 수 있다.
서버의 컴퓨터에 프로세서가 하나뿐이라 해도 서버가 비동기 모델을 사용하기 때문에 실행되는 순서는 위와 같지 않을 수 있다.
비동기 TCP 서버 구현
#include <boost/asio.hpp>
#include <thread>
#include <memory>
#include <iostream>
#include <vector>
using namespace boost;
class Service {
public:
Service(std::shared_ptr<asio::ip::tcp::socket> sock)
: m_sock(sock) {}
// 요청을 비동기적으로 읽어들이고 처리 시작
void StartHandling() {
// 'shared_ptr'을 비동기 콜백에 전달
auto self = shared_ptr<Service>(this); // 'this'를 shared_ptr로 감싸서 전달
asio::async_read_until(*m_sock, m_request, '\n',
[self](const system::error_code& ec, std::size_t bytes_transferred) {
self->onRequestReceived(ec, bytes_transferred);
});
}
private:
// 요청을 받은 후 처리하는 함수
void onRequestReceived(const system::error_code& ec, std::size_t bytes_transferred) {
if (ec) {
std::cerr << "Error occurred! Error code = "
<< ec.value() << ". Message: " << ec.message() << "\n";
return;
}
// 요청 처리
m_response = ProcessRequest(m_request);
// 비동기적으로 응답 전송
auto self = shared_ptr<Service>(this); // 응답도 동일하게 shared_ptr로 감싸서 전달
asio::async_write(*m_sock, asio::buffer(m_response),
[self](const system::error_code& ec, std::size_t bytes_transferred) {
self->onResponseSent(ec, bytes_transferred);
});
}
// 응답을 전송한 후의 처리
void onResponseSent(const system::error_code& ec, std::size_t bytes_transferred) {
if (ec) {
std::cerr << "Error occurred! Error code = "
<< ec.value() << ". Message: " << ec.message() << "\n";
}
// 자동으로 shared_ptr가 cleanup을 처리함
}
// 요청 처리 (여기서는 단순한 작업을 시뮬레이션)
std::string ProcessRequest(asio::streambuf& request) {
int i = 0;
while (i != 1000000)
++i;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return "Response\n"; // 간단한 응답 반환
}
private:
std::shared_ptr<asio::ip::tcp::socket> m_sock; // 클라이언트 소켓
asio::streambuf m_request; // 요청을 담을 버퍼
std::string m_response; // 응답 문자열
};
class Acceptor {
public:
Acceptor(asio::io_context& ios, unsigned short port_num)
: m_ios(ios),
m_acceptor(m_ios, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), port_num)) {}
void Start() {
doAccept();
}
void Stop() {
m_acceptor.close();
}
private:
void doAccept() {
auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);
m_acceptor.async_accept(*sock,
[this, sock](const system::error_code& ec) {
if (!ec) {
std::make_shared<Service>(sock)->StartHandling();
}
else {
std::cerr << "Error occurred! Error code = "
<< ec.value() << ". Message: " << ec.message() << "\n";
}
if (m_acceptor.is_open()) {
doAccept();
}
});
}
private:
asio::io_context& m_ios;
asio::ip::tcp::acceptor m_acceptor;
};
class Server {
public:
void Start(unsigned short port_num, unsigned int thread_pool_size) {
assert(thread_pool_size > 0);
acc = std::make_unique<Acceptor>(m_ios, port_num);
acc->Start();
for (unsigned int i = 0; i < thread_pool_size; ++i) {
thread_pool.emplace_back([this]() { m_ios.run(); });
}
}
void Stop() {
acc->Stop();
m_ios.stop();
for (auto& thread : thread_pool) {
if (thread.joinable()) {
thread.join();
}
}
}
private:
asio::io_context m_ios;
std::unique_ptr<Acceptor> acc;
std::vector<std::thread> thread_pool;
};
const unsigned int DEFAULT_THREAD_POOL_SIZE = 2;
int main() {
const unsigned short port_num = 3333;
try {
Server srv;
unsigned int thread_pool_size = std::thread::hardware_concurrency() * 2;
if (thread_pool_size == 0) {
thread_pool_size = DEFAULT_THREAD_POOL_SIZE;
}
srv.Start(port_num, thread_pool_size);
std::this_thread::sleep_for(std::chrono::seconds(60));
srv.Stop();
}
catch (const std::exception& e) {
std::cerr << "Error occurred! Message: " << e.what() << "\n";
}
return 0;
}
Service 클래스
소켓을 생성자의 인자로 받아 쓰기와 읽기 연산이 진행되는 클래스
- 클래스의 인스턴스를 생성할 때 인자로 소켓의 주소를 받는다.
- 해당 주소를 통해 클래스 내부 멤버인 m_sock에 전달하여 초기화를 해준다.
- public형식의 멤버 함수 StartHandling 메서드가 호출되면 클래스 내부 함수에서 본격적인 연산 작업이 수행된다.
- async_read_until 메서드를 통해 소켓을 구분자('\n')까지 읽고 onRequestReceived 함수를 호출한다.
- 콜백 함수로 onRequestReceived 함수에선 읽은 요청을 ProcessRequest 함수의 인자로 전달한다.
- ProcessRequest 함수에선 클라이언트의 요청에 해당하는 작업 후 클라이언트에 반환할 응답을 리턴한다.
- 다시 onRequestReceived 함수에서 해당 응답을 버퍼로 비동기 쓰기 작업을 시작한다.
- 콜백 함수로 onResponseSent 함수를 호출한다. 여기가 로직의 끝이므로 shared_ptr가 cleanup을 진행한다.
Acceptor 클래스
io_context와 포트 번호를 인자로 받아 리스닝 모드로 진입하기 위한 클래스
- 생성자의 인자로 받은 ios를 활용하여 m_ios를 초기화 해준다.
- IPv4 any주소와 포트번호를 결합해 종료점을 만들고, m_ios와 결합해 m_acceptor를 초기화 해준다.
- Start메서드가 실행되면 doAccept 멤버 함수를 호출해 준다.
- doAccept 함수가 호출되면 m_ios를 활용해 소켓 sock를 초기화 해준다.
- m_acceptor에 비동기 연결 async_accept 메서드를 사용해 소켓과 콜백 함수를 인자로 전달한다.
- 오류가 발생하지 않았다면 sock을 Service 클래스 타입의 unique_ptr 의 인자로 전달한다.
- Service클래스의 멤버 함수인 StartHandling을 호출하여 연산 작업을 시작한다.
- 연산을 완료하여 새로운 클라이언트를 맞을 준비가 되었다면 재귀적으로 doAccept 모드로 진입한다.
Server 클래스
서버의 구동과 종료를 진행하는 클래스
- Start 메서드에 포트 번호와 쓰레드 풀의 사이즈를 인자로 입력 받아준다.
- assert를 통해 사용할 수 있는 스레드가 있다면 로직을 계속 진행하고 아니면 오류를 반환한다.
- m_ios와 포트 번호를 인자로 전달해 Acceptor클래스 타입의 unique_ptr acc를 생성한다.
- acc를 통해 Acceptor클래스의 멤버 함수 Start를 호출하여 리스닝 상태로 변환해준다.
- 쓰레드 풀의 크기만큼 반복문을 실행하여 thread타입의 벡터 thread_pool내 모든 스레드를 run상태로 만들어 준다.
- Stop 메서드가 호출되면 acc를 통해 Acceptor클래스의 멤버 함수 Stop을 호출하여 종료해준다.
- 또 m_ios를 run상태에서 stop상태로 변환하여 run상태를 해제해 준다.
- 쓰레드 풀을 순회하며 현재 스레드가 종료할 수 있는 상태라면 종료를 해준다.
main()
- 서버가 사용할 포트 번호를 지정해 준다.
- Server 클래스의 인스턴스 srv를 초기화 해준다.
- hardware_concurrency 메서드를 통해 CPU 코어의 개수의 두배 만큼 thread_pool_size를 초기화 해준다.
- 만약 thread_pool_size의 값이 0이라면 DEFAULT_THREAD_POOL_SIZE를 통해 기본값으로 세팅해준다.
- srv를 통해 Server 클래스의 Start 멤버 함수에 포트번호와 스레드 풀의 크기를 인자로 전달해 준다.
- 메인 스레드에서 60초를 대기한다.
- srv를 통해 Server 클래스의 Stop 멤버 함수를 호출한다.
728x90
반응형
'네트워크 통신 > Boost' 카테고리의 다른 글
Boost.asio 실시간 채팅 AWS EC2 서버 구현 (0) | 2024.11.24 |
---|---|
Boost.asio 병렬 동기 TCP 서버 (0) | 2024.11.20 |
Boost.asio 반복 동기 TCP 서버 (0) | 2024.11.18 |
Boost.asio 서버 개요 (1) | 2024.11.18 |
Boost.asio 비동기 멀티 스레드 TCP 클라이언트 (0) | 2024.11.18 |