From 63f0516318712df79bd6056558d309df0a3fd55c Mon Sep 17 00:00:00 2001 From: Jonathan Schwartz Date: Fri, 5 Apr 2024 17:24:53 +0000 Subject: [PATCH] Replace concurrentqueue with ringbuffer --- CMakeLists.txt | 2 + communication/communication.cpp | 4 +- communication/driver.cpp | 33 +++----- communication/packetizer.cpp | 8 +- communication/ringbuffer.cpp | 91 +++++++++++++++++++++ include/icsneo/communication/driver.h | 8 +- include/icsneo/communication/packetizer.h | 95 +--------------------- include/icsneo/communication/ringbuffer.h | 75 ++++++++++++++++++ platform/ftd3xx.cpp | 5 +- platform/posix/cdcacm.cpp | 6 +- platform/posix/firmio.cpp | 28 ++++--- platform/posix/ftdi.cpp | 5 +- platform/posix/pcap.cpp | 10 ++- platform/tcp.cpp | 5 +- platform/windows/pcap.cpp | 13 +-- platform/windows/vcp.cpp | 7 +- test/a2bencoderdecodertest.cpp | 6 +- test/i2cencoderdecodertest.cpp | 7 +- test/linencoderdecodertest.cpp | 6 +- test/livedataencoderdecodertest.cpp | 10 ++- test/mdioencoderdecodertest.cpp | 10 ++- test/ringbuffertest.cpp | 96 +++++++++++++++++++++++ 22 files changed, 361 insertions(+), 169 deletions(-) create mode 100644 communication/ringbuffer.cpp create mode 100644 include/icsneo/communication/ringbuffer.h create mode 100644 test/ringbuffertest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 29397a8..dd2d027 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -266,6 +266,7 @@ set(SRC_FILES communication/communication.cpp communication/driver.cpp communication/livedata.cpp + communication/ringbuffer.cpp device/extensions/flexray/extension.cpp device/extensions/flexray/controller.cpp device/idevicesettings.cpp @@ -514,6 +515,7 @@ if(LIBICSNEO_BUILD_TESTS) test/a2bencoderdecodertest.cpp test/mdioencoderdecodertest.cpp test/livedataencoderdecodertest.cpp + test/ringbuffertest.cpp ) target_link_libraries(libicsneo-tests gtest gtest_main) diff --git a/communication/communication.cpp b/communication/communication.cpp index 9b18d6a..678a682 100644 --- a/communication/communication.cpp +++ b/communication/communication.cpp @@ -268,7 +268,7 @@ void Communication::readTask() { while(!closing) { readBytes.clear(); - if(driver->readWait(readBytes)) { + if(driver->readAvailable()) { handleInput(*packetizer, readBytes); } } @@ -291,7 +291,7 @@ void Communication::handleInput(Packetizer& p, std::vector& readBytes) handleInput(p, readBytes); // and we might as well process this input ourselves } } else { - if(p.input(readBytes)) { + if(p.input(driver->getReadBuffer())) { for(const auto& packet : p.output()) { std::shared_ptr msg; if(!decoder->decode(msg, packet)) diff --git a/communication/driver.cpp b/communication/driver.cpp index 999fa19..c2fb0f1 100644 --- a/communication/driver.cpp +++ b/communication/driver.cpp @@ -8,37 +8,24 @@ using namespace icsneo; -bool Driver::read(std::vector& bytes, size_t limit) { - // A limit of zero indicates no limit - if(limit == 0) - limit = (size_t)-1; - - if(limit > (readQueue.size_approx() + 4)) - limit = (readQueue.size_approx() + 4); - - if(bytes.capacity() < limit) - bytes.resize(limit); - - size_t actuallyRead = readQueue.try_dequeue_bulk(bytes.data(), limit); - - if(bytes.size() > actuallyRead) - bytes.resize(actuallyRead); - - return true; -} - bool Driver::readWait(std::vector& bytes, std::chrono::milliseconds timeout, size_t limit) { // A limit of zero indicates no limit if(limit == 0) limit = (size_t)-1; - if(limit > (readQueue.size_approx() + 4)) - limit = (readQueue.size_approx() + 4); + if(limit > (readBuffer.size() + 4)) + limit = (readBuffer.size() + 4); bytes.resize(limit); - size_t actuallyRead = readQueue.wait_dequeue_bulk_timed(bytes.data(), limit, timeout); - + // wait until we have enough data, or the timout occurs + const auto timeoutTime = std::chrono::steady_clock::now() + timeout; + while (readBuffer.size() < limit && std::chrono::steady_clock::now() < timeoutTime) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + size_t actuallyRead = std::min(readBuffer.size(), limit); + readBuffer.read(bytes.data(), 0, actuallyRead); + readBuffer.pop(actuallyRead); bytes.resize(actuallyRead); #ifdef ICSNEO_DRIVER_DEBUG_PRINTS diff --git a/communication/packetizer.cpp b/communication/packetizer.cpp index a09108f..3e43fef 100644 --- a/communication/packetizer.cpp +++ b/communication/packetizer.cpp @@ -24,11 +24,9 @@ std::vector& Packetizer::packetWrap(std::vector& data, bool sh return data; } -bool Packetizer::input(const std::vector& inputBytes) { +bool Packetizer::input(RingBuffer& bytes) { bool haveEnoughData = true; - bytes.Copy(inputBytes); - while(haveEnoughData) { switch(state) { case ReadState::SearchForHeader: @@ -152,14 +150,14 @@ bool Packetizer::input(const std::vector& inputBytes) { if(packetLength > 0) packet.data.resize(packetLength - headerSize); - bytes.CopyTo(packet.data.data(), currentIndex, (packetLength - currentIndex)); + bytes.read(packet.data.data(), currentIndex, (packetLength - currentIndex)); currentIndex = packetLength; if(disableChecksum || !checksum || bytes[currentIndex] == ICSChecksum(packet.data)) { // Got a good packet gotGoodPackets = true; processedPackets.push_back(std::make_shared(packet)); - bytes.Erase_front(packetLength); + bytes.pop(packetLength); if(packet.network == Network::NetID::DiskData && (packetLength - headerSize) % 2 == 0) { bytes.pop_front(); diff --git a/communication/ringbuffer.cpp b/communication/ringbuffer.cpp new file mode 100644 index 0000000..be73f7d --- /dev/null +++ b/communication/ringbuffer.cpp @@ -0,0 +1,91 @@ +#include "icsneo/communication/ringbuffer.h" +#include + +namespace icsneo { + +RingBuffer::RingBuffer(size_t bufferSize) : readCursor(0), writeCursor(0) { + // round the buffer size to the nearest power of 2 + bufferSize = RoundUp(bufferSize); + mask = bufferSize - 1; + buf = new uint8_t[bufferSize]; +} + +RingBuffer::~RingBuffer() { + delete[] buf; + buf = nullptr; +} + +const uint8_t& RingBuffer::operator[](size_t offset) const { + return get(offset); +} + +size_t RingBuffer::size() const { + // The values in the cursors are monotonic, i.e. they only ever increment. They can be considered to be the total number of elements ever written or read + auto currentWriteCursor = writeCursor.load(std::memory_order_relaxed); + auto currentReadCursor = readCursor.load(std::memory_order_relaxed); + // Using unmasked values, writeCursor is guaranteed to be >= readCursor. If they are equal that means the buffer is empty + return currentWriteCursor - currentReadCursor; +} + +void RingBuffer::pop_front() { + pop(1); +} + +void RingBuffer::pop(size_t count) { + if (size() < count) { + throw std::runtime_error("RingBuffer: Underflow"); + } + readCursor.fetch_add(count, std::memory_order_release); +} + +const uint8_t& RingBuffer::get(size_t offset) const { + if (offset >= size()) { + throw std::runtime_error("RingBuffer: Index out of range"); + } + auto currentReadCursor = readCursor.load(std::memory_order_acquire); + return *resolve(currentReadCursor, offset); +} + +bool RingBuffer::write(const uint8_t* addr, size_t length) { + const auto freeSpace = (capacity() - size()); + if (length > freeSpace) { + return false; + } + auto currentWriteCursor = writeCursor.load(std::memory_order_relaxed); + auto spaceAtEnd = std::min(freeSpace, capacity() - (currentWriteCursor & mask)); // number of bytes from (masked) writeCursor to the end of the writable space (i.e. we reach the masked read cursor or the end of the buffer) + auto firstCopySize = std::min(spaceAtEnd, length); + (void)memcpy(resolve(currentWriteCursor, 0), addr, firstCopySize); + if (firstCopySize < length) + { + (void)memcpy(buf, &addr[firstCopySize], length - firstCopySize); + } + + writeCursor.store(currentWriteCursor + length, std::memory_order_release); + return true; +} + +bool RingBuffer::write(const std::vector& source) { + return write(source.data(), source.size()); +} + +bool RingBuffer::read(uint8_t* dest, size_t startIndex, size_t length) const { + auto currentSize = size(); + if ((startIndex >= currentSize) || ((startIndex + length) > size())) { + return false; + } + auto currentReadCursor = readCursor.load(std::memory_order_relaxed); + auto bytesAtEnd = std::min(capacity() - ((currentReadCursor + startIndex) & mask), length); + const auto bytesAtStart = (length - bytesAtEnd); + + (void)memcpy(dest, resolve(currentReadCursor, startIndex), bytesAtEnd); + if (bytesAtStart > 0) { + (void)memcpy(&dest[bytesAtEnd], buf, bytesAtStart); + } + return true; +} + +void RingBuffer::clear() { + pop(size()); +} + +} \ No newline at end of file diff --git a/include/icsneo/communication/driver.h b/include/icsneo/communication/driver.h index ccdf6b9..0de0a56 100644 --- a/include/icsneo/communication/driver.h +++ b/include/icsneo/communication/driver.h @@ -11,9 +11,11 @@ #include #include "icsneo/api/eventmanager.h" #include "icsneo/third-party/concurrentqueue/blockingconcurrentqueue.h" +#include "icsneo/communication/ringbuffer.h" namespace icsneo { +#define ICSNEO_DRIVER_RINGBUFFER_SIZE (512 * 1024) class Driver { public: Driver(const device_eventhandler_t& handler) : report(handler) {} @@ -24,10 +26,11 @@ public: virtual void awaitModeChangeComplete() {} virtual bool isDisconnected() { return disconnected; }; virtual bool close() = 0; - bool read(std::vector& bytes, size_t limit = 0); bool readWait(std::vector& bytes, std::chrono::milliseconds timeout = std::chrono::milliseconds(100), size_t limit = 0); bool write(const std::vector& bytes); virtual bool isEthernet() const { return false; } + bool readAvailable() { return readBuffer.size() > 0; } + RingBuffer& getReadBuffer() { return readBuffer; } device_eventhandler_t report; @@ -54,7 +57,8 @@ protected: virtual bool writeQueueAlmostFull() { return writeQueue.size_approx() > (writeQueueSize * 3 / 4); } virtual bool writeInternal(const std::vector& b) { return writeQueue.enqueue(WriteOperation(b)); } - moodycamel::BlockingConcurrentQueue readQueue; + + RingBuffer readBuffer = RingBuffer(ICSNEO_DRIVER_RINGBUFFER_SIZE); moodycamel::BlockingConcurrentQueue writeQueue; std::thread readThread, writeThread; std::atomic closing{false}; diff --git a/include/icsneo/communication/packetizer.h b/include/icsneo/communication/packetizer.h index cfee43b..abc8778 100644 --- a/include/icsneo/communication/packetizer.h +++ b/include/icsneo/communication/packetizer.h @@ -4,14 +4,13 @@ #ifdef __cplusplus #include "icsneo/communication/packet.h" +#include "icsneo/communication/ringbuffer.h" #include "icsneo/api/eventmanager.h" #include #include #include #include -#define ICSNEO_PACKETIZER_BUFFER_SIZE (512 * 1024) - namespace icsneo { class Packetizer { @@ -22,7 +21,7 @@ public: std::vector& packetWrap(std::vector& data, bool shortFormat) const; - bool input(const std::vector& bytes); + bool input(RingBuffer& bytes); std::vector> output(); bool disableChecksum = false; // Even for short packets @@ -37,95 +36,6 @@ private: GetData }; - class RingBuffer - { - private: - constexpr static size_t mBufferSize = ICSNEO_PACKETIZER_BUFFER_SIZE; - - size_t mStartOffset; - size_t mSize; - uint8_t mData[mBufferSize]; - - public: - RingBuffer(void) - : mStartOffset(0) - , mSize(0) - { - (void)memset(mData, 0, mBufferSize); - } - - const uint8_t& operator [](size_t offset) { return Get(offset); } - size_t size(void) { return mSize; } - void pop_front(void) - { - Erase_front(1); - } - - void Erase_front(size_t count) - { - if (mSize < count) - { - throw std::runtime_error("RingBuffer: Underflow"); - } - mStartOffset = (mStartOffset + count) % mBufferSize; - mSize -= count; - } - - const uint8_t& Get(size_t offset) - { - if (offset >= mSize) - { - throw std::runtime_error("RingBuffer: Index out of range"); - } - return *Resolve(offset); - } - - void Copy(const std::vector& source) - { - const auto inputSize = source.size(); - const auto octetsAvailable = (mBufferSize - mSize); - - if (inputSize > octetsAvailable) - { - throw std::runtime_error("RingBuffer: Out of memory"); - } - - const auto octetsAvailableTail = (octetsAvailable - mStartOffset); - const auto octetsToWrap = (inputSize > octetsAvailableTail) ? (inputSize - octetsAvailableTail) : 0; - const auto octetsToAppend = (inputSize - octetsToWrap); - - (void)memcpy(Resolve(mSize), source.data(), octetsToAppend); - if (octetsToWrap > 0) - { - (void)memcpy(mData, &source.data()[octetsToAppend], octetsToWrap); - } - mSize += inputSize; - } - - void CopyTo(uint8_t* dest, size_t startIndex, size_t length) - { - if ((startIndex + length) > mSize) - { - throw std::runtime_error("RingBuffer: Index out of range"); - } - - const auto octetsToReadHead = std::min((mBufferSize - mStartOffset - startIndex), length); - const auto octetsToReadTail = (length - octetsToReadHead); - - (void)memcpy(dest, Resolve(startIndex), octetsToReadHead); - if (octetsToReadTail > 0) - { - (void)memcpy(&dest[octetsToReadHead], mData, octetsToReadTail); - } - } - - protected: - inline uint8_t* Resolve(size_t offset) - { - return &mData[(mStartOffset + offset) % mBufferSize]; - } - }; - ReadState state = ReadState::SearchForHeader; int currentIndex = 0; @@ -134,7 +44,6 @@ private: bool checksum = false; bool gotGoodPackets = false; // Tracks whether we've ever gotten a good packet Packet packet; - RingBuffer bytes; std::vector> processedPackets; diff --git a/include/icsneo/communication/ringbuffer.h b/include/icsneo/communication/ringbuffer.h new file mode 100644 index 0000000..3b9c5c8 --- /dev/null +++ b/include/icsneo/communication/ringbuffer.h @@ -0,0 +1,75 @@ +#ifndef _RINGBUFFER_H_ +#define _RINGBUFFER_H_ + +#include +#include +#include +#include +#include +#include +#include +#if __cplusplus >= 202002L +#include +#endif +namespace icsneo { + +class RingBuffer +{ +private: + static constexpr size_t RoundUp(size_t size) { + if (size == 0) { + // Avoid underflow when decrementing later + return 1; + } else if (size >= SIZE_MAX) { + // overflow case - resolve to max size + return MaxSize; + } +#if __cplusplus >= 202002L + // c++20 gives us countl_zero which should be more effecient on most platforms + auto lzero = std::countl_zero(size - 1); + auto shift = (sizeof(size_t) * 8) - lzero; + return 1ull << shift; +#else + // Bit twiddling magic! See http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 + --size; + size |= size >> 1; + size |= size >> 2; + size |= size >> 4; + for (size_t i = 1; i < sizeof(size_t); i <<= 1) { + size |= size >> (i << 3); + } + ++size; + return size; +#endif + } + //static_assert(std::atomic::is_always_lock_free, "RingBuffer cursor types are not lock-free"); + std::atomic readCursor; + std::atomic writeCursor; + // Use this to mask the cursor values to the buffer size. This is set to capacity - 1 where capacity is always an integral power of 2 (2, 4, 8, 16, etc) + size_t mask; + uint8_t* buf; + +public: + static constexpr auto MaxSize = 1ull << ((8 * sizeof(size_t)) - 1); + RingBuffer(size_t bufferSize); + ~RingBuffer(); + const uint8_t& operator[](size_t offset) const; + size_t size() const; + void pop_front(); + void pop(size_t count); + const uint8_t& get(size_t offset) const; + bool write(const uint8_t* addr, size_t count); + bool write(const std::vector& source); + bool read(uint8_t* dest, size_t startIndex, size_t length) const; + void clear(); + constexpr size_t capacity() const { + return mask + 1; + } + +protected: + inline uint8_t* resolve(size_t cursor, size_t offset) const { + return &buf[(cursor + offset) & mask]; + } +}; +} +#endif \ No newline at end of file diff --git a/platform/ftd3xx.cpp b/platform/ftd3xx.cpp index e4dcb72..a52cea8 100644 --- a/platform/ftd3xx.cpp +++ b/platform/ftd3xx.cpp @@ -89,9 +89,8 @@ bool FTD3XX::close() { if(writeThread.joinable()) writeThread.join(); - uint8_t flush; WriteOperation flushop; - while(readQueue.try_dequeue(flush)) {} + readBuffer.pop(readBuffer.size()); while(writeQueue.try_dequeue(flushop)) {} if(const auto ret = FT_Close(*handle); ret != FT_OK) { @@ -137,7 +136,7 @@ void FTD3XX::readTask() { } FT_ReleaseOverlapped(*handle, &overlap); if(received > 0) { - readQueue.enqueue_bulk(buffer, received); + readBuffer.write(buffer, received); } } } diff --git a/platform/posix/cdcacm.cpp b/platform/posix/cdcacm.cpp index 0cf7bed..d517b2d 100644 --- a/platform/posix/cdcacm.cpp +++ b/platform/posix/cdcacm.cpp @@ -140,9 +140,8 @@ bool CDCACM::close() { int ret = ::close(fd); fd = -1; - uint8_t flush; WriteOperation flushop; - while (readQueue.try_dequeue(flush)) {} + readBuffer.clear(); while (writeQueue.try_dequeue(flushop)) {} if(modeChanging) { @@ -191,8 +190,7 @@ void CDCACM::readTask() { } std::cout << std::dec << std::endl; #endif - - readQueue.enqueue_bulk(readbuf, bytesRead); + readBuffer.write(readbuf, bytesRead); } else { if(modeChanging) { // We were expecting a disconnect for reenumeration diff --git a/platform/posix/firmio.cpp b/platform/posix/firmio.cpp index e432fa6..9a1a4a9 100644 --- a/platform/posix/firmio.cpp +++ b/platform/posix/firmio.cpp @@ -16,6 +16,7 @@ #include #include #include +#include using namespace icsneo; @@ -44,13 +45,13 @@ void FirmIO::Find(std::vector& found) { const auto start = steady_clock::now(); // Get an absolute wall clock to compare to const auto overallTimeout = start + milliseconds(500); - while(temp.readWait(payload, milliseconds(50))) { + while(!temp.readAvailable()) { if(steady_clock::now() > overallTimeout) { // failed to read out a serial number reponse in time break; } - if(!packetizer.input(payload)) + if(!packetizer.input(temp.getReadBuffer())) continue; // A full packet has not yet been read out for(const auto& packet : packetizer.output()) { @@ -182,9 +183,6 @@ bool FirmIO::close() { ret |= ::close(fd); fd = -1; - uint8_t flush; - while (readQueue.try_dequeue(flush)) {} - if(ret == 0) { return true; } else { @@ -198,6 +196,12 @@ void FirmIO::readTask() { Msg msg; std::vector toFree; + // attempt to elevate the thread priority. PRIO_MIN is actually the highest priority but the lowest value. + int err = setpriority(PRIO_PROCESS, 0, -1); + if (err != 0) { + std::cerr << "FirmIO::readTask setpriority failed : " << strerror(errno) << std::endl; + } + while(!closing && !isDisconnected()) { fd_set rfds = {0}; struct timeval tv = {0}; @@ -219,10 +223,7 @@ void FirmIO::readTask() { toFree.clear(); int i = 0; - while(!in->isEmpty() && i++ < 1000) { - if(!in->read(&msg)) - break; - + while(in->read(&msg) && i++ < 1000) { switch(msg.command) { case Msg::Command::ComData: { if(toFree.empty() || toFree.back().payload.free.refCount == 6) { @@ -241,9 +242,14 @@ void FirmIO::readTask() { // Translate the physical address back to our virtual address space uint8_t* addr = reinterpret_cast(msg.payload.data.addr - PHY_ADDR_BASE + vbase); - readQueue.enqueue_bulk(addr, msg.payload.data.len); - break; + while (!readBuffer.write(addr, msg.payload.data.len)) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); // back-off so reading thread can empty the buffer + if (closing || isDisconnected()) { + break; + } + } } + break; case Msg::Command::ComFree: { std::lock_guard lk(outMutex); // std::cout << "Got some free " << std::hex << msg.payload.free.ref[0] << std::endl; diff --git a/platform/posix/ftdi.cpp b/platform/posix/ftdi.cpp index a96328a..4a00147 100644 --- a/platform/posix/ftdi.cpp +++ b/platform/posix/ftdi.cpp @@ -109,9 +109,8 @@ bool FTDI::close() { report(APIEvent::Type::DriverFailedToClose, APIEvent::Severity::Error); } - uint8_t flush; WriteOperation flushop; - while(readQueue.try_dequeue(flush)) {} + readBuffer.clear(); while(writeQueue.try_dequeue(flushop)) {} closing = false; @@ -214,7 +213,7 @@ void FTDI::readTask() { } else report(APIEvent::Type::FailedToRead, APIEvent::Severity::EventWarning); } else - readQueue.enqueue_bulk(readbuf, readBytes); + readBuffer.write(readbuf, readBytes); } } diff --git a/platform/posix/pcap.cpp b/platform/posix/pcap.cpp index 451fe77..e7fc27e 100644 --- a/platform/posix/pcap.cpp +++ b/platform/posix/pcap.cpp @@ -136,6 +136,8 @@ void PCAP::Find(std::vector& found) { pcap_sendpacket(iface.fp, bs.data(), (int)bs.size()); auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(50); + constexpr const size_t TempBufferSize = 4096; + RingBuffer tempBuffer(TempBufferSize); while(std::chrono::high_resolution_clock::now() <= timeout) { // Wait up to 50ms for the response struct pcap_pkthdr* header; const uint8_t* data; @@ -158,7 +160,8 @@ void PCAP::Find(std::vector& found) { continue; // This packet is not for us Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {}); - if(!packetizer.input(ethPacketizer.outputUp())) + tempBuffer.write(ethPacketizer.outputUp()); + if(!packetizer.input(tempBuffer)) continue; // This packet was not well formed EthernetPacketizer::EthernetPacket decoded(data, header->caplen); @@ -267,9 +270,8 @@ bool PCAP::close() { pcap_close(iface.fp); iface.fp = nullptr; - uint8_t flush; WriteOperation flushop; - while(readQueue.try_dequeue(flush)) {} + readBuffer.clear(); while(writeQueue.try_dequeue(flushop)) {} return true; @@ -282,7 +284,7 @@ void PCAP::readTask() { PCAP* driver = reinterpret_cast(obj); if(driver->ethPacketizer.inputUp({data, data + header->caplen})) { const auto bytes = driver->ethPacketizer.outputUp(); - driver->readQueue.enqueue_bulk(bytes.data(), bytes.size()); + driver->readBuffer.write(bytes.data(), bytes.size()); } }, (uint8_t*)this); } diff --git a/platform/tcp.cpp b/platform/tcp.cpp index ecdb0b7..97b443d 100644 --- a/platform/tcp.cpp +++ b/platform/tcp.cpp @@ -510,9 +510,8 @@ bool TCP::close() { if(writeThread.joinable()) writeThread.join(); - uint8_t flush; WriteOperation flushop; - while(readQueue.try_dequeue(flush)) {} + readBuffer.pop(readBuffer.size()); while(writeQueue.try_dequeue(flushop)) {} socket.reset(); @@ -534,7 +533,7 @@ void TCP::readTask() { uint8_t readbuf[READ_BUFFER_SIZE]; while(!closing) { if(const auto received = ::recv(*socket, (char*)readbuf, READ_BUFFER_SIZE, 0); received > 0) { - readQueue.enqueue_bulk(readbuf, received); + readBuffer.write(readbuf, received); } else { timeout.tv_sec = 0; timeout.tv_usec = 50'000; diff --git a/platform/windows/pcap.cpp b/platform/windows/pcap.cpp index c22592c..f14f5f9 100644 --- a/platform/windows/pcap.cpp +++ b/platform/windows/pcap.cpp @@ -7,6 +7,7 @@ #include "icsneo/communication/ethernetpacketizer.h" #include "icsneo/communication/packetizer.h" #include "icsneo/communication/decoder.h" +#include "icsneo/communication/ringbuffer.h" #include #include #pragma comment(lib, "IPHLPAPI.lib") @@ -123,7 +124,9 @@ void PCAP::Find(std::vector& found) { auto bs = requestPacket.getBytestream(); pcap.sendpacket(iface.fp, bs.data(), (int)bs.size()); - auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(5); + auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(250); + constexpr const size_t TempBufferSize = 4096; + RingBuffer tempBuffer(TempBufferSize); while(std::chrono::high_resolution_clock::now() <= timeout) { // Wait up to 5ms for the response struct pcap_pkthdr* header; const uint8_t* data; @@ -142,7 +145,8 @@ void PCAP::Find(std::vector& found) { continue; // This packet is not for us Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {}); - if(!packetizer.input(ethPacketizer.outputUp())) + tempBuffer.write(ethPacketizer.outputUp()); + if(!packetizer.input(tempBuffer)) continue; // This packet was not well formed EthernetPacketizer::EthernetPacket decoded(data, header->caplen); @@ -251,9 +255,8 @@ bool PCAP::close() { pcap.close(iface.fp); iface.fp = nullptr; - uint8_t flush; WriteOperation flushop; - while(readQueue.try_dequeue(flush)) {} + readBuffer.clear(); while(writeQueue.try_dequeue(flushop)) {} transmitQueue = nullptr; @@ -275,7 +278,7 @@ void PCAP::readTask() { if(ethPacketizer.inputUp({data, data + header->caplen})) { const auto bytes = ethPacketizer.outputUp(); - readQueue.enqueue_bulk(bytes.data(), bytes.size()); + readBuffer.write(bytes.data(), bytes.size()); } } } diff --git a/platform/windows/vcp.cpp b/platform/windows/vcp.cpp index cc4650d..d7d81a8 100644 --- a/platform/windows/vcp.cpp +++ b/platform/windows/vcp.cpp @@ -357,9 +357,8 @@ bool VCP::close() { detail->overlappedWait.hEvent = INVALID_HANDLE_VALUE; } - uint8_t flush; WriteOperation flushop; - while(readQueue.try_dequeue(flush)) {} + readBuffer.clear(); while(writeQueue.try_dequeue(flushop)) {} if(!ret) @@ -391,7 +390,7 @@ void VCP::readTask() { if(ReadFile(detail->handle, readbuf, READ_BUFFER_SIZE, nullptr, &detail->overlappedRead)) { if(GetOverlappedResult(detail->handle, &detail->overlappedRead, &bytesRead, FALSE)) { if(bytesRead) - readQueue.enqueue_bulk(readbuf, bytesRead); + readBuffer.write(readbuf, bytesRead); } continue; } @@ -414,7 +413,7 @@ void VCP::readTask() { auto ret = WaitForSingleObject(detail->overlappedRead.hEvent, 100); if(ret == WAIT_OBJECT_0) { if(GetOverlappedResult(detail->handle, &detail->overlappedRead, &bytesRead, FALSE)) { - readQueue.enqueue_bulk(readbuf, bytesRead); + readBuffer.write(readbuf, bytesRead); state = LAUNCH; } else report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error); diff --git a/test/a2bencoderdecodertest.cpp b/test/a2bencoderdecodertest.cpp index 946c7c1..ea7f8ff 100644 --- a/test/a2bencoderdecodertest.cpp +++ b/test/a2bencoderdecodertest.cpp @@ -3,6 +3,7 @@ #include "icsneo/communication/packet/a2bpacket.h" #include "icsneo/communication/message/a2bmessage.h" #include "icsneo/communication/packetizer.h" +#include "icsneo/communication/ringbuffer.h" #include "icsneo/api/eventmanager.h" #include "gtest/gtest.h" #include @@ -26,6 +27,7 @@ protected: std::optional packetEncoder; std::optional packetizer; std::optional packetDecoder; + RingBuffer ringBuffer = RingBuffer(128); std::vector testBytes = {0xaa, 0x0c, 0x15, 0x00, 0x0b, 0x02, 0x00, 0x00, @@ -131,7 +133,9 @@ TEST_F(A2BEncoderDecoderTest, PacketDecoderTest) icsneo::PCMType::L16 ) == static_cast((0x04 << 8) | (0x08))); - EXPECT_TRUE(packetizer->input(recvBytes)); + ringBuffer.clear(); + ringBuffer.write(recvBytes); + EXPECT_TRUE(packetizer->input(ringBuffer)); auto packets = packetizer->output(); if(packets.empty()) { EXPECT_TRUE(false); diff --git a/test/i2cencoderdecodertest.cpp b/test/i2cencoderdecodertest.cpp index 857c593..e5fe628 100644 --- a/test/i2cencoderdecodertest.cpp +++ b/test/i2cencoderdecodertest.cpp @@ -3,6 +3,7 @@ #include "icsneo/communication/packet/i2cpacket.h" #include "icsneo/communication/message/i2cmessage.h" #include "icsneo/communication/packetizer.h" +#include "icsneo/communication/ringbuffer.h" #include "icsneo/api/eventmanager.h" #include "gtest/gtest.h" #include @@ -30,6 +31,8 @@ protected: std::optional packetEncoder; std::optional packetizer; std::optional packetDecoder; + RingBuffer ringBuffer = RingBuffer(128); + //Read request to the device //Control length 1, control bytes 0x12 (I2C register to read from) //data length 1: blank bytes padded in that the device will fill in the reply @@ -73,7 +76,9 @@ TEST_F(I2CEncoderDecoderTest, PacketDecoderTest) { message->isTXMsg = true; message->timestamp = static_cast(0xB0FCC1FBE62997); - EXPECT_TRUE(packetizer->input(recvBytes)); + ringBuffer.clear(); + ringBuffer.write(recvBytes); + EXPECT_TRUE(packetizer->input(ringBuffer)); auto packets = packetizer->output(); if(packets.empty()) { EXPECT_TRUE(false); } EXPECT_TRUE(packetDecoder->decode(decodeMsg, packets.back())); diff --git a/test/linencoderdecodertest.cpp b/test/linencoderdecodertest.cpp index b5e407d..dede8ce 100644 --- a/test/linencoderdecodertest.cpp +++ b/test/linencoderdecodertest.cpp @@ -3,6 +3,7 @@ #include "icsneo/communication/packet/linpacket.h" #include "icsneo/communication/message/linmessage.h" #include "icsneo/communication/packetizer.h" +#include "icsneo/communication/ringbuffer.h" #include "icsneo/api/eventmanager.h" #include "gtest/gtest.h" #include @@ -31,6 +32,7 @@ protected: std::optional packetEncoder; std::optional packetizer; std::optional packetDecoder; + RingBuffer ringBuffer = RingBuffer(128); //Responder load data before response LIN 2 // ID 0x22 pID 0xE2 length 8 std::vector testRespData = @@ -165,7 +167,9 @@ TEST_F(LINEncoderDecoderTest, PacketDecoderTest) { msg2->data = {0xaa, 0xbb, 0xcc}; msg2->checksum = 0xcc; - EXPECT_TRUE(packetizer->input(recvBytes)); + ringBuffer.clear(); + ringBuffer.write(recvBytes); + EXPECT_TRUE(packetizer->input(ringBuffer)); auto packets = packetizer->output(); if(packets.size() != 2) { EXPECT_TRUE(false); } //LIN2 frame from device diff --git a/test/livedataencoderdecodertest.cpp b/test/livedataencoderdecodertest.cpp index 94d8b8d..3f04ad2 100644 --- a/test/livedataencoderdecodertest.cpp +++ b/test/livedataencoderdecodertest.cpp @@ -3,6 +3,7 @@ #include "icsneo/communication/packet/livedatapacket.h" #include "icsneo/communication/message/livedatamessage.h" #include "icsneo/communication/packetizer.h" +#include "icsneo/communication/ringbuffer.h" #include "icsneo/api/eventmanager.h" #include "gtest/gtest.h" #include @@ -42,6 +43,7 @@ protected: std::optional packetEncoder; std::optional packetizer; std::optional packetDecoder; + RingBuffer ringBuffer = RingBuffer(128); const std::vector testBytesSub = { @@ -157,7 +159,9 @@ TEST_F(LiveDataEncoderDecoderTest, EncodeClearCommandTest) { TEST_F(LiveDataEncoderDecoderTest, DecoderStatusTest) { std::shared_ptr result; - if (packetizer->input(testBytesStatus)) { + ringBuffer.clear(); + ringBuffer.write(testBytesStatus); + if (packetizer->input(ringBuffer)) { for (const auto& packet : packetizer->output()) { if (!packetDecoder->decode(result, packet)) continue; @@ -173,7 +177,9 @@ TEST_F(LiveDataEncoderDecoderTest, DecoderStatusTest) { TEST_F(LiveDataEncoderDecoderTest, DecoderResponseTest) { std::shared_ptr result; - if (packetizer->input(testBytesResponse)) { + ringBuffer.clear(); + ringBuffer.write(testBytesResponse); + if (packetizer->input(ringBuffer)) { for (const auto& packet : packetizer->output()) { if (!packetDecoder->decode(result, packet)) continue; diff --git a/test/mdioencoderdecodertest.cpp b/test/mdioencoderdecodertest.cpp index 532c6af..bef7835 100644 --- a/test/mdioencoderdecodertest.cpp +++ b/test/mdioencoderdecodertest.cpp @@ -3,6 +3,7 @@ #include "icsneo/communication/packet/mdiopacket.h" #include "icsneo/communication/message/mdiomessage.h" #include "icsneo/communication/packetizer.h" +#include "icsneo/communication/ringbuffer.h" #include "icsneo/api/eventmanager.h" #include "gtest/gtest.h" #include @@ -30,6 +31,7 @@ protected: std::optional packetEncoder; std::optional packetizer; std::optional packetDecoder; + RingBuffer ringBuffer = RingBuffer(128); std::vector testBytesClause22 = {0xAA, 0x0C, 0x11, 0x00, 0x21, 0x02, 0xAB, 0xCD, @@ -156,7 +158,9 @@ TEST_F(MDIOEncoderDecoderTest, PacketDecoderClause22Test) { message->isTXMsg = true; message->timestamp = static_cast(0xB0FCC1FBE62997); - EXPECT_TRUE(packetizer->input(recvBytesClause22)); + ringBuffer.clear(); + ringBuffer.write(recvBytesClause22); + EXPECT_TRUE(packetizer->input(ringBuffer)); auto packets = packetizer->output(); EXPECT_FALSE(packets.empty()); EXPECT_TRUE(packetDecoder->decode(decodeMsg, packets.back())); @@ -202,7 +206,9 @@ TEST_F(MDIOEncoderDecoderTest, PacketDecoderClause45Test) { message->txInvalidOpcode = true; message->timestamp = static_cast(0xB0FCC1FBE62997); - EXPECT_TRUE(packetizer->input(recvBytesClause45)); + ringBuffer.clear(); + ringBuffer.write(recvBytesClause45); + EXPECT_TRUE(packetizer->input(ringBuffer)); auto packets = packetizer->output(); EXPECT_FALSE(packets.empty()); EXPECT_TRUE(packetDecoder->decode(decodeMsg, packets.back())); diff --git a/test/ringbuffertest.cpp b/test/ringbuffertest.cpp new file mode 100644 index 0000000..4a89213 --- /dev/null +++ b/test/ringbuffertest.cpp @@ -0,0 +1,96 @@ +#include "icsneo/communication/ringbuffer.h" +#include "gtest/gtest.h" + +using namespace icsneo; + +class RingBufferTest : public ::testing::Test { +protected: + static constexpr const size_t bufferSize = 32u; + static constexpr const size_t testDataSize = 32u; + RingBuffer ringBuffer = RingBuffer(bufferSize); + void SetUp() override { + ringBuffer.clear(); + } + + const std::vector testBytes = { + 0, 1, 2, 3, 4, 5, 6, 7, + 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, + 24, 25, 26, 27, 28, 29, 30, 31 + }; +}; + +TEST_F(RingBufferTest, ConstructorTest) { + // Standard case, integral power of 2 + ASSERT_EQ(RingBuffer(16).capacity(), 16u); + // Edge cases + // SIZE_MAX - commented out because this will throw on all architectures due to the allocation that happens here. + //ASSERT_EQ(RingBuffer(SIZE_MAX).capacity(), RingBuffer::MaxSize); + // Zero + ASSERT_EQ(RingBuffer(0).capacity(), 1u); + // arbitrary number that is not a power of 2 + ASSERT_EQ(RingBuffer(60).capacity(), 64u); +} + +TEST_F(RingBufferTest, InitAndCapacityTest) { + constexpr auto size = 8u; + RingBuffer rb(size); + ASSERT_EQ(rb.size(), 0u); + ASSERT_EQ(rb.capacity(), size); +} + +TEST_F(RingBufferTest, WriteAndClearTest) { + ASSERT_TRUE(ringBuffer.write(testBytes)); + ASSERT_EQ(ringBuffer.size(), testBytes.size()); + ringBuffer.clear(); + ASSERT_EQ(ringBuffer.size(), 0u); +} + +TEST_F(RingBufferTest, SimpleWriteReadTest) { + std::vector readBack(testDataSize); + ASSERT_TRUE(ringBuffer.write(testBytes)); + ASSERT_EQ(ringBuffer.size(), testDataSize); + ASSERT_TRUE(ringBuffer.read(readBack.data(), 0, testDataSize)); + ASSERT_EQ(readBack, testBytes); +} + +TEST_F(RingBufferTest, OverlappedReadWriteTest) { + std::vector readBack(testDataSize); + std::vector ignoredData(bufferSize - 3); + ASSERT_TRUE(ringBuffer.write(ignoredData)); + ringBuffer.pop(ignoredData.size()); + ASSERT_EQ(ringBuffer.size(), 0u); + ASSERT_TRUE(ringBuffer.write(testBytes)); + ASSERT_TRUE(ringBuffer.read(readBack.data(), 0, testDataSize)); + ASSERT_EQ(readBack, testBytes); +} + +TEST_F(RingBufferTest, WritePastReadCursorTest) { + std::vector readBack(ringBuffer.capacity()); + // Fill + ASSERT_TRUE(ringBuffer.write(testBytes)); + // Read partial + auto readSize = ringBuffer.size() - 4; + ASSERT_TRUE(ringBuffer.read(readBack.data(), 0, readSize)); + // Now writeCursor (masked) is 0, readCursor (masked) is capacity() - 4, writing past the read cursor should fail. + ASSERT_FALSE(ringBuffer.write(testBytes.data(), readSize + 1)); + +} + +TEST_F(RingBufferTest, WriteWhenFullTest) { + std::vector fillData(ringBuffer.capacity()); + ASSERT_TRUE(ringBuffer.write(fillData)); + ASSERT_FALSE(ringBuffer.write(fillData.data(), 1)); +} + +TEST_F(RingBufferTest, ReadPastEndTest) { + uint8_t dummy = 0; + // Single byte when empty + ASSERT_FALSE(ringBuffer.read(&dummy, 0, 1)); + // Put in a byte + ASSERT_TRUE(ringBuffer.write(&dummy, 1)); + // Single byte from offset when filled only to offset + ASSERT_FALSE(ringBuffer.read(&dummy, ringBuffer.size(), 1)); + // Single byte from offset past size + ASSERT_FALSE(ringBuffer.read(&dummy, ringBuffer.size()+1, 1)); +}