네트워크 통신/Boost

Boost.asio 실시간 채팅 AWS EC2 서버 구현

마달랭 2024. 11. 24. 01:05
반응형

개요

클라이언트가 채팅 서버에 접속하고, 각자 닉네임을 생성한 후에 실시간으로 다른 클라이언트와 채팅을 통해 소통하는 소켓 통신을 구현해 보자

로직은 다음과 같다.

  1. 클라이언트가 서버로 연결 요청을 한다.
  2. 서버가 연결 요청을 수락한다.
  3. 클라이언트가 사용할 닉네임을 설정하고 이를 서버에 쓰기 연산을 한다.
  4. 서버에서 해당 클라이언트의 닉네임을 읽고 기억한다.
  5. 클라이언트가 동기 쓰기 연산을 통해 하고싶은 말을 서버로 출력한다.
  6. 서버가 클라이언트로 부터 받은 데이터를 읽고 현재 연결되어 있는 모든 클라이언트에게 데이터를 보낸 클라이언트의 닉네임을 사용하여 쓰기 작업을 한다.
  7. 모든 클라이언트에서 서버로 부터 받은 데이터를 읽는다.

 

AWS EC2 서버

서버는 Linux 환경에서 구현한다, 물론 클라이언트도 구현 할것이다.

AWS EC2를 통해 가상 Linux OS 인스턴스를 생성하여 서버를 구동한다.

해당 인스턴스의 퍼블릭 IP 주소와 인바운드 규칙에 추가된 포트 번호를 통해 Linux 및 Window환경에서 접속한다.

 

파일 구조는 다음과 같다.

/project
    ├── server.cpp      // 서버 소켓 통신 및 관리
    └── Makefile // 빌드 설정 파일

 

 

Server.cpp

#include <boost/asio.hpp>
#include <thread>
#include <memory>
#include <iostream>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <sstream>

using namespace boost;

class Client {
public:
    Client(std::shared_ptr<asio::ip::tcp::socket> socket, std::string nickname)
        : m_socket(socket),
        m_nickname(std::move(nickname)) {}

    std::shared_ptr<asio::ip::tcp::socket> getSocket() const {
        return m_socket;
    }

    std::string getNickname() const {
        return m_nickname;
    }

private:
    std::shared_ptr<asio::ip::tcp::socket> m_socket;
    std::string m_nickname;
};

class Service : public std::enable_shared_from_this<Service> {
public:
    Service(std::shared_ptr<asio::ip::tcp::socket> sock, std::unordered_map<std::string, std::shared_ptr<Client>>& clients, std::mutex& clients_mutex)
        : m_sock(sock),
        m_clients(clients),
        m_clients_mutex(clients_mutex) {}

    // 요청을 비동기적으로 읽어들이고 처리 시작
    void StartHandling() {
        // 'shared_ptr'을 비동기 콜백에 전달
        auto self = shared_from_this();

        asio::async_read_until(*m_sock, m_request, '\n',
            [self](const system::error_code& ec, std::size_t bytes_transferred) {
                self->onRequestReceived(ec, bytes_transferred);
            });
    }

private:
    // 요청을 받은 후 처리하는 함수
    void onRequestReceived(const system::error_code& ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cerr << "오류 발생! 오류 코드 = "
                << ec.value() << ". 메세지: " << ec.message() << "\n";
            return;
        }

        std::istream request_stream(&m_request);
        std::string message;
        std::getline(request_stream, message);

        if (!m_nickname.empty()) {
            broadcastMessage(m_nickname + ": " + message);
        }
        else {
            m_nickname = message;
            if (!registerClient()) return;
            broadcastMessage(m_nickname + "님이 입장하셨습니다.");
        }

        StartHandling();
    }

    void broadcastMessage(const std::string& message) {
        std::lock_guard<std::mutex> lock(m_clients_mutex);

        for (const auto& pair : m_clients) {
            auto client = pair.second;
            auto sock = client->getSocket();
            asio::async_write(*sock,
                asio::buffer(message + "\n"),
                [](const system::error_code& ec, std::size_t bytes_trasnferred) {
                    if (ec) {
                        std::cerr << "오류 발생! 오류 코드 = "
                            << ec.value() << ". 메세지: " << ec.message() << "\n";
                    }
                });
        }
    }

    bool registerClient() {
        std::lock_guard<std::mutex> lock(m_clients_mutex);

        if (m_clients.find(m_nickname) == m_clients.end()) {
            m_clients[m_nickname] = std::make_shared<Client>(m_sock, m_nickname);
        }
        else {
            asio::async_write(*m_sock, asio::buffer("해당 닉네임이 이미 사용중입니다, 다시 시도해 주세요\n"),
                [](const system::error_code& ec, std::size_t bytes_transferred) {});
            m_sock->close();
            return false;
        }
        return true;
    }

private:
    std::shared_ptr<asio::ip::tcp::socket> m_sock;  // 클라이언트 소켓
    asio::streambuf m_request;  // 요청을 담을 버퍼
    std::string m_nickname;
    std::unordered_map<std::string, std::shared_ptr<Client>>& m_clients;
    std::mutex& m_clients_mutex;
};

class Acceptor {
public:
    Acceptor(asio::io_context& ios, unsigned short port_num, std::unordered_map<std::string, std::shared_ptr<Client>>& clients, std::mutex& clients_mutex)
        : m_ios(ios),
        m_acceptor(m_ios, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), port_num)),
        m_clients(clients),
        m_clients_mutex(clients_mutex) {}

    void Start() {
        doAccept();
    }

    void Stop() {
        m_acceptor.close();
    }

private:
    void doAccept() {
        auto sock = std::make_shared<asio::ip::tcp::socket>(m_ios);

        m_acceptor.async_accept(*sock,
            [this, sock](const system::error_code& ec) {
                if (!ec) {
                    std::make_shared<Service>(sock, m_clients, m_clients_mutex)->StartHandling();
                }
                else {
                    std::cerr << "오류 발생! 오류 코드 = "
                        << ec.value() << ". 메세지: " << ec.message() << "\n";
                }

                if (m_acceptor.is_open()) {
                    doAccept();
                }
            });
    }

private:
    asio::io_context& m_ios;
    asio::ip::tcp::acceptor m_acceptor;
    std::unordered_map<std::string, std::shared_ptr<Client>>& m_clients;
    std::mutex& m_clients_mutex;
};

class Server {
public:
    void Start(unsigned short port_num, unsigned int thread_pool_size) {
        assert(thread_pool_size > 0);

        acc = std::make_unique<Acceptor>(m_ios, port_num, clients, clients_mutex);
        acc->Start();

        for (unsigned int i = 0; i < thread_pool_size; ++i) {
            thread_pool.emplace_back([this]() { m_ios.run(); });
        }
    }

    void Stop() {
        acc->Stop();
        m_ios.stop();

        for (auto& thread : thread_pool) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

private:
    asio::io_context m_ios;
    std::unique_ptr<Acceptor> acc;
    std::vector<std::thread> thread_pool;
    std::unordered_map<std::string, std::shared_ptr<Client>> clients;
    std::mutex clients_mutex;
};

const unsigned int DEFAULT_THREAD_POOL_SIZE = 2;

int main() {
    const unsigned short port_num = 12345;

    try {
        Server srv;

        unsigned int thread_pool_size = std::thread::hardware_concurrency() * 2;
        if (thread_pool_size == 0) {
            thread_pool_size = DEFAULT_THREAD_POOL_SIZE;
        }

        srv.Start(port_num, thread_pool_size);
        while (1);
        srv.Stop();
    }
    catch (const std::exception& e) {
        std::cerr << "오류 발생! 내용: " << e.what() << "\n";
    }

    return 0;
}

 

  1. 연결을 받을 포트 번호를 지정해 준다, 이는 AWS에서 커스텀 TCP 인바운드 규칙을 통해 지정하면 된다.
  2. 포트번호와 사용 가능한 CPU 코어의 수 * 2배로 사용할 스레드의 사이즈를 인자로 서버를 구동한다.
  3. 서버 시작 시 연결된 클라이언트들은 해시맵을 사용해 관리해 준다.
  4. io_context, 포트 번호, 해시맵과 스레드관련 뮤텍스를 인자로 Acceptor를 시작해 준다.
  5. 비동기 리스닝 상태로 있다가 클라이언트 요청이 오면 Service클래스의 StartHandling 메서드를 수행한다.
  6. 첫 생성일 경우 방에 입장했다는 내용을 broadcastMessage 함수를 통해 실행한다.
  7. 이후 클라이언트에서 쓰기 요청이 올때 마다 broadcastMessage 함수를 통해 모든 클라이언트에 전달한다.
  8. StartHandling 함수를 재귀적으로 호출하여 클라이언트 요청을 지속적으로 받는다.

 

Client.cpp

#include <boost/asio.hpp>
#include <iostream>
#include <thread>
#include <string>

using namespace boost;

class ChatClient {
public:
    ChatClient(asio::io_context& ios, const std::string& host, unsigned short port)
        : m_socket(ios), m_resolver(ios) {
        asio::ip::tcp::resolver::query query(host, std::to_string(port));
        m_endpoints = m_resolver.resolve(query);
    }

    void connect(const std::string nickname) {
        asio::async_connect(m_socket, m_endpoints,
            [this, nickname](const system::error_code& ec, const asio::ip::tcp::endpoint&) {
                if (!ec) {
                    std::cout << "서버 연결 완료.\n";
                    sendMessage(nickname);
                    readMessage();
                }
                else {
                    std::cerr << "에러 발생 : " << ec.message() << "\n";
                }
            });
    }

    void sendMessage(const std::string& message) {
        auto msg = std::make_shared<std::string>(message + "\n"); // 문자열 관리
        asio::async_write(m_socket, asio::buffer(*msg),
            [this, msg](const system::error_code& ec, std::size_t bytes_transferred) {
                if (ec) {
                    std::cerr << "메시지 전송 실패 : " << ec.message() << "\n";
                }
            });
    }


private:
    void readMessage() {
        asio::async_read_until(m_socket, m_buffer, '\n',
            [this](const system::error_code& ec, std::size_t bytes_transferred) {
                if (!ec) {
                    // 스트림 버퍼에서 데이터를 안전하게 읽기
                    std::string message{ buffers_begin(m_buffer.data()), buffers_begin(m_buffer.data()) + bytes_transferred - 1 };
                    m_buffer.consume(bytes_transferred); // 버퍼 정리
                    std::cout << message << "\n";

                    readMessage();  // 재귀 호출을 통해 계속 메시지를 받기.
                }
                else {
                    std::cerr << "메시지 수신 실패 : " << ec.message() << "\n";
                }
            });
    }

    asio::ip::tcp::socket m_socket;
    asio::ip::tcp::resolver m_resolver;
    asio::ip::tcp::resolver::results_type m_endpoints;
    asio::streambuf m_buffer;
};

int main() {
    const std::string server_address = "3.35.173.11";
    const unsigned short server_port = 12345;

    try {
        asio::io_context ios;

        std::cout << "사용할 닉네임을 입력해 주세요 : ";
        std::string nickname;
        std::getline(std::cin, nickname);

        ChatClient client(ios, server_address, server_port);
        client.connect(nickname);

        std::thread client_thread([&ios]() { ios.run(); });

        std::string message;
        while (true) {
            std::getline(std::cin, message);
            if (message == "/quit") {
                break;
            }
            client.sendMessage(message);
        }

        ios.stop();
        client_thread.join();
    }
    catch (const std::exception& e) {
        std::cerr << "에러 발생 : " << e.what() << "\n";
    }

    return 0;
}

 

  1. 연결할 서버의 IP주소와 포트번호를 지정한다, 이는 AWS EC2 인스턴스의 값을 받아온다.
  2. 사용할 닉네임을 설정하고, client 객체를 생성한 뒤 서버로 연결을 시도한다.
  3. 연결에 성공했을 경우 자신이 사용할 메시지를 서버로 보낸다.
  4. 이후 서버로 메시지를 보낼 때 마다 sendMessage함수를 통해 로직이 진행된다.
  5. 연결에 성공했다면 readMessage함수를 재귀적으로 호출받고 있어 서버로부터의 메시지를 계속 받게된다.

 

Makefile

 

서버 컴파일 및 clean, run 기능을 자동으로 해주기 위해 makefile을 제작한다.

  • make : 같은 디렉토리에 server.cpp가 있다면 컴파일을 진행한다.
  • run : 컴파일 된 파일 ./server를 쉘에서 실행한다. (서버 구동 시작)
  • clean : 컴파일 된 파일을 삭제한다.

 

서버 구동하기

AWS EC 2 인스턴스를 생성해 준다.

인바운드 규칙에서 TCP 연결 할 포트 번호를 열어둔다, 나의 경우에는 12345를 사용했다.

서버 및 클라리언트 로직 모두 포트 번호를 12345로 일치시켜 준다.

인스턴스의 퍼블릭 IPv4 주소를 클라이언트 로직에 최신화를 시켜준다.

리눅스 환경에서 적절히 디렉토리를 생성하고 서버 로직을 작성하고 구동해 준다.

 

윈도우 환경에서 클라이언트 로직을 실행해 준다.

사용할 닉네임을 입력해 준다, 서버 연결 시 해당 닉네임을 통해 식별이 가능하다.

 

클라이언트를 하나 더 생성해서 서로 말하는 채팅이 보이는지 확인해 준다.

 

faker와 sang은 잘 소통하고 있다. 클라이언트를 하나 더 연결해 보자

 

마찬가지로 채팅방에 입장한 이후로 클라이언트가 서버로 전송한 요청을 다른 클라이언트가 볼 수 있다.

/quit를 통해 채팅방을 나가도 다른 클라이언트들이 소통하는 데엔 문제가 없다.

 

 

확장 가능성

현재 로직상으로는 잘 작동하는 것처럼 보인다.

하지만 이 로직에는 몇가지 기능이 추가되어야 한다.

첫 번째로 클라이언트가 서버와 연결되었을 때 접속 관련한 공지의 한글이 깨져서 나온다.

이는 서버는 리눅스 환경, 클라이언트는 윈도우 환경에서 실행되므로 서로 다른 문자 인코딩을 사용하고 있기 때문이다.

따라서 서버와 클라이언트의 문자 인코딩을 해결해 주어야 한다.

둘 다 리눅스 환경이면 한글이 잘 보인다.

 

두 번째로 클라이언트가 요청을 종료했을 때의 로직이 존재하지 않는다.

로직 상 클라이언트가 요청을 종료했을 때 다른 클라이언트들에게 누군가 나갔다고 공지를 하면 좋지 않을까?

추가로, 이미 나간 클라이언트가 사용하던 닉네임은 사용할 수 없다.

클라이언트가 나갈 경우 해시맵 상에서 clear 처리를 해주어야 한다.

 

세 번째로 서버 구동을 종료하는 로직이 존재하지 않는다.

클라이언트의 경우 /quit를 입력하면 서버와 연결을 종료할 수 있다.

하지만 서버에서는 해당 기능이 없기 때문에 인터럽트를 발생시켜 강제 종료해야 된다.

서버 상에서도 /quit와 같이 서버를 닫는 로직을 구현해 보면 좋을 것이다.

 

마지막으로 AWS EC2 환경에서 인스턴스를 사용하고 있을 경우 인스턴스를 중지하고 시작할 때 마다 IP 주소가 변경된다.

따라서 인스턴스가 켜지고 꺼질 때마다 클라이언트에서는 연결하려는 IP주소를 변경해 주어야 한다.

해당 부분의 불편함도 해결 가능한 방법이 있을지 고려해 보면 좋을 것 같다.

 

728x90
반응형