【发布时间】:2021-01-30 15:11:24
【问题描述】:
我的动机。
我正在尝试构建一个简单的信使。目前我已经编写了支持“类似邮件功能”的客户端和服务器应用程序,即它们缺少您在每个即时消息中都有的聊天交互。
这是我使用的模型。
服务器:每个连接的客户端的服务器都有一个专用的Service 类来提供实际服务。 Service 类的实例有一个 id。
客户端:在特定时刻同时开始从关联的Service 实例读取消息和向其写入消息。
Tracker:通过将用户的登录信息和Service ids 保存在地图中来记录用户的当前会话。还通过保存键值对(聊天参与者 id 1,聊天参与者 id 2)记录打开的聊天。因为我有一个数据库,所以我可以互换使用用户的登录名和 ID。
这是一个典型的使用场景。
- 用户正在尝试登录。服务器将 ID 为 1 的
Service实例专用于该用户。然后将用户标识为 Bob。 - Bob 开始与 Ann 聊天。
Tracker记录 Bob 使用的Service1 以及 Bob 打开了与 Ann 的聊天。 - 用户正在尝试登录。服务器将 ID 为 2 的
Service实例专用于该用户。然后将用户标识为 Ann。 - Ann 开始与 Bob 聊天。
Tracker记录了 Ann 使用的Service2 以及 Ann 打开了与 Bob 的聊天。 - Ann 给 Bob 写了一条消息。
Service2 收到消息并要求Service1 如果 Bob 已打开与 Ann 的聊天,则将消息发送到 Bob 的聊天。为此,我使用Tracker。在我们的例子中,Bob 正在聊天,所以 Bob 的客户端应用程序应该从Service1 读取消息。否则Service2 只会将新消息存储在数据库中。
当用户打开与某人的聊天时,客户端应用程序同时开始向关联 Service 实例读取和写入消息。
问题
- Bob 开始与 Ann 聊天。 Ann 开始与 Bob 聊天。
- Ann 发送消息。它们显示在 Bobs 聊天中。
- Bob 发送消息。它不会显示在 Ann 的聊天中。此外,Ann 的其他消息不再显示在 Bob 的聊天中。
这是我的服务器代码的一部分。我添加了一些上下文,但您可能想查看Service::onMessageReceived、Service::receive_message、Service::send_to_chat
/// Struct to track active sessions of clients
struct Tracker {
static std::mutex current_sessions_guard; ///< mutex to lock the map of current sessions between threads
static std::map<long, long> current_sessions;
static std::map<long, int> client_to_service_id;
};
在客户服务模型中提供实际服务的类
class Service {
public:
void send_to_chat(const std::string& new_message) {
asio::async_write(*m_sock.get(), asio::buffer(new_message),
[this]() {
onAnotherPartyMessageSent();
});
}
private:
void onReceivedReady();
void receive_message() {
/// Server loop for reading messages from the client
spdlog::info("[{}] in receive_message", service_id);
asio::async_read_until(*m_sock.get(), *(m_message.get()), '\n',
[this]() {
onMessageReceived();
});
}
void onMessageReceived();
private:
std::shared_ptr<asio::ip::tcp::socket> m_sock; ///< Pointer to an active socket that is used to communicate
///< with the client
int service_id;
long dialog_id = -1, client_id = -1, another_party_id = -1;
std::shared_ptr<asio::streambuf> m_message;
};
方法的定义
void Service::onMessageReceived() {
/// Updates the database with the new message and asks Service instance of another participant
/// to send the message if they opened this chat.
std::istream istrm(m_message.get());
std::string new_message;
std::getline(istrm, new_message);
m_message.reset(new asio::streambuf);
std::unique_lock<std::mutex> tracker_lock(Tracker::current_sessions_guard);
if (Tracker::current_sessions.find(another_party_id) != Tracker::current_sessions.end()) {
if (Tracker::current_sessions[another_party_id] == client_id) {
int another_party_service_id = Tracker::client_to_service_id[another_party_id];
std::string formatted_msg = _form_message_str(login, new_message);
spdlog::info("[{}] sends to chat '{}'", another_party_service_id, new_message);
Server::launched_services[another_party_service_id]->send_to_chat(formatted_msg);
}
}
tracker_lock.unlock();
receive_message();
}
这是我的客户端代码的一部分。我添加了一些上下文,但您可能想查看AsyncTCPClient::onSentReady、AsyncTCPClient::message_send_loop、AsyncTCPClient::message_wait_loop。
/// Struct that stores a session with the given server
struct Session {
asio::ip::tcp::socket m_sock; //!< The socket for the client application to connect to the server
asio::ip::tcp::endpoint m_ep; //!< The server's endpoint
std::string current_chat;
std::shared_ptr<asio::streambuf> m_chat_buf;
std::shared_ptr<asio::streambuf> m_received_message;
};
/// Class that implements an asynchronous TCP client to interact with Service class
class AsyncTCPClient: public asio::noncopyable {
void onSentReady(std::shared_ptr<Session> session) {
msg_wait_thread.reset(new std::thread([this, session] {
asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n",
[this, session] () {
message_wait_loop(session);
});
}));
msg_wait_thread->detach();
msg_thread.reset(new std::thread([this, session] {
message_send_loop(session);
}));
msg_thread->detach();
}
void message_send_loop(std::shared_ptr<Session> session) {
/// Starts loop in the current chat enabling the client to keep sending messages to another party
logger->info("'{}' in message_send_loop", session->login);
clear_console();
m_console.write(session->current_chat);
m_console.write("Write your message: ");
std::string new_message;
// We use a do/while loop to prevent empty messages either because of the client input
// or \n's that were not read before
do {
new_message = m_console.read();
} while (new_message.empty());
std::unique_lock<std::mutex> lock_std_out(std_out_guard);
session->current_chat.append(_form_message_str(session->login, new_message));
lock_std_out.unlock();
asio::async_write(session->m_sock, asio::buffer(new_message + "\n"),
[this, session] () {
message_send_loop(session);
});
}
void message_wait_loop(std::shared_ptr<Session> session) {
/// Starts loop in the current chat enabling the client to keep reading messages from another party
logger->info("'{}' in message_wait_loop", session->login);
std::istream istrm(session->m_received_message.get());
std::string received_message;
std::getline(istrm, received_message);
session->m_received_message.reset(new asio::streambuf);
std::unique_lock<std::mutex> lock_std_out(std_out_wait_guard);
session->current_chat.append(received_message + "\n");
lock_std_out.unlock();
clear_console();
m_console.write(session->current_chat);
m_console.write("Write your message: ");
asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n",
[this, session] (std::size_t) {
message_wait_loop(session);
});
}
private:
asio::io_context m_ios;
};
所以,当我描述问题时,我在第 3 点上没有两个客户端的 "'{}' in message_wait_loop" 日志)。但我在第 2 点为 Bob 的客户提供了这些日志。
我也使用来自answer here 的控制台。它通过互斥体去除回声并控制标准输入/输出资源。但是它并没有解决我的问题。
任何帮助将不胜感激。
【问题讨论】:
标签: c++ client-server chat boost-asio