NetworkMessage->GetData fixed
Plugin -> Splitted message types
PluginManager is now thread safe
TcpConnection added list of events
Properly handling disconnection now
Fixed bug in Socket with error translation
It crashes when it exits, i think it has something to do with shared ptrs or something
This commit is contained in:
TheDoctor
2019-10-21 00:27:21 +01:00
parent 3555534628
commit 5f13265c5d
15 changed files with 223 additions and 107 deletions

View File

@ -89,7 +89,9 @@ namespace std::net
template<typename T> template<typename T>
T *GetData() const T *GetData() const
{ {
return (T*)m_data; if (m_data)
return (T*)m_data;
else return nullptr;
} }
}; };
} }

View File

@ -1,6 +1,8 @@
#pragma once #pragma once
#include <VoidNet_HL/NetworkMessage.hpp> #include <VoidNet_HL/NetworkMessage.hpp>
#include <VoidNet_HL/InternalTags.hpp>
#include <string>
namespace std::net namespace std::net
{ {
@ -11,6 +13,29 @@ namespace std::net
{ {
} }
virtual void HandleMessage(const NetworkMessage& msg) = 0; void HandleMessage(const NetworkMessage& msg)
{
if (msg.GetTag() == (uint32_t)InternalTags::Disconnect)
{
OnDisconnect(*(msg.GetData<string>()));
}
else if (msg.GetTag() == (uint32_t)InternalTags::Connect)
{
OnNewConnection(msg.GetSenderID(), msg.GetData<void>());
}
else if (msg.GetTag() == (uint32_t)InternalTags::AssignID)
{
OnConnection();
}
else
{
OnDataReceived(msg.GetSenderID(), msg.GetDistributionMode(), msg.GetDestinationID(), msg.GetTag(), msg.GetData<void>());
}
}
virtual void OnDisconnect(string) abstract;
virtual void OnNewConnection(uint32_t, void*) abstract;
virtual void OnConnection() abstract;
virtual void OnDataReceived(uint32_t, DistributionMode, uint32_t, uint32_t, void*) abstract;
}; };
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <vector> #include <vector>
#include <mutex>
#include "VoidNet_HL/Plugin/Plugin.hpp" #include "VoidNet_HL/Plugin/Plugin.hpp"
@ -14,16 +15,24 @@ namespace std::net
void HandleMessage(const NetworkMessage& msg) void HandleMessage(const NetworkMessage& msg)
{ {
m_pluginsMutex.lock();
for (size_t i = 0; i < m_plugins.size(); i++) for (size_t i = 0; i < m_plugins.size(); i++)
m_plugins.at(i)->HandleMessage(msg); m_plugins.at(i)->HandleMessage(msg);
m_pluginsMutex.unlock();
} }
void AddPlugin(Plugin* p) void AddPlugin(Plugin* p)
{ {
m_pluginsMutex.lock();
m_plugins.emplace_back(p); m_plugins.emplace_back(p);
m_pluginsMutex.unlock();
} }
private: private:
vector<Plugin*> m_plugins; vector<Plugin*> m_plugins;
mutex m_pluginsMutex;
}; };
} }

View File

@ -21,30 +21,55 @@ namespace std::net
bool Connect(IPAddress addr); bool Connect(IPAddress addr);
bool Disconnect();
template<typename T> template<typename T>
void SendMessage(DistributionMode mode, uint32_t destinationId, uint32_t tag, void *data) bool SendMessage(DistributionMode mode, uint32_t destinationId, uint32_t tag, void *data)
{ {
NetworkMessage msg(m_id, mode, destinationId, tag, data, sizeof(T)); NetworkMessage msg(m_id, mode, destinationId, tag, data, sizeof(T));
sendMessage(msg); return sendMessage(msg);
} }
void SendMessage(DistributionMode mode, uint32_t destinationId, uint32_t tag) bool SendMessage(DistributionMode mode, uint32_t destinationId, uint32_t tag)
{ {
NetworkMessage msg(m_id, mode, destinationId, tag, nullptr, 0); NetworkMessage msg(m_id, mode, destinationId, tag, nullptr, 0);
sendMessage(msg); return sendMessage(msg);
} }
void ReceiveData(); void ReceiveData();
function<void(uint32_t, DistributionMode, uint32_t, uint32_t, void*)> DataReceivedEvent; bool IsConnected;
function<void(string)> DisconnectedEvent;
function<void(uint32_t, void*)> NewConnectionEvent;
function<void()> OnConnectionEvent;
private: private:
bool sendMessage(const NetworkMessage &msg); bool sendMessage(const NetworkMessage &msg);
shared_ptr<TcpClient> m_client; shared_ptr<TcpClient> m_client;
uint32_t m_id; uint32_t m_id;
vector<function<void(uint32_t, DistributionMode, uint32_t, uint32_t, void*)>> m_onDataReceived;
vector<function<void(string)>> m_onDisconnect;
vector<function<void(uint32_t, void*)>> m_onNewConnection;
vector<function<void()>> m_onConnection;
public:
void operator+=(const function<void(uint32_t, DistributionMode, uint32_t, uint32_t, void*)> &rhs)
{
m_onDataReceived.push_back(rhs);
}
void operator+=(const function<void(string)>& rhs)
{
m_onDisconnect.push_back(rhs);
}
void operator+=(const function<void(uint32_t, void*)> & rhs)
{
m_onNewConnection.push_back(rhs);
}
void operator+=(const function<void()>& rhs)
{
m_onConnection.push_back(rhs);
}
}; };
} }

View File

@ -47,7 +47,6 @@ namespace std::net
uint32_t m_maxConnections = 0; uint32_t m_maxConnections = 0;
thread m_receiveThread; thread m_receiveThread;
thread m_sendThread;
atomic_bool m_run; atomic_bool m_run;

View File

@ -3,6 +3,7 @@
#pragma once #pragma once
#include <map> #include <map>
#include <string>
namespace std::net namespace std::net
{ {

View File

@ -81,8 +81,8 @@ namespace std::net
enum class SocketType enum class SocketType
{ {
Unknown = -1, Unknown = -1,
Datagram = 2, Datagram = 2, //SOCK_DGRAM
Streaming = 1, Streaming = 1, //SOCK_STREAM
}; };
enum class SocketProtocol enum class SocketProtocol

View File

@ -3,6 +3,7 @@
#pragma once #pragma once
#include <map> #include <map>
#include <string>
namespace std::net namespace std::net
{ {

View File

@ -21,7 +21,7 @@ namespace std::net
init(); init();
} }
virtual ~Socket() { Close(); } virtual ~Socket() { }
public: public:

View File

@ -10,5 +10,9 @@ namespace std::net
PluginManager::~PluginManager() PluginManager::~PluginManager()
{ {
for (size_t i = 0; i < m_plugins.size(); i++)
{
delete m_plugins[i];
}
} }
} }

View File

@ -6,33 +6,9 @@
namespace std::net namespace std::net
{ {
void received(uint32_t, DistributionMode, uint32_t, uint32_t, void*)
{
std::cout << "received" << std::endl;
}
void disconnected(std::string s)
{
std::cout << s << std::endl;
}
void new_connection(uint32_t, void*)
{
std::cout << "new client connection" << std::endl;
}
void on_connect()
{
std::cout << "i connected" << std::endl;
}
TcpConnection::TcpConnection() : TcpConnection::TcpConnection() :
m_client(new TcpClient()) m_client(new TcpClient())
{ {
DataReceivedEvent = received;
DisconnectedEvent = disconnected;
NewConnectionEvent = new_connection;
OnConnectionEvent = on_connect;
} }
TcpConnection::TcpConnection(TcpClient * client) TcpConnection::TcpConnection(TcpClient * client)
@ -52,7 +28,12 @@ namespace std::net
bool TcpConnection::Connect(IPAddress addr) bool TcpConnection::Connect(IPAddress addr)
{ {
return m_client->Connect(addr); return IsConnected = m_client->Connect(addr);
}
bool TcpConnection::Disconnect()
{
return SendMessage(DistributionMode::AllAndServer, 0, (uint32_t)InternalTags::Disconnect);
} }
bool TcpConnection::sendMessage(const NetworkMessage & msg) bool TcpConnection::sendMessage(const NetworkMessage & msg)
@ -79,25 +60,39 @@ namespace std::net
if (message.GetTag() == (uint32_t)InternalTags::Disconnect) if (message.GetTag() == (uint32_t)InternalTags::Disconnect)
{ {
if (DisconnectedEvent) for (size_t i = 0; i < m_onDisconnect.size(); i++)
DisconnectedEvent(*(message.GetData<string>())); {
//string* msgStr = message.GetData<string>();
m_onDisconnect[i]("");
}
IsConnected = false;
m_client->Close();
} }
else if (message.GetTag() == (uint32_t)InternalTags::Connect) else if (message.GetTag() == (uint32_t)InternalTags::Connect)
{ {
if (NewConnectionEvent) for (size_t i = 0; i < m_onNewConnection.size(); i++)
NewConnectionEvent(message.GetSenderID(), message.GetData<void>()); {
m_onNewConnection[i](message.GetSenderID(), message.GetData<void>());
}
} }
else if (message.GetTag() == (uint32_t)InternalTags::AssignID) else if (message.GetTag() == (uint32_t)InternalTags::AssignID)
{ {
m_id = *(message.GetData<uint32_t>()); m_id = *(message.GetData<uint32_t>());
if (OnConnectionEvent) for (size_t i = 0; i < m_onConnection.size(); i++)
OnConnectionEvent(); {
m_onConnection[i]();
}
IsConnected = true;
} }
else else
{ {
if (DataReceivedEvent) for (size_t i = 0; i < m_onDataReceived.size(); i++)
DataReceivedEvent(message.GetSenderID(), message.GetDistributionMode(), message.GetDestinationID(), message.GetTag(), message.GetData<void>()); {
m_onDataReceived[i](message.GetSenderID(), message.GetDistributionMode(), message.GetDestinationID(), message.GetTag(), message.GetData<void>());
}
} }
} }
} }

View File

@ -7,6 +7,7 @@
#include "VoidNet_LL/TcpListener.hpp" #include "VoidNet_LL/TcpListener.hpp"
#include <chrono> #include <chrono>
#include <iostream>
namespace std::net namespace std::net
{ {
@ -112,62 +113,76 @@ namespace std::net
void TcpConnectionHandler::HandleConnections() void TcpConnectionHandler::HandleConnections()
{ {
int res = poll(m_pollFds.data(), m_pollFds.size(), -1); try
if (res < 0)
{ {
//poll error int res = poll(m_pollFds.data(), m_pollFds.size(), 5);
}
//should never timeout because its infinite (negative) if (res < 0)
//if (res == 0)
//{
//timeout
//}
for (int i = 0; i < m_pollFds.size(); i++)
{
if (m_pollFds.at(i).revents == 0 || m_pollFds[i].revents != POLLRDNORM)
continue;
if (m_pollFds.at(i).fd == m_listenerPtr->m_socket->GetNativeSocket())
{ {
TcpClient *c = m_listenerPtr->AcceptClient(); //poll error
if (c)
{
shared_ptr<TcpConnection> connection = make_shared<TcpConnection>(c);
AddClient(connection);
break;
}
} }
else // not the listening socket
//should never timeout because its infinite (negative)
//if (res == 0)
//{
//timeout
//}
for (int i = 0; i < m_pollFds.size(); i++)
{ {
SOCKET c = m_pollFds.at(i).fd; if (m_pollFds.at(i).revents == 0 || m_pollFds[i].revents != POLLRDNORM)
byte* header = new byte[sizeof(NetworkHeader)]();
int32_t read;
if ((read = recv(c, (char*)header, sizeof(NetworkHeader), 0)) != sizeof(NetworkHeader))
continue; continue;
NetworkHeader net_header(*(NetworkHeader*)(header)); if (m_pollFds.at(i).fd == m_listenerPtr->m_socket->GetNativeSocket())
byte *buffer = new byte[net_header.Size]();
read = recv(c, (char*)buffer, net_header.Size - 4, 0);
if ((read) == net_header.Size - 4)
{ {
NetworkMessage msg; TcpClient* c = m_listenerPtr->AcceptClient();
msg.DeserializeWithoutHeader(buffer, net_header.Size); if (c)
{
shared_ptr<TcpConnection> connection = make_shared<TcpConnection>(c);
AddClient(connection);
break;
}
}
else // not the listening socket
{
SOCKET c = m_pollFds.at(i).fd;
if (msg.GetTag() == (uint32_t)InternalTags::Disconnect) byte* header = new byte[sizeof(NetworkHeader)]();
// i? or i+1
m_pollFds.erase(m_pollFds.begin() + i);
// put this in a separate thread int32_t read;
HandleMessage(msg); if ((read = recv(c, (char*)header, sizeof(NetworkHeader), 0)) != sizeof(NetworkHeader))
continue;
NetworkHeader net_header(*(NetworkHeader*)(header));
byte* buffer = new byte[net_header.Size]();
read = recv(c, (char*)buffer, net_header.Size - 4, 0);
if ((read) == net_header.Size - 4)
{
NetworkMessage msg;
msg.DeserializeWithoutHeader(buffer, net_header.Size);
// put this in a separate thread
HandleMessage(msg);
if (msg.GetTag() == (uint32_t)InternalTags::Disconnect)
{
for (size_t k = 0; k < m_list.size(); i++)
{
if (m_list[k]->m_id == msg.GetSenderID())
{
std::shared_ptr<TcpConnection> c = m_list[k];
c->m_client->Close();
m_list.erase(m_list.begin() + k);
}
}
m_pollFds.erase(m_pollFds.begin() + i);
}
}
} }
} }
} }
catch (void*) {}
} }
void TcpConnectionHandler::HandleMessage(const NetworkMessage &msg) void TcpConnectionHandler::HandleMessage(const NetworkMessage &msg)

View File

@ -11,31 +11,65 @@
class Plugin : public std::net::Plugin class Plugin : public std::net::Plugin
{ {
virtual void HandleMessage(const std::net::NetworkMessage& message) override
{
std::cout << "asd" << std::endl;
}
}; };
void dsfg(uint32_t, std::net::DistributionMode, uint32_t, uint32_t, void*)
{
}
std::net::TcpConnection *con = new std::net::TcpConnection();
void onData(uint32_t, std::net::DistributionMode, uint32_t, uint32_t, void*)
{
std::cout << "ondata" << std::endl;
}
void onDisconnect(std::string)
{
std::cout << "ondisconnect" << std::endl;
}
void onconnect()
{
std::cout << "onconnect" << std::endl;
con->Disconnect();
}
int main() int main()
{ {
std::net::Initialize(); std::net::Server *server = new std::net::Server(1);
std::net::Server server(1); //server.AddPlugin(new Plugin());
server.AddPlugin(new Plugin()); server->Start();
server.Start();
std::net::TcpConnection con;
con.Connect(std::net::IPAddress("127.0.0.1"));
bool sent = false; //con->Connect(std::net::IPAddress("127.0.0.1"));
while (con) // 8% of my cpu //bool sent = false;
//con += onData;
//con += onDisconnect;
//con += onconnect;
/*while (con.IsConnected) // 8% of my cpu
{ {
con.ReceiveData(); con.ReceiveData();
if (!sent) if (!sent)
{ {
sent = true; sent = true;
con.SendMessage(std::net::DistributionMode::Server, 0, 1); //con.SendMessage(std::net::DistributionMode::AllAndServer, 0, 1);
} }
}*/
try
{
server->Stop();
delete server;
} }
catch (void* e)
{
std::cout << "asd" << std::endl;
}
getchar();
} }

View File

@ -3,6 +3,8 @@
#include "VoidNet_LL/Response.hpp" #include "VoidNet_LL/Response.hpp"
#include "VoidNet_LL/Parse.hpp" #include "VoidNet_LL/Parse.hpp"
#include <stdexcept>
namespace std::net namespace std::net
{ {
static ParseResult<HttpStatus> ParseStatus(const char* str) static ParseResult<HttpStatus> ParseStatus(const char* str)

View File

@ -1,10 +1,13 @@
#include "VoidNet_LL/Socket.hpp" #include "VoidNet_LL/Socket.hpp"
#include "VoidNet_LL/IPAddress.hpp" #include "VoidNet_LL/IPAddress.hpp"
#include <VoidNet_LL\Init.hpp>
namespace std::net namespace std::net
{ {
void Socket::init() void Socket::init()
{ {
Initialize();
if (GetSocketType() == SocketType::Unknown) if (GetSocketType() == SocketType::Unknown)
throw invalid_argument("Unknown socket type"); throw invalid_argument("Unknown socket type");
@ -55,11 +58,11 @@ namespace std::net
bool Socket::Connect(const IPAddress& addr) bool Socket::Connect(const IPAddress& addr)
{ {
sockaddr_in addr_in = addr.ToCAddr(); sockaddr_in addr_in = addr.ToCAddr();
int32_t Return = connect(m_socket, (sockaddr*)&addr_in, sizeof(sockaddr_in)); int32_t retValue = connect(m_socket, (sockaddr*)&addr_in, sizeof(sockaddr_in));
SocketErrors Error = TranslateErrorCode(Return); SocketErrors error = TranslateErrorCode(retValue);
// "would block" is not an error // "would block" is not an error
return ((Error == SocketErrors::SE_NO_ERROR) || (Error == SocketErrors::SE_EWOULDBLOCK)); return ((error == SocketErrors::SE_NO_ERROR) || (error == SocketErrors::SE_EWOULDBLOCK));
} }
bool Socket::WaitForPendingConnection(bool& hasPendingConnection, chrono::milliseconds t) bool Socket::WaitForPendingConnection(bool& hasPendingConnection, chrono::milliseconds t)
@ -421,6 +424,7 @@ namespace std::net
return SocketErrors::SE_EINVAL; return SocketErrors::SE_EINVAL;
#else #else
code = WSAGetLastError();
// handle the generic -1 error // handle the generic -1 error
if (code == SOCKET_ERROR) if (code == SOCKET_ERROR)
{ {