From d74051f57e8da9ac353cc022348ee7b468613cf8 Mon Sep 17 00:00:00 2001 From: Kyle Schwarz Date: Mon, 12 Jan 2026 13:03:55 -0500 Subject: [PATCH] Driver: Servd: Refactor to TCP --- include/icsneo/platform/servd.h | 11 +- include/icsneo/platform/socket.h | 433 ++++++++++++++++--------------- platform/servd.cpp | 184 +++++-------- 3 files changed, 289 insertions(+), 339 deletions(-) diff --git a/include/icsneo/platform/servd.h b/include/icsneo/platform/servd.h index 3a80b38..37de537 100644 --- a/include/icsneo/platform/servd.h +++ b/include/icsneo/platform/servd.h @@ -18,27 +18,24 @@ class Servd : public Driver { public: static void Find(std::vector& foundDevices); static bool Enabled(); - Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const std::unordered_set& availableDrivers); + Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const Address& address); ~Servd() override; bool open() override; bool isOpen() override; bool close() override; - bool faa(const std::string& key, int32_t inc, int32_t& orig); bool enableCommunication(bool enable, bool& sendMsg) override; driver_finder_t getFinder() override { return Servd::Find; } private: - void alive(); - void read(Address&& address); - void write(Address&& address); + void read(); + void write(); neodevice_t& device; - std::thread aliveThread; // makes sure the client and server are healthy std::thread writeThread; std::thread readThread; Socket messageSocket; bool opened = false; bool comEnabled = false; - std::string driver; + std::unique_ptr dataSocket; }; } diff --git a/include/icsneo/platform/socket.h b/include/icsneo/platform/socket.h index a4530c7..c3564b8 100644 --- a/include/icsneo/platform/socket.h +++ b/include/icsneo/platform/socket.h @@ -1,210 +1,223 @@ -#ifndef __SOCKET_H_ -#define __SOCKET_H_ - -#ifdef __cplusplus - -#ifdef _WIN32 -#define WIN32_LEAN_AND_MEAN -#define NOMINMAX -#include -#include -#include -#else -#include -#include -#include -#include -#include -#include -#endif - -#include -#include - -namespace icsneo { - -#ifdef _WIN32 -class WSA { -public: - WSA() { - // TODO: add error checking - WSAStartup(MAKEWORD(2, 2), &wsaData); - } - ~WSA() { - WSACleanup(); - } -private: - WSADATA wsaData; -}; -#endif - -class Address { -public: - Address() = default; - Address(const char* ip, uint16_t port) - : _ip(ip), _port(port) - { - _sockaddr.sin_family = AF_INET; - inet_pton(AF_INET, ip, &_sockaddr.sin_addr); - _sockaddr.sin_port = htons(port); - } - Address(sockaddr_in& sockaddr) - : _sockaddr(sockaddr) - { - char cip[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &sockaddr.sin_addr, cip, sizeof(cip)); - _ip = cip; - _port = ntohs(sockaddr.sin_port); - } - const std::string& ip() const { return _ip; } - const uint16_t& port() const { return _port; } - const sockaddr_in& sockaddr() const { return _sockaddr; } -private: - std::string _ip; - uint16_t _port; - sockaddr_in _sockaddr; -}; - -class Socket { -public: - #ifdef _WIN32 - using SocketHandleType = SOCKET; - #else - using SocketHandleType = int; - #endif - - Socket() { - #ifdef _WIN32 - static WSA wsa; - #endif - mFD = socket(AF_INET, SOCK_DGRAM, 0); - } - - ~Socket() { - #ifdef _WIN32 - closesocket(mFD); - #else - close(mFD); - #endif - } - - bool set_reuse(bool value) { - int ival = value; - return ::setsockopt(mFD, SOL_SOCKET, SO_REUSEADDR, (const char*)&ival, sizeof(ival)) != -1; - } - - bool set_nonblocking() { - #ifdef _WIN32 - u_long nonblock = 1; - return ioctlsocket(mFD, FIONBIO, &nonblock) != SOCKET_ERROR; - #else - return fcntl(mFD, F_SETFL, fcntl(mFD, F_GETFL, 0) | O_NONBLOCK) != -1; - #endif - } - - bool bind(const Address& at) { - return ::bind(mFD, (sockaddr*)&at.sockaddr(), sizeof(sockaddr_in)) != -1; - } - - bool poll(const std::chrono::milliseconds& timeout, bool& in) { - #ifdef _WIN32 - WSAPOLLFD pfd; - pfd.fd = mFD; - pfd.events = POLLIN; - if (::WSAPoll(&pfd, 1, static_cast(timeout.count())) == SOCKET_ERROR) { - return false; - } - in = pfd.revents & POLLIN; - return true; - #else - struct pollfd pfd; - pfd.fd = mFD; - pfd.events = POLLIN; - pfd.revents = 0; - if (::poll(&pfd, 1, static_cast(timeout.count())) == -1) { - return false; - } - in = pfd.revents & POLLIN; - return true; - #endif - } - - bool sendto(const void* buffer, size_t size, const Address& to) { - size_t totalSent = 0; - do { - const auto sent = ::sendto(mFD, (const char*)buffer, (int)size, 0, (sockaddr*)&to.sockaddr(), sizeof(sockaddr_in)); - if (sent == -1) { - return false; - } - totalSent += sent; - } while (totalSent < size); - return true; - } - - bool recvfrom(void* buffer, size_t& size, Address& from) { - sockaddr_in addr; - socklen_t addLen = sizeof(addr); - const auto read = ::recvfrom(mFD, (char*)buffer, (int)size, 0, (sockaddr*)&addr, &addLen); - if (read == -1) { - return false; - } - size = read; - from = Address(addr); - return true; - } - - bool recv(void* buffer, size_t& size) { - const auto read = ::recv(mFD, (char*)buffer, (int)size, 0); - if (read == -1) { - return false; - } - size = read; - return true; - } - - template - bool transceive(const Address& to, REQ&& request, RES&& response, const std::chrono::milliseconds& timeout) { - if(!sendto(request.data(), request.size(), to)) { - return false; - } - bool hasData; - if(!poll(timeout, hasData)) { - return false; - } - if(!hasData) { - return false; - } - size_t responseSize = response.size(); - if(!recv(response.data(), responseSize)) { - return false; - } - response.resize(responseSize); - return true; - } - - bool address(Address& address) const { - sockaddr_in sin; - socklen_t len = sizeof(sin); - getsockname(mFD, (sockaddr*)&sin, &len); - address = Address(sin); - return true; - } - - bool join_multicast(const std::string& interfaceIP, const std::string& multicastIP) { - ip_mreq mreq; - inet_pton(AF_INET, interfaceIP.c_str(), &mreq.imr_interface); - inet_pton(AF_INET, multicastIP.c_str(), &mreq.imr_multiaddr); - return setsockopt(mFD, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)) == 0; - } - - operator bool() const { return mFD != -1; } - operator SocketHandleType() const { return mFD; } -private: - SocketHandleType mFD; -}; - -} // namespace icsneo - -#endif // __cplusplus - -#endif // __SOCKET_H_ +#ifndef __SOCKET_H_ +#define __SOCKET_H_ + +#ifdef __cplusplus + +#ifdef _WIN32 +#define WIN32_LEAN_AND_MEAN +#define NOMINMAX +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#include +#include + +namespace icsneo { + +#ifdef _WIN32 +class WSA { +public: + WSA() { + // TODO: add error checking + WSAStartup(MAKEWORD(2, 2), &wsaData); + } + ~WSA() { + WSACleanup(); + } +private: + WSADATA wsaData; +}; +#endif + +class Address { +public: + Address() = default; + Address(const char* ip, uint16_t port) + : _ip(ip), _port(port) + { + _sockaddr.sin_family = AF_INET; + inet_pton(AF_INET, ip, &_sockaddr.sin_addr); + _sockaddr.sin_port = htons(port); + } + Address(sockaddr_in& sockaddr) + : _sockaddr(sockaddr) + { + char cip[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &sockaddr.sin_addr, cip, sizeof(cip)); + _ip = cip; + _port = ntohs(sockaddr.sin_port); + } + const std::string& ip() const { return _ip; } + const uint16_t& port() const { return _port; } + const sockaddr_in& sockaddr() const { return _sockaddr; } +private: + std::string _ip; + uint16_t _port; + sockaddr_in _sockaddr; +}; + +class Socket { +public: + #ifdef _WIN32 + using SocketHandleType = SOCKET; + #else + using SocketHandleType = int; + #endif + + template + Socket(Args&&... args) { + #ifdef _WIN32 + static WSA wsa; + #endif + mFD = socket(std::forward(args)...); + } + + ~Socket() { + #ifdef _WIN32 + closesocket(mFD); + #else + close(mFD); + #endif + } + + bool set_reuse(bool value) { + int ival = value; + return ::setsockopt(mFD, SOL_SOCKET, SO_REUSEADDR, (const char*)&ival, sizeof(ival)) != -1; + } + + bool set_nonblocking() { + #ifdef _WIN32 + u_long nonblock = 1; + return ioctlsocket(mFD, FIONBIO, &nonblock) != SOCKET_ERROR; + #else + return fcntl(mFD, F_SETFL, fcntl(mFD, F_GETFL, 0) | O_NONBLOCK) != -1; + #endif + } + + bool connect(const Address& to) { + return ::connect(mFD, (sockaddr*)&to.sockaddr(), sizeof(sockaddr_in)) != -1; + } + + bool bind(const Address& at) { + return ::bind(mFD, (sockaddr*)&at.sockaddr(), sizeof(sockaddr_in)) != -1; + } + + bool poll(const std::chrono::milliseconds& timeout, bool& in) { + #ifdef _WIN32 + WSAPOLLFD pfd; + pfd.fd = mFD; + pfd.events = POLLIN; + if (::WSAPoll(&pfd, 1, static_cast(timeout.count())) == SOCKET_ERROR) { + return false; + } + in = pfd.revents & POLLIN; + return true; + #else + struct pollfd pfd; + pfd.fd = mFD; + pfd.events = POLLIN; + pfd.revents = 0; + if (::poll(&pfd, 1, static_cast(timeout.count())) == -1) { + return false; + } + in = pfd.revents & POLLIN; + return true; + #endif + } + + bool sendto(const void* buffer, size_t size, const Address& to) { + size_t totalSent = 0; + do { + const auto sent = ::sendto(mFD, (const char*)buffer, (int)size, 0, (sockaddr*)&to.sockaddr(), sizeof(sockaddr_in)); + if (sent == -1) { + return false; + } + totalSent += sent; + } while (totalSent < size); + return true; + } + + bool send(const void* buffer, size_t size) { + auto sent = ::send(mFD, (const char*)buffer, (int)size, 0); + if(sent == -1) { + return false; + } + return (size_t)sent == size; + } + + bool recvfrom(void* buffer, size_t& size, Address& from) { + sockaddr_in addr; + socklen_t addLen = sizeof(addr); + const auto read = ::recvfrom(mFD, (char*)buffer, (int)size, 0, (sockaddr*)&addr, &addLen); + if (read == -1) { + return false; + } + size = read; + from = Address(addr); + return true; + } + + bool recv(void* buffer, size_t& size) { + const auto read = ::recv(mFD, (char*)buffer, (int)size, 0); + if (read == -1) { + return false; + } + size = read; + return true; + } + + template + bool transceive(REQ&& request, RES&& response, const std::chrono::milliseconds& timeout) { + if(!send(request.data(), request.size())) { + return false; + } + bool hasData; + if(!poll(timeout, hasData)) { + return false; + } + if(!hasData) { + return false; + } + size_t responseSize = response.size(); + if(!recv(response.data(), responseSize)) { + return false; + } + response.resize(responseSize); + return true; + } + + bool address(Address& address) const { + sockaddr_in sin; + socklen_t len = sizeof(sin); + getsockname(mFD, (sockaddr*)&sin, &len); + address = Address(sin); + return true; + } + + bool join_multicast(const std::string& interfaceIP, const std::string& multicastIP) { + ip_mreq mreq; + inet_pton(AF_INET, interfaceIP.c_str(), &mreq.imr_interface); + inet_pton(AF_INET, multicastIP.c_str(), &mreq.imr_multiaddr); + return setsockopt(mFD, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)) == 0; + } + + operator bool() const { return mFD != -1; } + operator SocketHandleType() const { return mFD; } +private: + SocketHandleType mFD; +}; + +} // namespace icsneo + +#endif // __cplusplus + +#endif // __SOCKET_H_ diff --git a/platform/servd.cpp b/platform/servd.cpp index 91809fa..e97bbb2 100644 --- a/platform/servd.cpp +++ b/platform/servd.cpp @@ -6,7 +6,7 @@ using namespace icsneo; -#define SERVD_VERSION 1 +#define SERVD_VERSION 2 static const Address SERVD_ADDRESS = Address("127.0.0.1", 26741); static const std::string SERVD_VERSION_STR = std::to_string(SERVD_VERSION); @@ -41,20 +41,17 @@ std::vector split(const std::string_view& str, char delim = ' ') { } void Servd::Find(std::vector& found) { - Socket socket; + Socket socket(AF_INET, SOCK_DGRAM, 0); + socket.connect(SERVD_ADDRESS); if(!socket.set_nonblocking()) { EventManager::GetInstance().add(APIEvent::Type::ServdNonblockError, APIEvent::Severity::Error); return; } - if(!socket.bind(Address("127.0.0.1", 0))) { - EventManager::GetInstance().add(APIEvent::Type::ServdBindError, APIEvent::Severity::Error); - return; - } std::string response; response.resize(512); const std::string version_request = SERVD_VERSION_STR + " version"; - if(!socket.transceive(SERVD_ADDRESS, version_request, response, std::chrono::milliseconds(5000))) { + if(!socket.transceive(version_request, response, std::chrono::milliseconds(5000))) { EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error); return; } @@ -66,46 +63,39 @@ void Servd::Find(std::vector& found) { response.resize(512); const std::string find_request = SERVD_VERSION_STR + " find"; - if(!socket.transceive(SERVD_ADDRESS, find_request, response, std::chrono::milliseconds(5000))) { + if(!socket.transceive(find_request, response, std::chrono::milliseconds(5000))) { EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error); return; } const auto lines = split(response, '\n'); for(auto&& line : lines) { const auto cols = split(line, ' '); - if(cols.size() < 2) { + if(cols.size() < 3) { EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error); continue; } const auto& serial = cols[0]; - std::unordered_set drivers; - for (size_t i = 1; i < cols.size(); ++i) { - drivers.emplace(cols[i]); + const auto& ip = cols[1]; + uint16_t port = 0; + try { + port = static_cast(std::stoi(cols[2])); + } catch (const std::exception&) { + EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error); + continue; } + Address address(ip.c_str(), port); auto& newFound = found.emplace_back(); std::copy(serial.begin(), serial.end(), newFound.serial); newFound.makeDriver = [=](device_eventhandler_t err, neodevice_t& forDevice) { - return std::make_unique(err, forDevice, drivers); + return std::make_unique(err, forDevice, address); }; } } -Servd::Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const std::unordered_set& availableDrivers) : - Driver(err), device(forDevice) { +Servd::Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const Address& address) : + Driver(err), device(forDevice), messageSocket(AF_INET, SOCK_DGRAM, 0) { + messageSocket.connect(address); messageSocket.set_nonblocking(); - messageSocket.bind(Address("127.0.0.1", 0)); - if(availableDrivers.count("dxx")) { - driver = "dxx"; // prefer USB over Ethernet - } else if(availableDrivers.count("cab")) { - driver = "cab"; // prefer CAB over TCP - } else if(availableDrivers.count("tcp")) { - driver = "tcp"; - } else if(availableDrivers.count("vcp")) { - driver = "vcp"; - } else { - // just take the first driver - driver = *availableDrivers.begin(); - } } Servd::~Servd() { @@ -113,21 +103,31 @@ Servd::~Servd() { } bool Servd::open() { - const std::string request = SERVD_VERSION_STR + " open " + std::string(device.serial) + " " + driver; + const std::string request = SERVD_VERSION_STR + " open"; std::string response; response.resize(512); - if(!messageSocket.transceive(SERVD_ADDRESS, request, response, std::chrono::milliseconds(5000))) { + if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) { EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error); return false; } const auto tokens = split(response); - if(tokens.size() != 4) { + if(tokens.size() != 2) { EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error); return false; } - aliveThread = std::thread(&Servd::alive, this); - readThread = std::thread(&Servd::read, this, Address{tokens[2].c_str(), (uint16_t)std::stol(tokens[3].c_str())}); - writeThread = std::thread(&Servd::write, this, Address{tokens[0].c_str(), (uint16_t)std::stol(tokens[1].c_str())}); + dataSocket = std::make_unique(AF_INET, SOCK_STREAM, 0); + const auto& ip = tokens[0]; + uint16_t port = 0; + try { + port = static_cast(std::stoi(tokens[1])); + } catch (const std::exception&) { + EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error); + return false; + } + Address address(ip.c_str(), port); + dataSocket->connect(address); + readThread = std::thread(&Servd::read, this); + writeThread = std::thread(&Servd::write, this); opened = true; return true; } @@ -138,9 +138,6 @@ bool Servd::isOpen() { bool Servd::close() { setIsClosing(true); - if(aliveThread.joinable()) { - aliveThread.join(); - } if(readThread.joinable()) { readThread.join(); } @@ -148,8 +145,16 @@ bool Servd::close() { writeThread.join(); } if(isOpen()) { - const std::string request = SERVD_VERSION_STR + " close " + std::string(device.serial); - messageSocket.sendto(request.data(), request.size(), SERVD_ADDRESS); + Address localAddress; + dataSocket->address(localAddress); + const std::string request = SERVD_VERSION_STR + " close " + localAddress.ip() + " " + std::to_string(localAddress.port()); + std::string response; + response.resize(1); + if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) { + EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error); + return false; + } + dataSocket.reset(); } opened = false; setIsClosing(false); @@ -159,13 +164,13 @@ bool Servd::close() { bool Servd::enableCommunication(bool enable, bool& sendMsg) { const std::string serialString(device.serial); { - const std::string request = SERVD_VERSION_STR + " lock " + serialString + " com 1000"; + const std::string request = SERVD_VERSION_STR + " lock com 1000"; std::string response; response.resize(1); bool locked = false; const auto timeout = std::chrono::steady_clock::now() + std::chrono::seconds(1); do { - if(!messageSocket.transceive(SERVD_ADDRESS, request, response, std::chrono::milliseconds(5000))) { + if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) { return false; } locked = response == "1" ? true : false; @@ -181,10 +186,10 @@ bool Servd::enableCommunication(bool enable, bool& sendMsg) { } uint64_t com = 0; { - const std::string request = SERVD_VERSION_STR + " load " + serialString + " com"; + const std::string request = SERVD_VERSION_STR + " load com"; std::string response; response.resize(20); - if(!messageSocket.transceive(SERVD_ADDRESS, request, response, std::chrono::milliseconds(5000))) { + if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) { EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error); return false; } @@ -202,16 +207,20 @@ bool Servd::enableCommunication(bool enable, bool& sendMsg) { } if(comEnabled != enable) { com += enable ? 1 : -1; - const std::string request = SERVD_VERSION_STR + " store " + serialString + " com " + std::to_string(com); - if(!messageSocket.sendto(request.data(), request.size(), SERVD_ADDRESS)) { + const std::string request = SERVD_VERSION_STR + " store com " + std::to_string(com); + std::string response; + response.resize(1); + if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) { EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error); return false; } } comEnabled = enable; { - const std::string request = SERVD_VERSION_STR + " unlock " + serialString + " com"; - if(!messageSocket.sendto(request.data(), request.size(), SERVD_ADDRESS)) { + const std::string request = SERVD_VERSION_STR + " unlock com"; + std::string response; + response.resize(1); + if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) { EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error); return false; } @@ -219,78 +228,11 @@ bool Servd::enableCommunication(bool enable, bool& sendMsg) { return true; } -void Servd::alive() { - Socket socket; - socket.set_nonblocking(); - socket.bind(Address("127.0.0.1", 0)); - const std::string statusRequest = SERVD_VERSION_STR + " status " + std::string(device.serial); - std::string statusResponse; - statusResponse.resize(8); - while(!isDisconnected() && !isClosing()) { - if(!socket.sendto(statusRequest.data(), statusRequest.size(), {"127.0.0.1", 26741})) { - EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - bool hasData; - if(!socket.poll(std::chrono::milliseconds(2000), hasData)) { - EventManager::GetInstance().add(APIEvent::Type::ServdPollError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - if(!hasData) { - EventManager::GetInstance().add(APIEvent::Type::ServdNoDataError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - size_t statusResponseSize = statusResponse.size(); - if(!socket.recv(statusResponse.data(), statusResponseSize)) { - EventManager::GetInstance().add(APIEvent::Type::ServdRecvError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - statusResponse.resize(statusResponseSize); - if(statusRequest == "closed") { - EventManager::GetInstance().add(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - if(statusResponse != "open") { - EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } -} - -void Servd::read(Address&& address) { - Socket socket; - socket.set_nonblocking(); - socket.set_reuse(true); - #ifdef _WIN32 - if(!socket.bind(Address("127.0.0.1", address.port()))) { - EventManager::GetInstance().add(APIEvent::Type::ServdBindError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - #else - if(!socket.bind(Address(address.ip().c_str(), address.port()))) { - EventManager::GetInstance().add(APIEvent::Type::ServdBindError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - #endif - if(!socket.join_multicast("127.0.0.1", address.ip())) { - EventManager::GetInstance().add(APIEvent::Type::ServdJoinMulticastError, APIEvent::Severity::Error); - setIsDisconnected(true); - return; - } - - std::vector buf(65535); +void Servd::read() { + std::vector buf(2 * 1024 * 1024); while(!isDisconnected() && !isClosing()) { bool hasData; - if(!socket.poll(std::chrono::milliseconds(100), hasData)) { + if(!dataSocket->poll(std::chrono::milliseconds(100), hasData)) { EventManager::GetInstance().add(APIEvent::Type::ServdPollError, APIEvent::Severity::Error); setIsDisconnected(true); return; @@ -299,7 +241,7 @@ void Servd::read(Address&& address) { continue; } size_t bufSize = buf.size(); - if(!socket.recv(buf.data(), bufSize)) { + if(!dataSocket->recv(buf.data(), bufSize)) { EventManager::GetInstance().add(APIEvent::Type::ServdRecvError, APIEvent::Severity::Error); setIsDisconnected(true); return; @@ -308,16 +250,14 @@ void Servd::read(Address&& address) { } } -void Servd::write(Address&& address) { - Socket socket; - socket.bind(Address("127.0.0.1", 0)); +void Servd::write() { WriteOperation writeOp; while(!isDisconnected() && !isClosing()) { if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100))) { continue; } if(!isClosing()) { - if(!socket.sendto(writeOp.bytes.data(), writeOp.bytes.size(), address)) { + if(!dataSocket->send(writeOp.bytes.data(), writeOp.bytes.size())) { EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error); setIsDisconnected(true); return;