High Level API - Fully Functional

This commit is contained in:
TheDoctor
2019-10-12 19:09:55 +01:00
parent 6db271bd7e
commit ad5ff5e53e
24 changed files with 1286 additions and 4 deletions

View File

@ -0,0 +1,52 @@
#include "MessageQueue.hpp"
namespace std::net
{
void MessageQueue::EnqueueMessageToSend(const NetworkMessage & msg)
{
m_sendMutex.lock();
m_messagesToSend.emplace_back(msg);
m_sendMutex.unlock();
}
void MessageQueue::EnqueueMessageReceived(const NetworkMessage & msg)
{
m_receivedMutex.lock();
DataReceivedEvent ev(msg);
m_dataReceivedEvents.push_back(ev);
m_receivedMutex.unlock();
}
void MessageQueue::EnqueueDisconnection(const NetworkMessage & msg)
{
m_disconnectMutex.lock();
//std::unique_ptr<DisconnectedEvent> ev(((NetworkMessage)msg).GetData<DisconnectedEvent>());
//m_disconnectedEvents.push_back(*(ev.get()));
m_disconnectMutex.unlock();
}
void MessageQueue::EnqueueConnection(const NetworkMessage & msg)
{
m_connectionMutex.lock();
NewConnectionEvent ev(msg.GetSenderID(), msg.GetData<void>());
m_connectionEvents.push_back(ev);
m_connectionMutex.unlock();
}
NetworkMessage MessageQueue::DequeueMessageToSend()
{
m_sendMutex.lock();
NetworkMessage msg = m_messagesToSend.front();
m_messagesToSend.erase(m_messagesToSend.begin() + 1);
m_sendMutex.unlock();
return msg;
}
uint32_t MessageQueue::SendSize()
{
m_sendMutex.lock();
uint32_t size = m_messagesToSend.size();
m_sendMutex.unlock();
return size;
}
}

View File

@ -0,0 +1,63 @@
#include "NetworkMessage.hpp"
namespace std::net
{
uint32_t NetworkMessage::GetSenderID() const
{
return m_senderID;
}
DistributionMode NetworkMessage::GetDistributionMode() const
{
return m_distributionMode;
}
uint32_t NetworkMessage::GetDestinationID() const
{
return m_destinationID;
}
uint32_t NetworkMessage::GetTag() const
{
return m_tag;
}
uint8_t *NetworkMessage::SerializeData(uint32_t &size)
{
int32_t sizeOfNetHeader = sizeof(NetworkHeader);
NetworkHeader header;
header.Size = 13 + sizeOfNetHeader + m_dataSize;
uint8_t *bytes = new uint8_t[header.Size];
memcpy(bytes, &header, sizeOfNetHeader);
uint8_t *sender = BitConverter::ToBytes<uint32_t>(m_senderID); // 4
uint8_t *destination = BitConverter::ToBytes<uint32_t>(m_destinationID); // 4
uint8_t *tag = BitConverter::ToBytes<uint32_t>(m_tag); // 4
memcpy(bytes + sizeOfNetHeader, sender, 4);
bytes[sizeOfNetHeader + 4] = (uint8_t)m_distributionMode;
memcpy(bytes + sizeOfNetHeader + 5, destination, 4);
memcpy(bytes + sizeOfNetHeader + 9, tag, 4);
memcpy(bytes + 13 + sizeOfNetHeader, m_data, m_dataSize);
size = header.Size;
return bytes;
}
void NetworkMessage::Deserialize(uint8_t *data, uint32_t size)
{
NetworkHeader buffer;
uint32_t sizeOfNetHeader = sizeof(NetworkHeader);
memcpy(&(buffer), data, sizeOfNetHeader);
memcpy(&(m_senderID), data + 4 + sizeOfNetHeader, 4);
m_distributionMode = (DistributionMode)data[8 + sizeOfNetHeader];
memcpy(&(m_destinationID), data + 5 + sizeOfNetHeader, 4);
memcpy(&(m_tag), data + 9 + sizeOfNetHeader, 4);
m_data = data + 13 + sizeOfNetHeader;
}
}

25
src/HLAPI/Server.cpp Normal file
View File

@ -0,0 +1,25 @@
#include "Server.hpp"
#include "MessageQueue.hpp"
#include "TcpServer.hpp"
//#include "UdpServer.hpp
namespace std::net
{
Server::Server(uint32_t max_connections, uint16_t port)
{
m_tcpServer = std::make_shared<std::net::TcpServer>(max_connections, port);
m_queue = std::make_shared<MessageQueue>();
//m_tcpServer->m_connectionHandler->m_queue = m_queue;
}
void Server::Start()
{
m_tcpServer->Start();
}
void Server::Stop()
{
m_tcpServer->Stop();
}
}

View File

@ -0,0 +1,72 @@
#include "TcpConnection.hpp"
#include "InternalTags.hpp"
namespace std::net
{
TcpConnection::TcpConnection(TcpClient * client)
: m_client(client)
{
}
std::shared_ptr<TcpClient> TcpConnection::GetClient()
{
return m_client;
}
uint32_t TcpConnection::GetID()
{
return m_id;
}
void TcpConnection::SetID(uint32_t id)
{
m_id = id;
}
bool TcpConnection::sendMessage(NetworkMessage & msg)
{
uint32_t size;
uint8_t *data = msg.SerializeData(size);
int32_t sent;
return m_client->Send(data, size, sent);
}
void TcpConnection::ReceiveData()
{
std::unique_ptr<uint8_t> header(new uint8_t[sizeof(NetworkHeader*)]());
int32_t read;
if (!m_client->Recv(header.get(), sizeof(NetworkHeader*), read))
return;
if (read == sizeof(NetworkHeader*))
{
std::unique_ptr<NetworkHeader> net_header((NetworkHeader*)header.get());
std::unique_ptr<uint8_t> buffer(new uint8_t[net_header->Size]());
int32_t read;
if (!m_client->Recv(buffer.get(), net_header->Size, read))
{
if (read != net_header->Size)
return; // wrong message?
NetworkMessage msg;
msg.Deserialize(buffer.get(), net_header->Size);
if (msg.GetTag() == (uint32_t)InternalTags::Disconnect)
{
//DisconnectedEvent(msg.m_senderID, );
}
else if (msg.GetTag() == (uint32_t)InternalTags::Connect)
NewConnectionEvent(msg.GetSenderID(), msg.GetData<void>());
else
DataReceivedEvent(msg.GetSenderID(), msg.GetDistributionMode(), msg.GetDestinationID(), msg.GetTag(), msg.GetData<void>());
}
}
else // wrong message
{
return;
}
}
}

View File

@ -0,0 +1,306 @@
#include "TcpConnectionHandler.hpp"
#include "DisconnectedEvent.hpp"
#include "NewConnectionEvent.hpp"
#include "InternalTags.hpp"
#include "NetworkMessage.hpp"
#include "MessageQueue.hpp"
#include "TcpConnection.hpp"
#include "TcpListener.hpp"
#include <chrono>
namespace std::net
{
TcpConnectionHandler::TcpConnectionHandler(std::shared_ptr<TcpListener> listener_ptr)
: m_run(false)
, m_listenerPtr(listener_ptr)
, m_queue(new MessageQueue())
{
}
TcpConnectionHandler::~TcpConnectionHandler()
{
m_run.exchange(false);
}
void TcpConnectionHandler::Start()
{
m_run.exchange(true);
std::thread receive_thread(&TcpConnectionHandler::HandleReceiveMsgAndConnsThreaded, this);
m_receiveThread.swap(receive_thread);
//std::thread send_thread(&TcpConnectionHandler::HandleSendThreaded, this);
//m_sendThread.swap(send_thread);
}
void TcpConnectionHandler::Stop()
{
m_run.exchange(false);
}
void TcpConnectionHandler::AddClient(std::shared_ptr<TcpConnection> &c)
{
uint32_t id = GetAvailableID();
if (id == -1)
{
// this can be handled just by the server
// what if the server owner wants to know if a user wanted to join but couldnt
DisconnectedEvent disconnected_event(id, "Server Full", -1);
std::shared_ptr<TcpClient> client = c->GetClient();
/*int32_t size = 0;
uint8_t *buffer = disconnected_event.Serialize(size);
int32_t sent = 0;
client->Send(buffer, size, sent);*/
client->Close();
}
c->SetID(id);
uint32_t *id_ptr = &id;
NetworkMessage msg(0, DistributionMode::ID, id, (uint32_t)InternalTags::AssignID, id_ptr, sizeof(id_ptr));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
uint32_t serialized_size;
uint8_t *serialized_data = msg.SerializeData<uint32_t>(serialized_size);
int32_t sent;
if (!c->GetClient()->Send(serialized_data, serialized_size, sent))
{
//couldnt send
return;
}
m_listMutex.lock();
m_list.push_back(c);
m_listMutex.unlock();
m_queue->EnqueueConnection(msg);
}
uint32_t TcpConnectionHandler::GetAvailableID()
{
for (int i = 1; i <= m_maxConnections; i++)
{
bool flag = true;
m_listMutex.lock();
for (int k = 0; k < m_list.size(); k++)
{
if (m_list.at(k)->GetID() == i)
flag = false;
}
m_listMutex.unlock();
if (flag)
return i;
}
//throw OutOfRangeException("Out of IDs to allocate - clients = max connections", "NewConnectionEventPool");
return -1;
}
void TcpConnectionHandler::SetMaxConnections(uint32_t max_connections)
{
m_maxConnections = max_connections;
}
void TcpConnectionHandler::HandleReceiveMsgAndConns()
{
// https://www.ibm.com/support/knowledgecenter/en/ssw_i5_54/rzab6/poll.htm
std::vector<pollfd> poll_fds;
pollfd master_fd;
master_fd.fd = m_listenerPtr->m_socket->GetNativeSocket();
master_fd.events = POLLRDNORM;
poll_fds.emplace_back(master_fd);
for (size_t i = 0; i < m_list.size(); i++)
{
pollfd client_fd;
client_fd.fd = m_list.at(i)->m_client->m_socket->GetNativeSocket();
client_fd.events = POLLRDNORM;
poll_fds.emplace_back(client_fd);
}
int res = poll(poll_fds.data(), poll_fds.size(), -1);
if (res < 0)
{
//poll error
}
//should never timeout because its infinite (negative)
//if (res == 0)
//{
//timeout
//}
for (int i = 0; i < poll_fds.size(); i++)
{
if (poll_fds.at(i).revents == 0)
continue;
if (poll_fds[i].revents != POLLRDNORM)
{
continue;
}
if (poll_fds.at(i).fd == m_listenerPtr->m_socket->GetNativeSocket())
{
TcpClient *c = m_listenerPtr->AcceptClient();
if (c)
{
std::shared_ptr<TcpConnection> connection = std::make_shared<TcpConnection>(c);
AddClient(connection);
break;
}
}
else // not the listening socket
{
SOCKET c = poll_fds.at(i).fd;
std::unique_ptr<uint8_t> header(new uint8_t[sizeof(NetworkHeader*)]());
int32_t read;
if ((read = recv(c, (char*)header.get(), sizeof(NetworkHeader*), 0)) != sizeof(NetworkHeader*))
continue;
std::unique_ptr<NetworkHeader> net_header((NetworkHeader*)header.get());
std::unique_ptr<uint8_t> buffer(new uint8_t[net_header->Size]());
if ((read = recv(c, (char*)buffer.get(), net_header->Size, 0)) == net_header->Size)
{
NetworkMessage msg;
msg.Deserialize(buffer.get(), net_header->Size);
if (msg.GetTag() == (uint32_t)InternalTags::Disconnect)
m_queue->EnqueueDisconnection(msg);
else if (msg.GetTag() == (uint32_t)InternalTags::Connect)
m_queue->EnqueueConnection(msg);
else
m_queue->EnqueueMessageReceived(msg);
}
else
continue;
}
}
}
void TcpConnectionHandler::HandleSend()
{
if (m_queue->SendSize() > 0)
{
NetworkMessage msg = m_queue->DequeueMessageToSend();
uint32_t size;
std::unique_ptr<uint8_t> data(msg.SerializeData(size));
if (msg.GetDistributionMode() == DistributionMode::Others)
{
m_listMutex.lock();
for (int i = 0; i < m_list.size(); i++)
{
std::shared_ptr<TcpConnection> c = m_list.at(i);
if (c->GetID() != msg.GetSenderID())
{
int32_t sent;
if (!c->GetClient()->Send(data.get(), size, sent))
{
// it failed - retry? or just disconnect right in the first try
}
}
}
m_listMutex.unlock();
}
else if (msg.GetDistributionMode() == DistributionMode::OthersAndServer)
{
m_listMutex.lock();
for (int i = 0; i < m_list.size(); i++)
{
std::shared_ptr<TcpConnection> c = m_list.at(i);
if (c->GetID() != msg.GetSenderID())
{
int32_t sent;
if (!c->GetClient()->Send(data.get(), size, sent))
{
// it failed - retry? or just disconnect right in the first try
}
}
}
m_listMutex.unlock();
//handle to plugins too
}
else if (msg.GetDistributionMode() == DistributionMode::ID)
{
m_listMutex.lock();
for (int i = 0; i < m_list.size(); i++)
{
std::shared_ptr<TcpConnection> c = m_list.at(i);
if (c->GetID() == msg.GetSenderID())
{
int32_t sent;
if (!c->GetClient()->Send(data.get(), size, sent))
{
// it failed - retry? or just disconnect right in the first try
}
}
}
m_listMutex.unlock();
}
else if (msg.GetDistributionMode() == DistributionMode::All)
{
m_listMutex.lock();
for (int i = 0; i < m_list.size(); i++)
{
std::shared_ptr<TcpConnection> c = m_list.at(i);
int32_t sent;
if (!c->GetClient()->Send(data.get(), size, sent))
{
// it failed - retry? or just disconnect right in the first try
}
}
m_listMutex.unlock();
}
else if (msg.GetDistributionMode() == DistributionMode::AllAndMe)
{
m_listMutex.lock();
for (int i = 0; i < m_list.size(); i++)
{
std::shared_ptr<TcpConnection> c = m_list.at(i);
int32_t sent;
if (!c->GetClient()->Send(data.get(), size, sent))
{
// it failed - retry? or just disconnect right in the first try
}
}
m_listMutex.unlock();
//handle to plugins too
}
else if (msg.GetDistributionMode() == DistributionMode::Server)
{
//handle just in plugins
}
}
}
void TcpConnectionHandler::HandleReceiveMsgAndConnsThreaded()
{
while (m_run.load())
{
HandleReceiveMsgAndConns();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void TcpConnectionHandler::HandleSendThreaded()
{
while (m_run.load())
{
HandleSend();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
}

33
src/HLAPI/TcpServer.cpp Normal file
View File

@ -0,0 +1,33 @@
#include "TcpServer.hpp"
#include "TcpConnection.hpp"
#include "TcpSocketBuilder.hpp"
#include "TcpClient.hpp"
#include "TcpConnectionHandler.hpp"
namespace std::net
{
TcpServer::TcpServer(uint32_t max_connections, uint16_t port)
: m_maxConnections(max_connections)
, m_port(port)
, m_run(false)
{
if (max_connections == 0 || port == 0)
throw std::invalid_argument("TcpServer::TcpServer()");
listener = std::shared_ptr<TcpListener>(TcpSocketBuilder().AsReusable().Bind(IPAddress(0, 0, 0, 0, port)).Listening().BuildListener().release());
m_connectionHandler = std::make_shared<std::net::TcpConnectionHandler>(listener);
m_connectionHandler->SetMaxConnections(max_connections);
}
void TcpServer::Start()
{
m_run = true;
m_connectionHandler->Start();
}
void TcpServer::Stop()
{
m_run.exchange(false);
}
}

58
src/HLAPI/main.cpp Normal file
View File

@ -0,0 +1,58 @@
#include "Init.hpp"
#include "Server.hpp"
#include "BitConverter.hpp"
#include "TcpClient.hpp"
#include "InternalTags.hpp"
#include "NetworkMessage.hpp"
#include <iostream>
int main()
{
std::net::Initialize();
std::net::Server server(100);
server.Start();
std::net::TcpClient client;
std::net::IPAddress ip("127.0.0.1");
client.Connect(ip);
std::net::TcpClient client2;
client2.Connect(ip);
while (true)
{
uint32_t data_size;
while (client.HasPendingData(data_size))
{
std::net::NetworkMessage message;
uint8_t *bytes = new uint8_t[data_size]();
int32_t read;
client.Recv(bytes, data_size, read);
message.Deserialize(bytes, data_size);
uint32_t id = std::BitConverter::FromBytes<uint32_t>((uint8_t*)(message.GetData<void>()));
if (message.GetTag() == (uint32_t)InternalTags::AssignID)
std::cout << id << std::endl;
}
while (client2.HasPendingData(data_size))
{
std::net::NetworkMessage message2;
uint8_t* bytes2 = new uint8_t[data_size]();
int32_t read2;
client2.Recv(bytes2, data_size, read2);
message2.Deserialize(bytes2, data_size);
uint32_t id2 = std::BitConverter::FromBytes<uint32_t>((uint8_t*)(message2.GetData<void>()));
if (message2.GetTag() == (uint32_t)InternalTags::AssignID)
std::cout << id2 << std::endl;
}
}
}