Replace concurrentqueue with ringbuffer

pull/64/head
Jonathan Schwartz 2024-04-05 17:24:53 +00:00
parent 7d2d12c5cd
commit 63f0516318
22 changed files with 361 additions and 169 deletions

View File

@ -266,6 +266,7 @@ set(SRC_FILES
communication/communication.cpp
communication/driver.cpp
communication/livedata.cpp
communication/ringbuffer.cpp
device/extensions/flexray/extension.cpp
device/extensions/flexray/controller.cpp
device/idevicesettings.cpp
@ -514,6 +515,7 @@ if(LIBICSNEO_BUILD_TESTS)
test/a2bencoderdecodertest.cpp
test/mdioencoderdecodertest.cpp
test/livedataencoderdecodertest.cpp
test/ringbuffertest.cpp
)
target_link_libraries(libicsneo-tests gtest gtest_main)

View File

@ -268,7 +268,7 @@ void Communication::readTask() {
while(!closing) {
readBytes.clear();
if(driver->readWait(readBytes)) {
if(driver->readAvailable()) {
handleInput(*packetizer, readBytes);
}
}
@ -291,7 +291,7 @@ void Communication::handleInput(Packetizer& p, std::vector<uint8_t>& readBytes)
handleInput(p, readBytes); // and we might as well process this input ourselves
}
} else {
if(p.input(readBytes)) {
if(p.input(driver->getReadBuffer())) {
for(const auto& packet : p.output()) {
std::shared_ptr<Message> msg;
if(!decoder->decode(msg, packet))

View File

@ -8,37 +8,24 @@
using namespace icsneo;
bool Driver::read(std::vector<uint8_t>& bytes, size_t limit) {
// A limit of zero indicates no limit
if(limit == 0)
limit = (size_t)-1;
if(limit > (readQueue.size_approx() + 4))
limit = (readQueue.size_approx() + 4);
if(bytes.capacity() < limit)
bytes.resize(limit);
size_t actuallyRead = readQueue.try_dequeue_bulk(bytes.data(), limit);
if(bytes.size() > actuallyRead)
bytes.resize(actuallyRead);
return true;
}
bool Driver::readWait(std::vector<uint8_t>& bytes, std::chrono::milliseconds timeout, size_t limit) {
// A limit of zero indicates no limit
if(limit == 0)
limit = (size_t)-1;
if(limit > (readQueue.size_approx() + 4))
limit = (readQueue.size_approx() + 4);
if(limit > (readBuffer.size() + 4))
limit = (readBuffer.size() + 4);
bytes.resize(limit);
size_t actuallyRead = readQueue.wait_dequeue_bulk_timed(bytes.data(), limit, timeout);
// wait until we have enough data, or the timout occurs
const auto timeoutTime = std::chrono::steady_clock::now() + timeout;
while (readBuffer.size() < limit && std::chrono::steady_clock::now() < timeoutTime) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
size_t actuallyRead = std::min(readBuffer.size(), limit);
readBuffer.read(bytes.data(), 0, actuallyRead);
readBuffer.pop(actuallyRead);
bytes.resize(actuallyRead);
#ifdef ICSNEO_DRIVER_DEBUG_PRINTS

View File

@ -24,11 +24,9 @@ std::vector<uint8_t>& Packetizer::packetWrap(std::vector<uint8_t>& data, bool sh
return data;
}
bool Packetizer::input(const std::vector<uint8_t>& inputBytes) {
bool Packetizer::input(RingBuffer& bytes) {
bool haveEnoughData = true;
bytes.Copy(inputBytes);
while(haveEnoughData) {
switch(state) {
case ReadState::SearchForHeader:
@ -152,14 +150,14 @@ bool Packetizer::input(const std::vector<uint8_t>& inputBytes) {
if(packetLength > 0)
packet.data.resize(packetLength - headerSize);
bytes.CopyTo(packet.data.data(), currentIndex, (packetLength - currentIndex));
bytes.read(packet.data.data(), currentIndex, (packetLength - currentIndex));
currentIndex = packetLength;
if(disableChecksum || !checksum || bytes[currentIndex] == ICSChecksum(packet.data)) {
// Got a good packet
gotGoodPackets = true;
processedPackets.push_back(std::make_shared<Packet>(packet));
bytes.Erase_front(packetLength);
bytes.pop(packetLength);
if(packet.network == Network::NetID::DiskData && (packetLength - headerSize) % 2 == 0) {
bytes.pop_front();

View File

@ -0,0 +1,91 @@
#include "icsneo/communication/ringbuffer.h"
#include <stdexcept>
namespace icsneo {
RingBuffer::RingBuffer(size_t bufferSize) : readCursor(0), writeCursor(0) {
// round the buffer size to the nearest power of 2
bufferSize = RoundUp(bufferSize);
mask = bufferSize - 1;
buf = new uint8_t[bufferSize];
}
RingBuffer::~RingBuffer() {
delete[] buf;
buf = nullptr;
}
const uint8_t& RingBuffer::operator[](size_t offset) const {
return get(offset);
}
size_t RingBuffer::size() const {
// The values in the cursors are monotonic, i.e. they only ever increment. They can be considered to be the total number of elements ever written or read
auto currentWriteCursor = writeCursor.load(std::memory_order_relaxed);
auto currentReadCursor = readCursor.load(std::memory_order_relaxed);
// Using unmasked values, writeCursor is guaranteed to be >= readCursor. If they are equal that means the buffer is empty
return currentWriteCursor - currentReadCursor;
}
void RingBuffer::pop_front() {
pop(1);
}
void RingBuffer::pop(size_t count) {
if (size() < count) {
throw std::runtime_error("RingBuffer: Underflow");
}
readCursor.fetch_add(count, std::memory_order_release);
}
const uint8_t& RingBuffer::get(size_t offset) const {
if (offset >= size()) {
throw std::runtime_error("RingBuffer: Index out of range");
}
auto currentReadCursor = readCursor.load(std::memory_order_acquire);
return *resolve(currentReadCursor, offset);
}
bool RingBuffer::write(const uint8_t* addr, size_t length) {
const auto freeSpace = (capacity() - size());
if (length > freeSpace) {
return false;
}
auto currentWriteCursor = writeCursor.load(std::memory_order_relaxed);
auto spaceAtEnd = std::min(freeSpace, capacity() - (currentWriteCursor & mask)); // number of bytes from (masked) writeCursor to the end of the writable space (i.e. we reach the masked read cursor or the end of the buffer)
auto firstCopySize = std::min(spaceAtEnd, length);
(void)memcpy(resolve(currentWriteCursor, 0), addr, firstCopySize);
if (firstCopySize < length)
{
(void)memcpy(buf, &addr[firstCopySize], length - firstCopySize);
}
writeCursor.store(currentWriteCursor + length, std::memory_order_release);
return true;
}
bool RingBuffer::write(const std::vector<uint8_t>& source) {
return write(source.data(), source.size());
}
bool RingBuffer::read(uint8_t* dest, size_t startIndex, size_t length) const {
auto currentSize = size();
if ((startIndex >= currentSize) || ((startIndex + length) > size())) {
return false;
}
auto currentReadCursor = readCursor.load(std::memory_order_relaxed);
auto bytesAtEnd = std::min<size_t>(capacity() - ((currentReadCursor + startIndex) & mask), length);
const auto bytesAtStart = (length - bytesAtEnd);
(void)memcpy(dest, resolve(currentReadCursor, startIndex), bytesAtEnd);
if (bytesAtStart > 0) {
(void)memcpy(&dest[bytesAtEnd], buf, bytesAtStart);
}
return true;
}
void RingBuffer::clear() {
pop(size());
}
}

View File

@ -11,9 +11,11 @@
#include <condition_variable>
#include "icsneo/api/eventmanager.h"
#include "icsneo/third-party/concurrentqueue/blockingconcurrentqueue.h"
#include "icsneo/communication/ringbuffer.h"
namespace icsneo {
#define ICSNEO_DRIVER_RINGBUFFER_SIZE (512 * 1024)
class Driver {
public:
Driver(const device_eventhandler_t& handler) : report(handler) {}
@ -24,10 +26,11 @@ public:
virtual void awaitModeChangeComplete() {}
virtual bool isDisconnected() { return disconnected; };
virtual bool close() = 0;
bool read(std::vector<uint8_t>& bytes, size_t limit = 0);
bool readWait(std::vector<uint8_t>& bytes, std::chrono::milliseconds timeout = std::chrono::milliseconds(100), size_t limit = 0);
bool write(const std::vector<uint8_t>& bytes);
virtual bool isEthernet() const { return false; }
bool readAvailable() { return readBuffer.size() > 0; }
RingBuffer& getReadBuffer() { return readBuffer; }
device_eventhandler_t report;
@ -54,7 +57,8 @@ protected:
virtual bool writeQueueAlmostFull() { return writeQueue.size_approx() > (writeQueueSize * 3 / 4); }
virtual bool writeInternal(const std::vector<uint8_t>& b) { return writeQueue.enqueue(WriteOperation(b)); }
moodycamel::BlockingConcurrentQueue<uint8_t> readQueue;
RingBuffer readBuffer = RingBuffer(ICSNEO_DRIVER_RINGBUFFER_SIZE);
moodycamel::BlockingConcurrentQueue<WriteOperation> writeQueue;
std::thread readThread, writeThread;
std::atomic<bool> closing{false};

View File

@ -4,14 +4,13 @@
#ifdef __cplusplus
#include "icsneo/communication/packet.h"
#include "icsneo/communication/ringbuffer.h"
#include "icsneo/api/eventmanager.h"
#include <queue>
#include <vector>
#include <memory>
#include <cstring>
#define ICSNEO_PACKETIZER_BUFFER_SIZE (512 * 1024)
namespace icsneo {
class Packetizer {
@ -22,7 +21,7 @@ public:
std::vector<uint8_t>& packetWrap(std::vector<uint8_t>& data, bool shortFormat) const;
bool input(const std::vector<uint8_t>& bytes);
bool input(RingBuffer& bytes);
std::vector<std::shared_ptr<Packet>> output();
bool disableChecksum = false; // Even for short packets
@ -37,95 +36,6 @@ private:
GetData
};
class RingBuffer
{
private:
constexpr static size_t mBufferSize = ICSNEO_PACKETIZER_BUFFER_SIZE;
size_t mStartOffset;
size_t mSize;
uint8_t mData[mBufferSize];
public:
RingBuffer(void)
: mStartOffset(0)
, mSize(0)
{
(void)memset(mData, 0, mBufferSize);
}
const uint8_t& operator [](size_t offset) { return Get(offset); }
size_t size(void) { return mSize; }
void pop_front(void)
{
Erase_front(1);
}
void Erase_front(size_t count)
{
if (mSize < count)
{
throw std::runtime_error("RingBuffer: Underflow");
}
mStartOffset = (mStartOffset + count) % mBufferSize;
mSize -= count;
}
const uint8_t& Get(size_t offset)
{
if (offset >= mSize)
{
throw std::runtime_error("RingBuffer: Index out of range");
}
return *Resolve(offset);
}
void Copy(const std::vector<uint8_t>& source)
{
const auto inputSize = source.size();
const auto octetsAvailable = (mBufferSize - mSize);
if (inputSize > octetsAvailable)
{
throw std::runtime_error("RingBuffer: Out of memory");
}
const auto octetsAvailableTail = (octetsAvailable - mStartOffset);
const auto octetsToWrap = (inputSize > octetsAvailableTail) ? (inputSize - octetsAvailableTail) : 0;
const auto octetsToAppend = (inputSize - octetsToWrap);
(void)memcpy(Resolve(mSize), source.data(), octetsToAppend);
if (octetsToWrap > 0)
{
(void)memcpy(mData, &source.data()[octetsToAppend], octetsToWrap);
}
mSize += inputSize;
}
void CopyTo(uint8_t* dest, size_t startIndex, size_t length)
{
if ((startIndex + length) > mSize)
{
throw std::runtime_error("RingBuffer: Index out of range");
}
const auto octetsToReadHead = std::min<size_t>((mBufferSize - mStartOffset - startIndex), length);
const auto octetsToReadTail = (length - octetsToReadHead);
(void)memcpy(dest, Resolve(startIndex), octetsToReadHead);
if (octetsToReadTail > 0)
{
(void)memcpy(&dest[octetsToReadHead], mData, octetsToReadTail);
}
}
protected:
inline uint8_t* Resolve(size_t offset)
{
return &mData[(mStartOffset + offset) % mBufferSize];
}
};
ReadState state = ReadState::SearchForHeader;
int currentIndex = 0;
@ -134,7 +44,6 @@ private:
bool checksum = false;
bool gotGoodPackets = false; // Tracks whether we've ever gotten a good packet
Packet packet;
RingBuffer bytes;
std::vector<std::shared_ptr<Packet>> processedPackets;

View File

@ -0,0 +1,75 @@
#ifndef _RINGBUFFER_H_
#define _RINGBUFFER_H_
#include <cstdint>
#include <cstddef>
#include <memory>
#include <cstring>
#include <mutex>
#include <atomic>
#include <vector>
#if __cplusplus >= 202002L
#include <bit>
#endif
namespace icsneo {
class RingBuffer
{
private:
static constexpr size_t RoundUp(size_t size) {
if (size == 0) {
// Avoid underflow when decrementing later
return 1;
} else if (size >= SIZE_MAX) {
// overflow case - resolve to max size
return MaxSize;
}
#if __cplusplus >= 202002L
// c++20 gives us countl_zero which should be more effecient on most platforms
auto lzero = std::countl_zero(size - 1);
auto shift = (sizeof(size_t) * 8) - lzero;
return 1ull << shift;
#else
// Bit twiddling magic! See http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
--size;
size |= size >> 1;
size |= size >> 2;
size |= size >> 4;
for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
size |= size >> (i << 3);
}
++size;
return size;
#endif
}
//static_assert(std::atomic<size_t>::is_always_lock_free, "RingBuffer cursor types are not lock-free");
std::atomic<size_t> readCursor;
std::atomic<size_t> writeCursor;
// Use this to mask the cursor values to the buffer size. This is set to capacity - 1 where capacity is always an integral power of 2 (2, 4, 8, 16, etc)
size_t mask;
uint8_t* buf;
public:
static constexpr auto MaxSize = 1ull << ((8 * sizeof(size_t)) - 1);
RingBuffer(size_t bufferSize);
~RingBuffer();
const uint8_t& operator[](size_t offset) const;
size_t size() const;
void pop_front();
void pop(size_t count);
const uint8_t& get(size_t offset) const;
bool write(const uint8_t* addr, size_t count);
bool write(const std::vector<uint8_t>& source);
bool read(uint8_t* dest, size_t startIndex, size_t length) const;
void clear();
constexpr size_t capacity() const {
return mask + 1;
}
protected:
inline uint8_t* resolve(size_t cursor, size_t offset) const {
return &buf[(cursor + offset) & mask];
}
};
}
#endif

View File

@ -89,9 +89,8 @@ bool FTD3XX::close() {
if(writeThread.joinable())
writeThread.join();
uint8_t flush;
WriteOperation flushop;
while(readQueue.try_dequeue(flush)) {}
readBuffer.pop(readBuffer.size());
while(writeQueue.try_dequeue(flushop)) {}
if(const auto ret = FT_Close(*handle); ret != FT_OK) {
@ -137,7 +136,7 @@ void FTD3XX::readTask() {
}
FT_ReleaseOverlapped(*handle, &overlap);
if(received > 0) {
readQueue.enqueue_bulk(buffer, received);
readBuffer.write(buffer, received);
}
}
}

View File

@ -140,9 +140,8 @@ bool CDCACM::close() {
int ret = ::close(fd);
fd = -1;
uint8_t flush;
WriteOperation flushop;
while (readQueue.try_dequeue(flush)) {}
readBuffer.clear();
while (writeQueue.try_dequeue(flushop)) {}
if(modeChanging) {
@ -191,8 +190,7 @@ void CDCACM::readTask() {
}
std::cout << std::dec << std::endl;
#endif
readQueue.enqueue_bulk(readbuf, bytesRead);
readBuffer.write(readbuf, bytesRead);
} else {
if(modeChanging) {
// We were expecting a disconnect for reenumeration

View File

@ -16,6 +16,7 @@
#include <errno.h>
#include <sys/select.h>
#include <sys/mman.h>
#include <sys/resource.h>
using namespace icsneo;
@ -44,13 +45,13 @@ void FirmIO::Find(std::vector<FoundDevice>& found) {
const auto start = steady_clock::now();
// Get an absolute wall clock to compare to
const auto overallTimeout = start + milliseconds(500);
while(temp.readWait(payload, milliseconds(50))) {
while(!temp.readAvailable()) {
if(steady_clock::now() > overallTimeout) {
// failed to read out a serial number reponse in time
break;
}
if(!packetizer.input(payload))
if(!packetizer.input(temp.getReadBuffer()))
continue; // A full packet has not yet been read out
for(const auto& packet : packetizer.output()) {
@ -182,9 +183,6 @@ bool FirmIO::close() {
ret |= ::close(fd);
fd = -1;
uint8_t flush;
while (readQueue.try_dequeue(flush)) {}
if(ret == 0) {
return true;
} else {
@ -198,6 +196,12 @@ void FirmIO::readTask() {
Msg msg;
std::vector<Msg> toFree;
// attempt to elevate the thread priority. PRIO_MIN is actually the highest priority but the lowest value.
int err = setpriority(PRIO_PROCESS, 0, -1);
if (err != 0) {
std::cerr << "FirmIO::readTask setpriority failed : " << strerror(errno) << std::endl;
}
while(!closing && !isDisconnected()) {
fd_set rfds = {0};
struct timeval tv = {0};
@ -219,10 +223,7 @@ void FirmIO::readTask() {
toFree.clear();
int i = 0;
while(!in->isEmpty() && i++ < 1000) {
if(!in->read(&msg))
break;
while(in->read(&msg) && i++ < 1000) {
switch(msg.command) {
case Msg::Command::ComData: {
if(toFree.empty() || toFree.back().payload.free.refCount == 6) {
@ -241,9 +242,14 @@ void FirmIO::readTask() {
// Translate the physical address back to our virtual address space
uint8_t* addr = reinterpret_cast<uint8_t*>(msg.payload.data.addr - PHY_ADDR_BASE + vbase);
readQueue.enqueue_bulk(addr, msg.payload.data.len);
break;
while (!readBuffer.write(addr, msg.payload.data.len)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // back-off so reading thread can empty the buffer
if (closing || isDisconnected()) {
break;
}
}
}
break;
case Msg::Command::ComFree: {
std::lock_guard<std::mutex> lk(outMutex);
// std::cout << "Got some free " << std::hex << msg.payload.free.ref[0] << std::endl;

View File

@ -109,9 +109,8 @@ bool FTDI::close() {
report(APIEvent::Type::DriverFailedToClose, APIEvent::Severity::Error);
}
uint8_t flush;
WriteOperation flushop;
while(readQueue.try_dequeue(flush)) {}
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
closing = false;
@ -214,7 +213,7 @@ void FTDI::readTask() {
} else
report(APIEvent::Type::FailedToRead, APIEvent::Severity::EventWarning);
} else
readQueue.enqueue_bulk(readbuf, readBytes);
readBuffer.write(readbuf, readBytes);
}
}

View File

@ -136,6 +136,8 @@ void PCAP::Find(std::vector<FoundDevice>& found) {
pcap_sendpacket(iface.fp, bs.data(), (int)bs.size());
auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(50);
constexpr const size_t TempBufferSize = 4096;
RingBuffer tempBuffer(TempBufferSize);
while(std::chrono::high_resolution_clock::now() <= timeout) { // Wait up to 50ms for the response
struct pcap_pkthdr* header;
const uint8_t* data;
@ -158,7 +160,8 @@ void PCAP::Find(std::vector<FoundDevice>& found) {
continue; // This packet is not for us
Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {});
if(!packetizer.input(ethPacketizer.outputUp()))
tempBuffer.write(ethPacketizer.outputUp());
if(!packetizer.input(tempBuffer))
continue; // This packet was not well formed
EthernetPacketizer::EthernetPacket decoded(data, header->caplen);
@ -267,9 +270,8 @@ bool PCAP::close() {
pcap_close(iface.fp);
iface.fp = nullptr;
uint8_t flush;
WriteOperation flushop;
while(readQueue.try_dequeue(flush)) {}
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
return true;
@ -282,7 +284,7 @@ void PCAP::readTask() {
PCAP* driver = reinterpret_cast<PCAP*>(obj);
if(driver->ethPacketizer.inputUp({data, data + header->caplen})) {
const auto bytes = driver->ethPacketizer.outputUp();
driver->readQueue.enqueue_bulk(bytes.data(), bytes.size());
driver->readBuffer.write(bytes.data(), bytes.size());
}
}, (uint8_t*)this);
}

View File

@ -510,9 +510,8 @@ bool TCP::close() {
if(writeThread.joinable())
writeThread.join();
uint8_t flush;
WriteOperation flushop;
while(readQueue.try_dequeue(flush)) {}
readBuffer.pop(readBuffer.size());
while(writeQueue.try_dequeue(flushop)) {}
socket.reset();
@ -534,7 +533,7 @@ void TCP::readTask() {
uint8_t readbuf[READ_BUFFER_SIZE];
while(!closing) {
if(const auto received = ::recv(*socket, (char*)readbuf, READ_BUFFER_SIZE, 0); received > 0) {
readQueue.enqueue_bulk(readbuf, received);
readBuffer.write(readbuf, received);
} else {
timeout.tv_sec = 0;
timeout.tv_usec = 50'000;

View File

@ -7,6 +7,7 @@
#include "icsneo/communication/ethernetpacketizer.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/decoder.h"
#include "icsneo/communication/ringbuffer.h"
#include <pcap.h>
#include <iphlpapi.h>
#pragma comment(lib, "IPHLPAPI.lib")
@ -123,7 +124,9 @@ void PCAP::Find(std::vector<FoundDevice>& found) {
auto bs = requestPacket.getBytestream();
pcap.sendpacket(iface.fp, bs.data(), (int)bs.size());
auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(5);
auto timeout = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(250);
constexpr const size_t TempBufferSize = 4096;
RingBuffer tempBuffer(TempBufferSize);
while(std::chrono::high_resolution_clock::now() <= timeout) { // Wait up to 5ms for the response
struct pcap_pkthdr* header;
const uint8_t* data;
@ -142,7 +145,8 @@ void PCAP::Find(std::vector<FoundDevice>& found) {
continue; // This packet is not for us
Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {});
if(!packetizer.input(ethPacketizer.outputUp()))
tempBuffer.write(ethPacketizer.outputUp());
if(!packetizer.input(tempBuffer))
continue; // This packet was not well formed
EthernetPacketizer::EthernetPacket decoded(data, header->caplen);
@ -251,9 +255,8 @@ bool PCAP::close() {
pcap.close(iface.fp);
iface.fp = nullptr;
uint8_t flush;
WriteOperation flushop;
while(readQueue.try_dequeue(flush)) {}
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
transmitQueue = nullptr;
@ -275,7 +278,7 @@ void PCAP::readTask() {
if(ethPacketizer.inputUp({data, data + header->caplen})) {
const auto bytes = ethPacketizer.outputUp();
readQueue.enqueue_bulk(bytes.data(), bytes.size());
readBuffer.write(bytes.data(), bytes.size());
}
}
}

View File

@ -357,9 +357,8 @@ bool VCP::close() {
detail->overlappedWait.hEvent = INVALID_HANDLE_VALUE;
}
uint8_t flush;
WriteOperation flushop;
while(readQueue.try_dequeue(flush)) {}
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
if(!ret)
@ -391,7 +390,7 @@ void VCP::readTask() {
if(ReadFile(detail->handle, readbuf, READ_BUFFER_SIZE, nullptr, &detail->overlappedRead)) {
if(GetOverlappedResult(detail->handle, &detail->overlappedRead, &bytesRead, FALSE)) {
if(bytesRead)
readQueue.enqueue_bulk(readbuf, bytesRead);
readBuffer.write(readbuf, bytesRead);
}
continue;
}
@ -414,7 +413,7 @@ void VCP::readTask() {
auto ret = WaitForSingleObject(detail->overlappedRead.hEvent, 100);
if(ret == WAIT_OBJECT_0) {
if(GetOverlappedResult(detail->handle, &detail->overlappedRead, &bytesRead, FALSE)) {
readQueue.enqueue_bulk(readbuf, bytesRead);
readBuffer.write(readbuf, bytesRead);
state = LAUNCH;
} else
report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);

View File

@ -3,6 +3,7 @@
#include "icsneo/communication/packet/a2bpacket.h"
#include "icsneo/communication/message/a2bmessage.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/ringbuffer.h"
#include "icsneo/api/eventmanager.h"
#include "gtest/gtest.h"
#include <vector>
@ -26,6 +27,7 @@ protected:
std::optional<Encoder> packetEncoder;
std::optional<Packetizer> packetizer;
std::optional<Decoder> packetDecoder;
RingBuffer ringBuffer = RingBuffer(128);
std::vector<uint8_t> testBytes =
{0xaa, 0x0c, 0x15, 0x00, 0x0b, 0x02, 0x00, 0x00,
@ -131,7 +133,9 @@ TEST_F(A2BEncoderDecoderTest, PacketDecoderTest)
icsneo::PCMType::L16
) == static_cast<icsneo::PCMSample>((0x04 << 8) | (0x08)));
EXPECT_TRUE(packetizer->input(recvBytes));
ringBuffer.clear();
ringBuffer.write(recvBytes);
EXPECT_TRUE(packetizer->input(ringBuffer));
auto packets = packetizer->output();
if(packets.empty()) {
EXPECT_TRUE(false);

View File

@ -3,6 +3,7 @@
#include "icsneo/communication/packet/i2cpacket.h"
#include "icsneo/communication/message/i2cmessage.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/ringbuffer.h"
#include "icsneo/api/eventmanager.h"
#include "gtest/gtest.h"
#include <vector>
@ -30,6 +31,8 @@ protected:
std::optional<Encoder> packetEncoder;
std::optional<Packetizer> packetizer;
std::optional<Decoder> packetDecoder;
RingBuffer ringBuffer = RingBuffer(128);
//Read request to the device
//Control length 1, control bytes 0x12 (I2C register to read from)
//data length 1: blank bytes padded in that the device will fill in the reply
@ -73,7 +76,9 @@ TEST_F(I2CEncoderDecoderTest, PacketDecoderTest) {
message->isTXMsg = true;
message->timestamp = static_cast<uint64_t>(0xB0FCC1FBE62997);
EXPECT_TRUE(packetizer->input(recvBytes));
ringBuffer.clear();
ringBuffer.write(recvBytes);
EXPECT_TRUE(packetizer->input(ringBuffer));
auto packets = packetizer->output();
if(packets.empty()) { EXPECT_TRUE(false); }
EXPECT_TRUE(packetDecoder->decode(decodeMsg, packets.back()));

View File

@ -3,6 +3,7 @@
#include "icsneo/communication/packet/linpacket.h"
#include "icsneo/communication/message/linmessage.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/ringbuffer.h"
#include "icsneo/api/eventmanager.h"
#include "gtest/gtest.h"
#include <vector>
@ -31,6 +32,7 @@ protected:
std::optional<Encoder> packetEncoder;
std::optional<Packetizer> packetizer;
std::optional<Decoder> packetDecoder;
RingBuffer ringBuffer = RingBuffer(128);
//Responder load data before response LIN 2
// ID 0x22 pID 0xE2 length 8
std::vector<uint8_t> testRespData =
@ -165,7 +167,9 @@ TEST_F(LINEncoderDecoderTest, PacketDecoderTest) {
msg2->data = {0xaa, 0xbb, 0xcc};
msg2->checksum = 0xcc;
EXPECT_TRUE(packetizer->input(recvBytes));
ringBuffer.clear();
ringBuffer.write(recvBytes);
EXPECT_TRUE(packetizer->input(ringBuffer));
auto packets = packetizer->output();
if(packets.size() != 2) { EXPECT_TRUE(false); }
//LIN2 frame from device

View File

@ -3,6 +3,7 @@
#include "icsneo/communication/packet/livedatapacket.h"
#include "icsneo/communication/message/livedatamessage.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/ringbuffer.h"
#include "icsneo/api/eventmanager.h"
#include "gtest/gtest.h"
#include <vector>
@ -42,6 +43,7 @@ protected:
std::optional<Encoder> packetEncoder;
std::optional<Packetizer> packetizer;
std::optional<Decoder> packetDecoder;
RingBuffer ringBuffer = RingBuffer(128);
const std::vector<uint8_t> testBytesSub =
{
@ -157,7 +159,9 @@ TEST_F(LiveDataEncoderDecoderTest, EncodeClearCommandTest) {
TEST_F(LiveDataEncoderDecoderTest, DecoderStatusTest) {
std::shared_ptr<Message> result;
if (packetizer->input(testBytesStatus)) {
ringBuffer.clear();
ringBuffer.write(testBytesStatus);
if (packetizer->input(ringBuffer)) {
for (const auto& packet : packetizer->output()) {
if (!packetDecoder->decode(result, packet))
continue;
@ -173,7 +177,9 @@ TEST_F(LiveDataEncoderDecoderTest, DecoderStatusTest) {
TEST_F(LiveDataEncoderDecoderTest, DecoderResponseTest) {
std::shared_ptr<Message> result;
if (packetizer->input(testBytesResponse)) {
ringBuffer.clear();
ringBuffer.write(testBytesResponse);
if (packetizer->input(ringBuffer)) {
for (const auto& packet : packetizer->output()) {
if (!packetDecoder->decode(result, packet))
continue;

View File

@ -3,6 +3,7 @@
#include "icsneo/communication/packet/mdiopacket.h"
#include "icsneo/communication/message/mdiomessage.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/ringbuffer.h"
#include "icsneo/api/eventmanager.h"
#include "gtest/gtest.h"
#include <vector>
@ -30,6 +31,7 @@ protected:
std::optional<Encoder> packetEncoder;
std::optional<Packetizer> packetizer;
std::optional<Decoder> packetDecoder;
RingBuffer ringBuffer = RingBuffer(128);
std::vector<uint8_t> testBytesClause22 =
{0xAA, 0x0C, 0x11, 0x00, 0x21, 0x02, 0xAB, 0xCD,
@ -156,7 +158,9 @@ TEST_F(MDIOEncoderDecoderTest, PacketDecoderClause22Test) {
message->isTXMsg = true;
message->timestamp = static_cast<uint64_t>(0xB0FCC1FBE62997);
EXPECT_TRUE(packetizer->input(recvBytesClause22));
ringBuffer.clear();
ringBuffer.write(recvBytesClause22);
EXPECT_TRUE(packetizer->input(ringBuffer));
auto packets = packetizer->output();
EXPECT_FALSE(packets.empty());
EXPECT_TRUE(packetDecoder->decode(decodeMsg, packets.back()));
@ -202,7 +206,9 @@ TEST_F(MDIOEncoderDecoderTest, PacketDecoderClause45Test) {
message->txInvalidOpcode = true;
message->timestamp = static_cast<uint64_t>(0xB0FCC1FBE62997);
EXPECT_TRUE(packetizer->input(recvBytesClause45));
ringBuffer.clear();
ringBuffer.write(recvBytesClause45);
EXPECT_TRUE(packetizer->input(ringBuffer));
auto packets = packetizer->output();
EXPECT_FALSE(packets.empty());
EXPECT_TRUE(packetDecoder->decode(decodeMsg, packets.back()));

View File

@ -0,0 +1,96 @@
#include "icsneo/communication/ringbuffer.h"
#include "gtest/gtest.h"
using namespace icsneo;
class RingBufferTest : public ::testing::Test {
protected:
static constexpr const size_t bufferSize = 32u;
static constexpr const size_t testDataSize = 32u;
RingBuffer ringBuffer = RingBuffer(bufferSize);
void SetUp() override {
ringBuffer.clear();
}
const std::vector<uint8_t> testBytes = {
0, 1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31
};
};
TEST_F(RingBufferTest, ConstructorTest) {
// Standard case, integral power of 2
ASSERT_EQ(RingBuffer(16).capacity(), 16u);
// Edge cases
// SIZE_MAX - commented out because this will throw on all architectures due to the allocation that happens here.
//ASSERT_EQ(RingBuffer(SIZE_MAX).capacity(), RingBuffer::MaxSize);
// Zero
ASSERT_EQ(RingBuffer(0).capacity(), 1u);
// arbitrary number that is not a power of 2
ASSERT_EQ(RingBuffer(60).capacity(), 64u);
}
TEST_F(RingBufferTest, InitAndCapacityTest) {
constexpr auto size = 8u;
RingBuffer rb(size);
ASSERT_EQ(rb.size(), 0u);
ASSERT_EQ(rb.capacity(), size);
}
TEST_F(RingBufferTest, WriteAndClearTest) {
ASSERT_TRUE(ringBuffer.write(testBytes));
ASSERT_EQ(ringBuffer.size(), testBytes.size());
ringBuffer.clear();
ASSERT_EQ(ringBuffer.size(), 0u);
}
TEST_F(RingBufferTest, SimpleWriteReadTest) {
std::vector<uint8_t> readBack(testDataSize);
ASSERT_TRUE(ringBuffer.write(testBytes));
ASSERT_EQ(ringBuffer.size(), testDataSize);
ASSERT_TRUE(ringBuffer.read(readBack.data(), 0, testDataSize));
ASSERT_EQ(readBack, testBytes);
}
TEST_F(RingBufferTest, OverlappedReadWriteTest) {
std::vector<uint8_t> readBack(testDataSize);
std::vector<uint8_t> ignoredData(bufferSize - 3);
ASSERT_TRUE(ringBuffer.write(ignoredData));
ringBuffer.pop(ignoredData.size());
ASSERT_EQ(ringBuffer.size(), 0u);
ASSERT_TRUE(ringBuffer.write(testBytes));
ASSERT_TRUE(ringBuffer.read(readBack.data(), 0, testDataSize));
ASSERT_EQ(readBack, testBytes);
}
TEST_F(RingBufferTest, WritePastReadCursorTest) {
std::vector<uint8_t> readBack(ringBuffer.capacity());
// Fill
ASSERT_TRUE(ringBuffer.write(testBytes));
// Read partial
auto readSize = ringBuffer.size() - 4;
ASSERT_TRUE(ringBuffer.read(readBack.data(), 0, readSize));
// Now writeCursor (masked) is 0, readCursor (masked) is capacity() - 4, writing past the read cursor should fail.
ASSERT_FALSE(ringBuffer.write(testBytes.data(), readSize + 1));
}
TEST_F(RingBufferTest, WriteWhenFullTest) {
std::vector<uint8_t> fillData(ringBuffer.capacity());
ASSERT_TRUE(ringBuffer.write(fillData));
ASSERT_FALSE(ringBuffer.write(fillData.data(), 1));
}
TEST_F(RingBufferTest, ReadPastEndTest) {
uint8_t dummy = 0;
// Single byte when empty
ASSERT_FALSE(ringBuffer.read(&dummy, 0, 1));
// Put in a byte
ASSERT_TRUE(ringBuffer.write(&dummy, 1));
// Single byte from offset when filled only to offset
ASSERT_FALSE(ringBuffer.read(&dummy, ringBuffer.size(), 1));
// Single byte from offset past size
ASSERT_FALSE(ringBuffer.read(&dummy, ringBuffer.size()+1, 1));
}