Communication: Fix MultiChannelCommunication race

pull/76/merge
Max Brombach 2025-09-09 19:52:54 +00:00 committed by Kyle Schwarz
parent f53bc2e920
commit fbdab1e998
2 changed files with 22 additions and 13 deletions

View File

@ -9,7 +9,7 @@ using namespace icsneo;
MultiChannelCommunication::MultiChannelCommunication(device_eventhandler_t err, std::unique_ptr<Driver> com,
std::function<std::unique_ptr<Packetizer>()> makeConfiguredPacketizer, std::unique_ptr<Encoder> e,
std::unique_ptr<Decoder> md, size_t vnetCount) :
Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)), numVnets(vnetCount) {
Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)), numVnets(vnetCount), packetRB(2048) {
vnetThreads.resize(numVnets);
vnetQueues.resize(numVnets);
}
@ -148,9 +148,13 @@ void MultiChannelCommunication::hidReadTask() {
break;
}
if(!currentQueue->enqueue(std::move(payloadBytes)) && gotPacket)
EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error));
payloadBytes.clear();
{
std::unique_lock lk(ringBufMutex);
if(!packetRB.write(std::move(payloadBytes)) && gotPacket)
EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error));
payloadBytes.clear();
}
ringBufCV.notify_all();
gotPacket = true;
state = PreprocessState::SearchForCommand;
break;
@ -160,7 +164,6 @@ void MultiChannelCommunication::hidReadTask() {
}
void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) {
moodycamel::BlockingReaderWriterQueue< std::vector<uint8_t> >& queue = vnetQueues[vnetIndex];
std::vector<uint8_t> payloadBytes;
std::unique_ptr<Packetizer> packetizerLifetime;
Packetizer* vnetPacketizer;
@ -174,14 +177,16 @@ void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) {
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing) {
if(queue.wait_dequeue_timed(payloadBytes, std::chrono::milliseconds(250))) {
if(closing)
break;
std::unique_lock lk(ringBufMutex);
ringBufCV.wait(lk);
if(vnetPacketizer->input(packetRB)) {
for(const auto& packet : vnetPacketizer->output()) {
std::shared_ptr<Message> msg;
if(!decoder->decode(msg, packet))
continue;
auto& ringBuffer = driver->getReadBuffer();
ringBuffer.write(payloadBytes);
handleInput(*vnetPacketizer);
}
dispatchMessage(msg);
}
}
}
}

View File

@ -113,6 +113,10 @@ private:
std::vector< moodycamel::BlockingReaderWriterQueue< std::vector<uint8_t> > > vnetQueues;
void hidReadTask();
void vnetReadTask(size_t vnetIndex);
RingBuffer packetRB;
std::mutex ringBufMutex;
std::condition_variable ringBufCV;
};
}