Renaming, Fixed NetworkMessage serialization and performance improvement with vectors in connection handler

This commit is contained in:
TheDoctor
2019-10-19 19:38:58 +01:00
parent e026da4ccd
commit 3555534628
7 changed files with 38 additions and 18 deletions

View File

@ -61,7 +61,7 @@ namespace std::net
int32_t sizeOfNetHeader = sizeof(NetworkHeader); int32_t sizeOfNetHeader = sizeof(NetworkHeader);
NetworkHeader header; NetworkHeader header;
header.Size = 13 + sizeOfNetHeader + sizeof(T); header.Size = 13 + sizeOfNetHeader + (m_data ? sizeof(T) : 0);
byte *bytes = new byte[header.Size](); byte *bytes = new byte[header.Size]();
memcpy(bytes, &header, sizeOfNetHeader); memcpy(bytes, &header, sizeOfNetHeader);
@ -75,6 +75,7 @@ namespace std::net
memcpy(bytes + sizeOfNetHeader + 5, destination, 4); memcpy(bytes + sizeOfNetHeader + 5, destination, 4);
memcpy(bytes + sizeOfNetHeader + 9, tag, 4); memcpy(bytes + sizeOfNetHeader + 9, tag, 4);
if (m_data)
memcpy(bytes + 13 + sizeOfNetHeader, m_data, sizeof(T)); memcpy(bytes + 13 + sizeOfNetHeader, m_data, sizeof(T));
size = header.Size; size = header.Size;

View File

@ -28,6 +28,12 @@ namespace std::net
sendMessage(msg); sendMessage(msg);
} }
void SendMessage(DistributionMode mode, uint32_t destinationId, uint32_t tag)
{
NetworkMessage msg(m_id, mode, destinationId, tag, nullptr, 0);
sendMessage(msg);
}
void ReceiveData(); void ReceiveData();
function<void(uint32_t, DistributionMode, uint32_t, uint32_t, void*)> DataReceivedEvent; function<void(uint32_t, DistributionMode, uint32_t, uint32_t, void*)> DataReceivedEvent;

View File

@ -35,10 +35,10 @@ namespace std::net
uint32_t GetAvailableID(); uint32_t GetAvailableID();
private: private:
void HandleReceiveMsgAndConns(); void HandleConnections();
void HandleMessage(const NetworkMessage &msg); void HandleMessage(const NetworkMessage &msg);
void HandleReceiveMsgAndConnsThreaded(); void HandleConnectionsThreaded();
private: private:
vector<shared_ptr<TcpConnection>> m_list; vector<shared_ptr<TcpConnection>> m_list;

View File

@ -28,7 +28,7 @@ namespace std::net
int32_t sizeOfNetHeader = sizeof(NetworkHeader); int32_t sizeOfNetHeader = sizeof(NetworkHeader);
NetworkHeader header; NetworkHeader header;
header.Size = 13 + sizeOfNetHeader + m_dataSize; header.Size = 13 + sizeOfNetHeader + (m_data ? m_dataSize : 0);
byte *bytes = new byte[header.Size](); byte *bytes = new byte[header.Size]();
memcpy(bytes, &header, sizeOfNetHeader); memcpy(bytes, &header, sizeOfNetHeader);
@ -42,6 +42,7 @@ namespace std::net
memcpy(bytes + sizeOfNetHeader + 5, destination, 4); memcpy(bytes + sizeOfNetHeader + 5, destination, 4);
memcpy(bytes + sizeOfNetHeader + 9, tag, 4); memcpy(bytes + sizeOfNetHeader + 9, tag, 4);
if (m_data)
memcpy(bytes + 13 + sizeOfNetHeader, m_data, m_dataSize); memcpy(bytes + 13 + sizeOfNetHeader, m_data, m_dataSize);
size = header.Size; size = header.Size;

View File

@ -19,20 +19,22 @@ namespace std::net
TcpConnectionHandler::~TcpConnectionHandler() TcpConnectionHandler::~TcpConnectionHandler()
{ {
m_run.exchange(false); Stop();
} }
void TcpConnectionHandler::Start() void TcpConnectionHandler::Start()
{ {
m_run.exchange(true); m_run.exchange(true);
m_pollFds.reserve(m_maxConnections);
pollfd master_fd; pollfd master_fd;
master_fd.fd = m_listenerPtr->m_socket->GetNativeSocket(); master_fd.fd = m_listenerPtr->m_socket->GetNativeSocket();
master_fd.events = POLLRDNORM; master_fd.events = POLLRDNORM;
m_pollFds.emplace_back(master_fd); m_pollFds.emplace_back(master_fd);
thread receive_thread(&TcpConnectionHandler::HandleReceiveMsgAndConnsThreaded, this); thread handleCons(&TcpConnectionHandler::HandleConnectionsThreaded, this);
m_receiveThread.swap(receive_thread); m_receiveThread.swap(handleCons);
} }
void TcpConnectionHandler::Stop() void TcpConnectionHandler::Stop()
@ -100,6 +102,7 @@ namespace std::net
void TcpConnectionHandler::SetMaxConnections(uint32_t max_connections) void TcpConnectionHandler::SetMaxConnections(uint32_t max_connections)
{ {
m_maxConnections = max_connections; m_maxConnections = max_connections;
m_pollFds.reserve(m_maxConnections);
} }
void TcpConnectionHandler::HandlePluginMessage(const NetworkMessage& message) void TcpConnectionHandler::HandlePluginMessage(const NetworkMessage& message)
@ -107,7 +110,7 @@ namespace std::net
m_pluginManager->HandleMessage(message); m_pluginManager->HandleMessage(message);
} }
void TcpConnectionHandler::HandleReceiveMsgAndConns() void TcpConnectionHandler::HandleConnections()
{ {
int res = poll(m_pollFds.data(), m_pollFds.size(), -1); int res = poll(m_pollFds.data(), m_pollFds.size(), -1);
@ -259,9 +262,9 @@ namespace std::net
HandlePluginMessage(msg); HandlePluginMessage(msg);
} }
void TcpConnectionHandler::HandleReceiveMsgAndConnsThreaded() void TcpConnectionHandler::HandleConnectionsThreaded()
{ {
while (m_run.load()) while (m_run.load())
HandleReceiveMsgAndConns(); HandleConnections();
} }
} }

View File

@ -13,7 +13,7 @@ class Plugin : public std::net::Plugin
{ {
virtual void HandleMessage(const std::net::NetworkMessage& message) override virtual void HandleMessage(const std::net::NetworkMessage& message) override
{ {
std::cout << "asd" << std::endl;
} }
}; };
@ -27,8 +27,15 @@ int main()
std::net::TcpConnection con; std::net::TcpConnection con;
con.Connect(std::net::IPAddress("127.0.0.1")); con.Connect(std::net::IPAddress("127.0.0.1"));
while (true) bool sent = false;
while (con) // 8% of my cpu
{ {
con.ReceiveData(); con.ReceiveData();
if (!sent)
{
sent = true;
con.SendMessage(std::net::DistributionMode::Server, 0, 1);
}
} }
} }

View File

@ -43,9 +43,11 @@ namespace std::net
int32_t sent; int32_t sent;
secure ? secure_socket->Recv((byte*)string.c_str(), string.size(), sent) : socket->Send((byte*)string.c_str(), string.size(), sent); secure ? secure_socket->Recv((byte*)string.c_str(), string.size(), sent) : socket->Send((byte*)string.c_str(), string.size(), sent);
vector<char> buffer(16384); // 16 KiB // redo
stringstream ss;
byte* buffer[16384]; // 16 KiB
stringstream ss;
/*
int32_t read; int32_t read;
do do
{ {
@ -54,7 +56,7 @@ namespace std::net
} }
while (read > 0); while (read > 0);
secure ? secure_socket->Close() : socket->Close(); secure ? secure_socket->Close() : socket->Close();
*/
return Response(ss.str()); return Response(ss.str());
} }