From fbdab1e9982f795a431708854a8bed3f662c25d3 Mon Sep 17 00:00:00 2001 From: Max Brombach Date: Tue, 9 Sep 2025 19:52:54 +0000 Subject: [PATCH] Communication: Fix MultiChannelCommunication race --- communication/multichannelcommunication.cpp | 31 +++++++++++-------- .../communication/multichannelcommunication.h | 4 +++ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/communication/multichannelcommunication.cpp b/communication/multichannelcommunication.cpp index b1050fa..ec4ef55 100644 --- a/communication/multichannelcommunication.cpp +++ b/communication/multichannelcommunication.cpp @@ -9,7 +9,7 @@ using namespace icsneo; MultiChannelCommunication::MultiChannelCommunication(device_eventhandler_t err, std::unique_ptr com, std::function()> makeConfiguredPacketizer, std::unique_ptr e, std::unique_ptr md, size_t vnetCount) : - Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)), numVnets(vnetCount) { + Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)), numVnets(vnetCount), packetRB(2048) { vnetThreads.resize(numVnets); vnetQueues.resize(numVnets); } @@ -148,9 +148,13 @@ void MultiChannelCommunication::hidReadTask() { break; } - if(!currentQueue->enqueue(std::move(payloadBytes)) && gotPacket) - EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); - payloadBytes.clear(); + { + std::unique_lock lk(ringBufMutex); + if(!packetRB.write(std::move(payloadBytes)) && gotPacket) + EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); + payloadBytes.clear(); + } + ringBufCV.notify_all(); gotPacket = true; state = PreprocessState::SearchForCommand; break; @@ -160,7 +164,6 @@ void MultiChannelCommunication::hidReadTask() { } void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) { - moodycamel::BlockingReaderWriterQueue< std::vector >& queue = vnetQueues[vnetIndex]; std::vector payloadBytes; std::unique_ptr packetizerLifetime; Packetizer* vnetPacketizer; @@ -174,14 +177,16 @@ void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) { EventManager::GetInstance().downgradeErrorsOnCurrentThread(); while(!closing) { - if(queue.wait_dequeue_timed(payloadBytes, std::chrono::milliseconds(250))) { - if(closing) - break; + std::unique_lock lk(ringBufMutex); + ringBufCV.wait(lk); + if(vnetPacketizer->input(packetRB)) { + for(const auto& packet : vnetPacketizer->output()) { + std::shared_ptr msg; + if(!decoder->decode(msg, packet)) + continue; - auto& ringBuffer = driver->getReadBuffer(); - ringBuffer.write(payloadBytes); - - handleInput(*vnetPacketizer); - } + dispatchMessage(msg); + } + } } } \ No newline at end of file diff --git a/include/icsneo/communication/multichannelcommunication.h b/include/icsneo/communication/multichannelcommunication.h index ffdc49f..51b6a08 100644 --- a/include/icsneo/communication/multichannelcommunication.h +++ b/include/icsneo/communication/multichannelcommunication.h @@ -113,6 +113,10 @@ private: std::vector< moodycamel::BlockingReaderWriterQueue< std::vector > > vnetQueues; void hidReadTask(); void vnetReadTask(size_t vnetIndex); + + RingBuffer packetRB; + std::mutex ringBufMutex; + std::condition_variable ringBufCV; }; }