From 77928dc93dfe26394059210907b0dd876c0a5767 Mon Sep 17 00:00:00 2001 From: Yasser Yassine Date: Tue, 28 May 2024 18:21:22 +0000 Subject: [PATCH] Driver: Add general predicate parameter for waitForRx --- communication/driver.cpp | 13 ++++++++++--- include/icsneo/communication/driver.h | 5 +++-- platform/ftd3xx.cpp | 2 +- platform/posix/cdcacm.cpp | 2 +- platform/posix/firmio.cpp | 2 +- platform/posix/ftdi.cpp | 2 +- platform/posix/pcap.cpp | 2 +- platform/tcp.cpp | 2 +- platform/windows/pcap.cpp | 2 +- platform/windows/vcp.cpp | 4 ++-- 10 files changed, 22 insertions(+), 14 deletions(-) diff --git a/communication/driver.cpp b/communication/driver.cpp index 26673ac..243b3ef 100644 --- a/communication/driver.cpp +++ b/communication/driver.cpp @@ -8,7 +8,7 @@ using namespace icsneo; -bool Driver::writeToReadBuffer(const uint8_t* buf, size_t numReceived) { +bool Driver::pushRx(const uint8_t* buf, size_t numReceived) { bool ret = readBuffer.write(buf, numReceived); if(hasRxWaitRequest) { @@ -18,11 +18,18 @@ bool Driver::writeToReadBuffer(const uint8_t* buf, size_t numReceived) { return ret; } -bool Driver::waitForRx(size_t minBytes, std::chrono::milliseconds timeout) { +bool Driver::waitForRx(size_t limit, std::chrono::milliseconds timeout) { + return waitForRx([limit, this]() { + return readBuffer.size() >= limit; + }, timeout); +} + +bool Driver::waitForRx(std::function predicate, std::chrono::milliseconds timeout) { std::unique_lock lk(rxWaitMutex); hasRxWaitRequest = true; - auto ret = rxWaitRequestCv.wait_for(lk, timeout, [this, minBytes]{ return readBuffer.size() >= minBytes; }); + auto ret = rxWaitRequestCv.wait_for(lk, timeout, predicate); + hasRxWaitRequest = false; return ret; diff --git a/include/icsneo/communication/driver.h b/include/icsneo/communication/driver.h index 45e6230..d92e37e 100644 --- a/include/icsneo/communication/driver.h +++ b/include/icsneo/communication/driver.h @@ -27,7 +27,8 @@ public: virtual bool isDisconnected() { return disconnected; }; virtual bool close() = 0; - bool waitForRx(size_t minBytes, std::chrono::milliseconds timeout); + bool waitForRx(size_t limit, std::chrono::milliseconds timeout); + bool waitForRx(std::function predicate, std::chrono::milliseconds timeout); bool readWait(std::vector& bytes, std::chrono::milliseconds timeout = std::chrono::milliseconds(100), size_t limit = 0); bool write(const std::vector& bytes); virtual bool isEthernet() const { return false; } @@ -59,7 +60,7 @@ protected: virtual bool writeQueueAlmostFull() { return writeQueue.size_approx() > (writeQueueSize * 3 / 4); } virtual bool writeInternal(const std::vector& b) { return writeQueue.enqueue(WriteOperation(b)); } - bool writeToReadBuffer(const uint8_t* buf, size_t numReceived); + bool pushRx(const uint8_t* buf, size_t numReceived); RingBuffer readBuffer = RingBuffer(ICSNEO_DRIVER_RINGBUFFER_SIZE); std::atomic hasRxWaitRequest = false; std::condition_variable rxWaitRequestCv; diff --git a/platform/ftd3xx.cpp b/platform/ftd3xx.cpp index fd0ce7a..0446eb7 100644 --- a/platform/ftd3xx.cpp +++ b/platform/ftd3xx.cpp @@ -136,7 +136,7 @@ void FTD3XX::readTask() { } FT_ReleaseOverlapped(*handle, &overlap); if(received > 0) { - writeToReadBuffer(buffer, received); + pushRx(buffer, received); } } } diff --git a/platform/posix/cdcacm.cpp b/platform/posix/cdcacm.cpp index df496e5..3239b2a 100644 --- a/platform/posix/cdcacm.cpp +++ b/platform/posix/cdcacm.cpp @@ -190,7 +190,7 @@ void CDCACM::readTask() { } std::cout << std::dec << std::endl; #endif - writeToReadBuffer(readbuf, bytesRead); + pushRx(readbuf, bytesRead); } else { if(modeChanging) { // We were expecting a disconnect for reenumeration diff --git a/platform/posix/firmio.cpp b/platform/posix/firmio.cpp index d6215f4..690cad4 100644 --- a/platform/posix/firmio.cpp +++ b/platform/posix/firmio.cpp @@ -242,7 +242,7 @@ void FirmIO::readTask() { // Translate the physical address back to our virtual address space uint8_t* addr = reinterpret_cast(msg.payload.data.addr - PHY_ADDR_BASE + vbase); - while (!writeToReadBuffer(addr, msg.payload.data.len)) { + while (!pushRx(addr, msg.payload.data.len)) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); // back-off so reading thread can empty the buffer if (closing || isDisconnected()) { break; diff --git a/platform/posix/ftdi.cpp b/platform/posix/ftdi.cpp index 24a4041..2fd0ca6 100644 --- a/platform/posix/ftdi.cpp +++ b/platform/posix/ftdi.cpp @@ -213,7 +213,7 @@ void FTDI::readTask() { } else report(APIEvent::Type::FailedToRead, APIEvent::Severity::EventWarning); } else - writeToReadBuffer(readbuf, readBytes); + pushRx(readbuf, readBytes); } } diff --git a/platform/posix/pcap.cpp b/platform/posix/pcap.cpp index 4135bcf..ed045f0 100644 --- a/platform/posix/pcap.cpp +++ b/platform/posix/pcap.cpp @@ -284,7 +284,7 @@ void PCAP::readTask() { PCAP* driver = reinterpret_cast(obj); if(driver->ethPacketizer.inputUp({data, data + header->caplen})) { const auto bytes = driver->ethPacketizer.outputUp(); - driver->writeToReadBuffer(bytes.data(), bytes.size()); + driver->pushRx(bytes.data(), bytes.size()); } }, (uint8_t*)this); } diff --git a/platform/tcp.cpp b/platform/tcp.cpp index 966e7f1..91dca55 100644 --- a/platform/tcp.cpp +++ b/platform/tcp.cpp @@ -533,7 +533,7 @@ void TCP::readTask() { uint8_t readbuf[READ_BUFFER_SIZE]; while(!closing) { if(const auto received = ::recv(*socket, (char*)readbuf, READ_BUFFER_SIZE, 0); received > 0) { - writeToReadBuffer(readbuf, received); + pushRx(readbuf, received); } else { timeout.tv_sec = 0; timeout.tv_usec = 50'000; diff --git a/platform/windows/pcap.cpp b/platform/windows/pcap.cpp index e76ff20..3b9e716 100644 --- a/platform/windows/pcap.cpp +++ b/platform/windows/pcap.cpp @@ -278,7 +278,7 @@ void PCAP::readTask() { if(ethPacketizer.inputUp({data, data + header->caplen})) { const auto bytes = ethPacketizer.outputUp(); - writeToReadBuffer(bytes.data(), bytes.size()); + pushRx(bytes.data(), bytes.size()); } } } diff --git a/platform/windows/vcp.cpp b/platform/windows/vcp.cpp index 8d185dd..8f41eca 100644 --- a/platform/windows/vcp.cpp +++ b/platform/windows/vcp.cpp @@ -390,7 +390,7 @@ void VCP::readTask() { if(ReadFile(detail->handle, readbuf, READ_BUFFER_SIZE, nullptr, &detail->overlappedRead)) { if(GetOverlappedResult(detail->handle, &detail->overlappedRead, &bytesRead, FALSE)) { if(bytesRead) - writeToReadBuffer(readbuf, bytesRead); + pushRx(readbuf, bytesRead); } continue; } @@ -413,7 +413,7 @@ void VCP::readTask() { auto ret = WaitForSingleObject(detail->overlappedRead.hEvent, 100); if(ret == WAIT_OBJECT_0) { if(GetOverlappedResult(detail->handle, &detail->overlappedRead, &bytesRead, FALSE)) { - writeToReadBuffer(readbuf, bytesRead); + pushRx(readbuf, bytesRead); state = LAUNCH; } else report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);