Platform: TCP: Add poll function to Socket

ks-refactor-docs
Kyle Schwarz 2024-07-30 02:09:24 +00:00
parent 14588591e5
commit 4476bd8b71
2 changed files with 24 additions and 33 deletions

View File

@ -38,6 +38,7 @@ private:
~Socket();
explicit operator bool() const { return fd != -1; }
operator SocketFileDescriptor() const { return fd; }
void poll(uint16_t event, uint32_t msTimeout);
private:
SocketFileDescriptor fd;
};

View File

@ -6,6 +6,7 @@
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <unistd.h>
#include <ifaddrs.h>
#include <net/if.h>
@ -72,6 +73,20 @@ TCP::Socket::~Socket() {
#endif
}
void TCP::Socket::poll(uint16_t event, uint32_t msTimeout) {
#ifdef _WIN32
WSAPOLLFD pfd;
pfd.fd = fd;
pfd.events = event;
::WSAPoll(&pfd, 1, msTimeout);
#else
struct pollfd pfd;
pfd.fd = fd;
pfd.events = event;
::poll(&pfd, 1, msTimeout);
#endif
}
void TCP::Find(std::vector<FoundDevice>& found) {
static const auto MDNS_PORT = htons((unsigned short)5353);
static const auto MDNS_IP = htonl((((uint32_t)224U) << 24U) | ((uint32_t)251U));
@ -256,16 +271,13 @@ void TCP::Find(std::vector<FoundDevice>& found) {
continue;
}
timeval timeout = {};
timeout.tv_usec = 50000;
fd_set readfs;
FD_ZERO(&readfs);
int nfds = WIN_INT(socket) + 1;
FD_SET(socket, &readfs);
while(true) {
const auto rxTill = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
while(std::chrono::steady_clock::now() < rxTill) {
static constexpr size_t bufferLen = 2048;
uint8_t buffer[bufferLen];
::select(nfds, &readfs, 0, 0, &timeout); // timeout is intentially not reset, we want timeout.tv_usec _total_
// keep trying till the timeout
const auto msWait = std::chrono::duration_cast<std::chrono::milliseconds>(rxTill - std::chrono::steady_clock::now()).count();
socket.poll(POLLIN, static_cast<uint32_t>(msWait));
const auto recvRet = ::recv(socket, (char*)buffer, bufferLen, 0);
static constexpr auto headerLength = 12;
if(recvRet < headerLength) {
@ -460,13 +472,7 @@ bool TCP::open() {
}
#endif
timeval timeout = {};
timeout.tv_sec = 1;
fd_set writefs;
FD_ZERO(&writefs);
int nfds = WIN_INT(*partiallyOpenSocket) + 1;
FD_SET(*partiallyOpenSocket, &writefs);
::select(nfds, 0, &writefs, 0, &timeout);
partiallyOpenSocket->poll(POLLOUT, 1000);
if(::connect(*partiallyOpenSocket, (sockaddr*)&addr, sizeof(addr)) < 0) {
#ifdef _WIN32
@ -523,21 +529,13 @@ bool TCP::close() {
void TCP::readTask() {
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
const int nfds = WIN_INT(*socket) + 1;
fd_set readfs;
FD_ZERO(&readfs);
FD_SET(*socket, &readfs);
timeval timeout;
constexpr size_t READ_BUFFER_SIZE = 2048;
uint8_t readbuf[READ_BUFFER_SIZE];
while(!closing) {
if(const auto received = ::recv(*socket, (char*)readbuf, READ_BUFFER_SIZE, 0); received > 0) {
pushRx(readbuf, received);
} else {
timeout.tv_sec = 0;
timeout.tv_usec = 50'000;
::select(nfds, &readfs, 0, 0, &timeout);
socket->poll(POLLIN, 100);
}
}
}
@ -545,12 +543,6 @@ void TCP::readTask() {
void TCP::writeTask() {
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
const int nfds = WIN_INT(*socket) + 1;
fd_set writefs;
FD_ZERO(&writefs);
FD_SET(*socket, &writefs);
timeval timeout;
WriteOperation writeOp;
while(!closing) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
@ -559,9 +551,7 @@ void TCP::writeTask() {
while(!closing) {
if(::send(*socket, (char*)writeOp.bytes.data(), WIN_INT(writeOp.bytes.size()), 0) > 0)
break;
timeout.tv_sec = 0;
timeout.tv_usec = 100'000;
::select(nfds, 0, &writefs, 0, &timeout);
socket->poll(POLLOUT, 100);
}
}
}