Driver: Servd: Refactor to TCP

pull/76/merge
Kyle Schwarz 2026-01-12 13:03:55 -05:00
parent 530a99d264
commit d74051f57e
3 changed files with 289 additions and 339 deletions

View File

@ -18,27 +18,24 @@ class Servd : public Driver {
public:
static void Find(std::vector<FoundDevice>& foundDevices);
static bool Enabled();
Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const std::unordered_set<std::string>& availableDrivers);
Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const Address& address);
~Servd() override;
bool open() override;
bool isOpen() override;
bool close() override;
bool faa(const std::string& key, int32_t inc, int32_t& orig);
bool enableCommunication(bool enable, bool& sendMsg) override;
driver_finder_t getFinder() override { return Servd::Find; }
private:
void alive();
void read(Address&& address);
void write(Address&& address);
void read();
void write();
neodevice_t& device;
std::thread aliveThread; // makes sure the client and server are healthy
std::thread writeThread;
std::thread readThread;
Socket messageSocket;
bool opened = false;
bool comEnabled = false;
std::string driver;
std::unique_ptr<Socket> dataSocket;
};
}

View File

@ -1,210 +1,223 @@
#ifndef __SOCKET_H_
#define __SOCKET_H_
#ifdef __cplusplus
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#endif
#include <string>
#include <chrono>
namespace icsneo {
#ifdef _WIN32
class WSA {
public:
WSA() {
// TODO: add error checking
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
~WSA() {
WSACleanup();
}
private:
WSADATA wsaData;
};
#endif
class Address {
public:
Address() = default;
Address(const char* ip, uint16_t port)
: _ip(ip), _port(port)
{
_sockaddr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &_sockaddr.sin_addr);
_sockaddr.sin_port = htons(port);
}
Address(sockaddr_in& sockaddr)
: _sockaddr(sockaddr)
{
char cip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &sockaddr.sin_addr, cip, sizeof(cip));
_ip = cip;
_port = ntohs(sockaddr.sin_port);
}
const std::string& ip() const { return _ip; }
const uint16_t& port() const { return _port; }
const sockaddr_in& sockaddr() const { return _sockaddr; }
private:
std::string _ip;
uint16_t _port;
sockaddr_in _sockaddr;
};
class Socket {
public:
#ifdef _WIN32
using SocketHandleType = SOCKET;
#else
using SocketHandleType = int;
#endif
Socket() {
#ifdef _WIN32
static WSA wsa;
#endif
mFD = socket(AF_INET, SOCK_DGRAM, 0);
}
~Socket() {
#ifdef _WIN32
closesocket(mFD);
#else
close(mFD);
#endif
}
bool set_reuse(bool value) {
int ival = value;
return ::setsockopt(mFD, SOL_SOCKET, SO_REUSEADDR, (const char*)&ival, sizeof(ival)) != -1;
}
bool set_nonblocking() {
#ifdef _WIN32
u_long nonblock = 1;
return ioctlsocket(mFD, FIONBIO, &nonblock) != SOCKET_ERROR;
#else
return fcntl(mFD, F_SETFL, fcntl(mFD, F_GETFL, 0) | O_NONBLOCK) != -1;
#endif
}
bool bind(const Address& at) {
return ::bind(mFD, (sockaddr*)&at.sockaddr(), sizeof(sockaddr_in)) != -1;
}
bool poll(const std::chrono::milliseconds& timeout, bool& in) {
#ifdef _WIN32
WSAPOLLFD pfd;
pfd.fd = mFD;
pfd.events = POLLIN;
if (::WSAPoll(&pfd, 1, static_cast<int>(timeout.count())) == SOCKET_ERROR) {
return false;
}
in = pfd.revents & POLLIN;
return true;
#else
struct pollfd pfd;
pfd.fd = mFD;
pfd.events = POLLIN;
pfd.revents = 0;
if (::poll(&pfd, 1, static_cast<int>(timeout.count())) == -1) {
return false;
}
in = pfd.revents & POLLIN;
return true;
#endif
}
bool sendto(const void* buffer, size_t size, const Address& to) {
size_t totalSent = 0;
do {
const auto sent = ::sendto(mFD, (const char*)buffer, (int)size, 0, (sockaddr*)&to.sockaddr(), sizeof(sockaddr_in));
if (sent == -1) {
return false;
}
totalSent += sent;
} while (totalSent < size);
return true;
}
bool recvfrom(void* buffer, size_t& size, Address& from) {
sockaddr_in addr;
socklen_t addLen = sizeof(addr);
const auto read = ::recvfrom(mFD, (char*)buffer, (int)size, 0, (sockaddr*)&addr, &addLen);
if (read == -1) {
return false;
}
size = read;
from = Address(addr);
return true;
}
bool recv(void* buffer, size_t& size) {
const auto read = ::recv(mFD, (char*)buffer, (int)size, 0);
if (read == -1) {
return false;
}
size = read;
return true;
}
template<typename REQ, typename RES>
bool transceive(const Address& to, REQ&& request, RES&& response, const std::chrono::milliseconds& timeout) {
if(!sendto(request.data(), request.size(), to)) {
return false;
}
bool hasData;
if(!poll(timeout, hasData)) {
return false;
}
if(!hasData) {
return false;
}
size_t responseSize = response.size();
if(!recv(response.data(), responseSize)) {
return false;
}
response.resize(responseSize);
return true;
}
bool address(Address& address) const {
sockaddr_in sin;
socklen_t len = sizeof(sin);
getsockname(mFD, (sockaddr*)&sin, &len);
address = Address(sin);
return true;
}
bool join_multicast(const std::string& interfaceIP, const std::string& multicastIP) {
ip_mreq mreq;
inet_pton(AF_INET, interfaceIP.c_str(), &mreq.imr_interface);
inet_pton(AF_INET, multicastIP.c_str(), &mreq.imr_multiaddr);
return setsockopt(mFD, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)) == 0;
}
operator bool() const { return mFD != -1; }
operator SocketHandleType() const { return mFD; }
private:
SocketHandleType mFD;
};
} // namespace icsneo
#endif // __cplusplus
#endif // __SOCKET_H_
#ifndef __SOCKET_H_
#define __SOCKET_H_
#ifdef __cplusplus
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#endif
#include <string>
#include <chrono>
namespace icsneo {
#ifdef _WIN32
class WSA {
public:
WSA() {
// TODO: add error checking
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
~WSA() {
WSACleanup();
}
private:
WSADATA wsaData;
};
#endif
class Address {
public:
Address() = default;
Address(const char* ip, uint16_t port)
: _ip(ip), _port(port)
{
_sockaddr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &_sockaddr.sin_addr);
_sockaddr.sin_port = htons(port);
}
Address(sockaddr_in& sockaddr)
: _sockaddr(sockaddr)
{
char cip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &sockaddr.sin_addr, cip, sizeof(cip));
_ip = cip;
_port = ntohs(sockaddr.sin_port);
}
const std::string& ip() const { return _ip; }
const uint16_t& port() const { return _port; }
const sockaddr_in& sockaddr() const { return _sockaddr; }
private:
std::string _ip;
uint16_t _port;
sockaddr_in _sockaddr;
};
class Socket {
public:
#ifdef _WIN32
using SocketHandleType = SOCKET;
#else
using SocketHandleType = int;
#endif
template<class... Args>
Socket(Args&&... args) {
#ifdef _WIN32
static WSA wsa;
#endif
mFD = socket(std::forward<Args>(args)...);
}
~Socket() {
#ifdef _WIN32
closesocket(mFD);
#else
close(mFD);
#endif
}
bool set_reuse(bool value) {
int ival = value;
return ::setsockopt(mFD, SOL_SOCKET, SO_REUSEADDR, (const char*)&ival, sizeof(ival)) != -1;
}
bool set_nonblocking() {
#ifdef _WIN32
u_long nonblock = 1;
return ioctlsocket(mFD, FIONBIO, &nonblock) != SOCKET_ERROR;
#else
return fcntl(mFD, F_SETFL, fcntl(mFD, F_GETFL, 0) | O_NONBLOCK) != -1;
#endif
}
bool connect(const Address& to) {
return ::connect(mFD, (sockaddr*)&to.sockaddr(), sizeof(sockaddr_in)) != -1;
}
bool bind(const Address& at) {
return ::bind(mFD, (sockaddr*)&at.sockaddr(), sizeof(sockaddr_in)) != -1;
}
bool poll(const std::chrono::milliseconds& timeout, bool& in) {
#ifdef _WIN32
WSAPOLLFD pfd;
pfd.fd = mFD;
pfd.events = POLLIN;
if (::WSAPoll(&pfd, 1, static_cast<int>(timeout.count())) == SOCKET_ERROR) {
return false;
}
in = pfd.revents & POLLIN;
return true;
#else
struct pollfd pfd;
pfd.fd = mFD;
pfd.events = POLLIN;
pfd.revents = 0;
if (::poll(&pfd, 1, static_cast<int>(timeout.count())) == -1) {
return false;
}
in = pfd.revents & POLLIN;
return true;
#endif
}
bool sendto(const void* buffer, size_t size, const Address& to) {
size_t totalSent = 0;
do {
const auto sent = ::sendto(mFD, (const char*)buffer, (int)size, 0, (sockaddr*)&to.sockaddr(), sizeof(sockaddr_in));
if (sent == -1) {
return false;
}
totalSent += sent;
} while (totalSent < size);
return true;
}
bool send(const void* buffer, size_t size) {
auto sent = ::send(mFD, (const char*)buffer, (int)size, 0);
if(sent == -1) {
return false;
}
return (size_t)sent == size;
}
bool recvfrom(void* buffer, size_t& size, Address& from) {
sockaddr_in addr;
socklen_t addLen = sizeof(addr);
const auto read = ::recvfrom(mFD, (char*)buffer, (int)size, 0, (sockaddr*)&addr, &addLen);
if (read == -1) {
return false;
}
size = read;
from = Address(addr);
return true;
}
bool recv(void* buffer, size_t& size) {
const auto read = ::recv(mFD, (char*)buffer, (int)size, 0);
if (read == -1) {
return false;
}
size = read;
return true;
}
template<typename REQ, typename RES>
bool transceive(REQ&& request, RES&& response, const std::chrono::milliseconds& timeout) {
if(!send(request.data(), request.size())) {
return false;
}
bool hasData;
if(!poll(timeout, hasData)) {
return false;
}
if(!hasData) {
return false;
}
size_t responseSize = response.size();
if(!recv(response.data(), responseSize)) {
return false;
}
response.resize(responseSize);
return true;
}
bool address(Address& address) const {
sockaddr_in sin;
socklen_t len = sizeof(sin);
getsockname(mFD, (sockaddr*)&sin, &len);
address = Address(sin);
return true;
}
bool join_multicast(const std::string& interfaceIP, const std::string& multicastIP) {
ip_mreq mreq;
inet_pton(AF_INET, interfaceIP.c_str(), &mreq.imr_interface);
inet_pton(AF_INET, multicastIP.c_str(), &mreq.imr_multiaddr);
return setsockopt(mFD, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq, sizeof(mreq)) == 0;
}
operator bool() const { return mFD != -1; }
operator SocketHandleType() const { return mFD; }
private:
SocketHandleType mFD;
};
} // namespace icsneo
#endif // __cplusplus
#endif // __SOCKET_H_

View File

@ -6,7 +6,7 @@
using namespace icsneo;
#define SERVD_VERSION 1
#define SERVD_VERSION 2
static const Address SERVD_ADDRESS = Address("127.0.0.1", 26741);
static const std::string SERVD_VERSION_STR = std::to_string(SERVD_VERSION);
@ -41,20 +41,17 @@ std::vector<std::string> split(const std::string_view& str, char delim = ' ') {
}
void Servd::Find(std::vector<FoundDevice>& found) {
Socket socket;
Socket socket(AF_INET, SOCK_DGRAM, 0);
socket.connect(SERVD_ADDRESS);
if(!socket.set_nonblocking()) {
EventManager::GetInstance().add(APIEvent::Type::ServdNonblockError, APIEvent::Severity::Error);
return;
}
if(!socket.bind(Address("127.0.0.1", 0))) {
EventManager::GetInstance().add(APIEvent::Type::ServdBindError, APIEvent::Severity::Error);
return;
}
std::string response;
response.resize(512);
const std::string version_request = SERVD_VERSION_STR + " version";
if(!socket.transceive(SERVD_ADDRESS, version_request, response, std::chrono::milliseconds(5000))) {
if(!socket.transceive(version_request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error);
return;
}
@ -66,46 +63,39 @@ void Servd::Find(std::vector<FoundDevice>& found) {
response.resize(512);
const std::string find_request = SERVD_VERSION_STR + " find";
if(!socket.transceive(SERVD_ADDRESS, find_request, response, std::chrono::milliseconds(5000))) {
if(!socket.transceive(find_request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error);
return;
}
const auto lines = split(response, '\n');
for(auto&& line : lines) {
const auto cols = split(line, ' ');
if(cols.size() < 2) {
if(cols.size() < 3) {
EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error);
continue;
}
const auto& serial = cols[0];
std::unordered_set<std::string> drivers;
for (size_t i = 1; i < cols.size(); ++i) {
drivers.emplace(cols[i]);
const auto& ip = cols[1];
uint16_t port = 0;
try {
port = static_cast<uint16_t>(std::stoi(cols[2]));
} catch (const std::exception&) {
EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error);
continue;
}
Address address(ip.c_str(), port);
auto& newFound = found.emplace_back();
std::copy(serial.begin(), serial.end(), newFound.serial);
newFound.makeDriver = [=](device_eventhandler_t err, neodevice_t& forDevice) {
return std::make_unique<Servd>(err, forDevice, drivers);
return std::make_unique<Servd>(err, forDevice, address);
};
}
}
Servd::Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const std::unordered_set<std::string>& availableDrivers) :
Driver(err), device(forDevice) {
Servd::Servd(const device_eventhandler_t& err, neodevice_t& forDevice, const Address& address) :
Driver(err), device(forDevice), messageSocket(AF_INET, SOCK_DGRAM, 0) {
messageSocket.connect(address);
messageSocket.set_nonblocking();
messageSocket.bind(Address("127.0.0.1", 0));
if(availableDrivers.count("dxx")) {
driver = "dxx"; // prefer USB over Ethernet
} else if(availableDrivers.count("cab")) {
driver = "cab"; // prefer CAB over TCP
} else if(availableDrivers.count("tcp")) {
driver = "tcp";
} else if(availableDrivers.count("vcp")) {
driver = "vcp";
} else {
// just take the first driver
driver = *availableDrivers.begin();
}
}
Servd::~Servd() {
@ -113,21 +103,31 @@ Servd::~Servd() {
}
bool Servd::open() {
const std::string request = SERVD_VERSION_STR + " open " + std::string(device.serial) + " " + driver;
const std::string request = SERVD_VERSION_STR + " open";
std::string response;
response.resize(512);
if(!messageSocket.transceive(SERVD_ADDRESS, request, response, std::chrono::milliseconds(5000))) {
if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error);
return false;
}
const auto tokens = split(response);
if(tokens.size() != 4) {
if(tokens.size() != 2) {
EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error);
return false;
}
aliveThread = std::thread(&Servd::alive, this);
readThread = std::thread(&Servd::read, this, Address{tokens[2].c_str(), (uint16_t)std::stol(tokens[3].c_str())});
writeThread = std::thread(&Servd::write, this, Address{tokens[0].c_str(), (uint16_t)std::stol(tokens[1].c_str())});
dataSocket = std::make_unique<Socket>(AF_INET, SOCK_STREAM, 0);
const auto& ip = tokens[0];
uint16_t port = 0;
try {
port = static_cast<uint16_t>(std::stoi(tokens[1]));
} catch (const std::exception&) {
EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error);
return false;
}
Address address(ip.c_str(), port);
dataSocket->connect(address);
readThread = std::thread(&Servd::read, this);
writeThread = std::thread(&Servd::write, this);
opened = true;
return true;
}
@ -138,9 +138,6 @@ bool Servd::isOpen() {
bool Servd::close() {
setIsClosing(true);
if(aliveThread.joinable()) {
aliveThread.join();
}
if(readThread.joinable()) {
readThread.join();
}
@ -148,8 +145,16 @@ bool Servd::close() {
writeThread.join();
}
if(isOpen()) {
const std::string request = SERVD_VERSION_STR + " close " + std::string(device.serial);
messageSocket.sendto(request.data(), request.size(), SERVD_ADDRESS);
Address localAddress;
dataSocket->address(localAddress);
const std::string request = SERVD_VERSION_STR + " close " + localAddress.ip() + " " + std::to_string(localAddress.port());
std::string response;
response.resize(1);
if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error);
return false;
}
dataSocket.reset();
}
opened = false;
setIsClosing(false);
@ -159,13 +164,13 @@ bool Servd::close() {
bool Servd::enableCommunication(bool enable, bool& sendMsg) {
const std::string serialString(device.serial);
{
const std::string request = SERVD_VERSION_STR + " lock " + serialString + " com 1000";
const std::string request = SERVD_VERSION_STR + " lock com 1000";
std::string response;
response.resize(1);
bool locked = false;
const auto timeout = std::chrono::steady_clock::now() + std::chrono::seconds(1);
do {
if(!messageSocket.transceive(SERVD_ADDRESS, request, response, std::chrono::milliseconds(5000))) {
if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) {
return false;
}
locked = response == "1" ? true : false;
@ -181,10 +186,10 @@ bool Servd::enableCommunication(bool enable, bool& sendMsg) {
}
uint64_t com = 0;
{
const std::string request = SERVD_VERSION_STR + " load " + serialString + " com";
const std::string request = SERVD_VERSION_STR + " load com";
std::string response;
response.resize(20);
if(!messageSocket.transceive(SERVD_ADDRESS, request, response, std::chrono::milliseconds(5000))) {
if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdTransceiveError, APIEvent::Severity::Error);
return false;
}
@ -202,16 +207,20 @@ bool Servd::enableCommunication(bool enable, bool& sendMsg) {
}
if(comEnabled != enable) {
com += enable ? 1 : -1;
const std::string request = SERVD_VERSION_STR + " store " + serialString + " com " + std::to_string(com);
if(!messageSocket.sendto(request.data(), request.size(), SERVD_ADDRESS)) {
const std::string request = SERVD_VERSION_STR + " store com " + std::to_string(com);
std::string response;
response.resize(1);
if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error);
return false;
}
}
comEnabled = enable;
{
const std::string request = SERVD_VERSION_STR + " unlock " + serialString + " com";
if(!messageSocket.sendto(request.data(), request.size(), SERVD_ADDRESS)) {
const std::string request = SERVD_VERSION_STR + " unlock com";
std::string response;
response.resize(1);
if(!messageSocket.transceive(request, response, std::chrono::milliseconds(5000))) {
EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error);
return false;
}
@ -219,78 +228,11 @@ bool Servd::enableCommunication(bool enable, bool& sendMsg) {
return true;
}
void Servd::alive() {
Socket socket;
socket.set_nonblocking();
socket.bind(Address("127.0.0.1", 0));
const std::string statusRequest = SERVD_VERSION_STR + " status " + std::string(device.serial);
std::string statusResponse;
statusResponse.resize(8);
while(!isDisconnected() && !isClosing()) {
if(!socket.sendto(statusRequest.data(), statusRequest.size(), {"127.0.0.1", 26741})) {
EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
bool hasData;
if(!socket.poll(std::chrono::milliseconds(2000), hasData)) {
EventManager::GetInstance().add(APIEvent::Type::ServdPollError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
if(!hasData) {
EventManager::GetInstance().add(APIEvent::Type::ServdNoDataError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
size_t statusResponseSize = statusResponse.size();
if(!socket.recv(statusResponse.data(), statusResponseSize)) {
EventManager::GetInstance().add(APIEvent::Type::ServdRecvError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
statusResponse.resize(statusResponseSize);
if(statusRequest == "closed") {
EventManager::GetInstance().add(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
if(statusResponse != "open") {
EventManager::GetInstance().add(APIEvent::Type::ServdInvalidResponseError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void Servd::read(Address&& address) {
Socket socket;
socket.set_nonblocking();
socket.set_reuse(true);
#ifdef _WIN32
if(!socket.bind(Address("127.0.0.1", address.port()))) {
EventManager::GetInstance().add(APIEvent::Type::ServdBindError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
#else
if(!socket.bind(Address(address.ip().c_str(), address.port()))) {
EventManager::GetInstance().add(APIEvent::Type::ServdBindError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
#endif
if(!socket.join_multicast("127.0.0.1", address.ip())) {
EventManager::GetInstance().add(APIEvent::Type::ServdJoinMulticastError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
}
std::vector<uint8_t> buf(65535);
void Servd::read() {
std::vector<uint8_t> buf(2 * 1024 * 1024);
while(!isDisconnected() && !isClosing()) {
bool hasData;
if(!socket.poll(std::chrono::milliseconds(100), hasData)) {
if(!dataSocket->poll(std::chrono::milliseconds(100), hasData)) {
EventManager::GetInstance().add(APIEvent::Type::ServdPollError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
@ -299,7 +241,7 @@ void Servd::read(Address&& address) {
continue;
}
size_t bufSize = buf.size();
if(!socket.recv(buf.data(), bufSize)) {
if(!dataSocket->recv(buf.data(), bufSize)) {
EventManager::GetInstance().add(APIEvent::Type::ServdRecvError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;
@ -308,16 +250,14 @@ void Servd::read(Address&& address) {
}
}
void Servd::write(Address&& address) {
Socket socket;
socket.bind(Address("127.0.0.1", 0));
void Servd::write() {
WriteOperation writeOp;
while(!isDisconnected() && !isClosing()) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100))) {
continue;
}
if(!isClosing()) {
if(!socket.sendto(writeOp.bytes.data(), writeOp.bytes.size(), address)) {
if(!dataSocket->send(writeOp.bytes.data(), writeOp.bytes.size())) {
EventManager::GetInstance().add(APIEvent::Type::ServdSendError, APIEvent::Severity::Error);
setIsDisconnected(true);
return;