From f8355df770926c749eec301ec533c19a88e78d6c Mon Sep 17 00:00:00 2001 From: Paul Hollinsky Date: Fri, 23 Apr 2021 22:33:32 -0400 Subject: [PATCH] Communication: Block destruction while inside redirectionFn --- communication/communication.cpp | 54 ++++++++++++++++---- include/icsneo/communication/communication.h | 6 ++- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/communication/communication.cpp b/communication/communication.cpp index da98a00..53c4055 100644 --- a/communication/communication.cpp +++ b/communication/communication.cpp @@ -17,6 +17,12 @@ using namespace icsneo; int Communication::messageCallbackIDCounter = 1; +Communication::~Communication() { + if(redirectingRead) + clearRedirectRead(); + close(); +} + bool Communication::open() { if(isOpen()) { report(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::Error); @@ -80,6 +86,15 @@ bool Communication::redirectRead(std::function&&)> red return true; } +void Communication::clearRedirectRead() { + if(!redirectingRead) + return; + // The mutex is required to clear the redirection, but not to set it + std::lock_guard lk(redirectingReadMutex); + redirectingRead = false; + redirectionFn = std::function&&)>(); +} + bool Communication::getSettingsSync(std::vector& data, std::chrono::milliseconds timeout) { std::shared_ptr msg = waitForMessageSync([this]() { return sendCommand(Command::ReadSettings, { 0, 0, 0, 1 /* Get Global Settings */, 0, 1 /* Subversion 1 */ }); @@ -189,18 +204,35 @@ void Communication::readTask() { while(!closing) { readBytes.clear(); if(driver->readWait(readBytes)) { - if(redirectingRead) { - redirectionFn(std::move(readBytes)); - } else { - if(packetizer->input(readBytes)) { - for(const auto& packet : packetizer->output()) { - std::shared_ptr msg; - if(!decoder->decode(msg, packet)) - continue; + handleInput(*packetizer, readBytes); + } + } +} - dispatchMessage(msg); - } - } +void Communication::handleInput(Packetizer& p, std::vector& readBytes) { + if(redirectingRead) { + // redirectingRead is an atomic so it can be set without acquiring a mutex + // However, we do not clear it without the mutex. The idea is that if another + // thread calls clearRedirectRead(), it will block until the redirectionFn + // finishes, and after that the redirectionFn will not be called again. + std::unique_lock lk(redirectingReadMutex); + // So after we acquire the mutex, we need to check the atomic again, and + // if it has become cleared, we *can not* run the redirectionFn. + if(redirectingRead) { + redirectionFn(std::move(readBytes)); + } else { + // The redirectionFn got cleared while we were acquiring the lock + lk.unlock(); // We don't need the lock anymore + handleInput(p, readBytes); // and we might as well process this input ourselves + } + } else { + if(p.input(readBytes)) { + for(const auto& packet : p.output()) { + std::shared_ptr msg; + if(!decoder->decode(msg, packet)) + continue; + + dispatchMessage(msg); } } } diff --git a/include/icsneo/communication/communication.h b/include/icsneo/communication/communication.h index 8c4c59a..38c9006 100644 --- a/include/icsneo/communication/communication.h +++ b/include/icsneo/communication/communication.h @@ -32,7 +32,7 @@ public: std::unique_ptr&& md) : makeConfiguredPacketizer(makeConfiguredPacketizer), encoder(std::move(e)), decoder(std::move(md)), report(report), driver(std::move(driver)) { packetizer = makeConfiguredPacketizer(); } - virtual ~Communication() { close(); } + virtual ~Communication(); bool open(); bool close(); @@ -43,7 +43,7 @@ public: bool rawWrite(const std::vector& bytes) { return driver->write(bytes); } virtual bool sendPacket(std::vector& bytes); bool redirectRead(std::function&&)> redirectTo); - void clearRedirectRead() { redirectingRead = false; } + void clearRedirectRead(); void setWriteBlocks(bool blocks) { driver->writeBlocks = blocks; } @@ -80,8 +80,10 @@ protected: std::atomic closing{false}; std::atomic redirectingRead{false}; std::function&&)> redirectionFn; + std::mutex redirectingReadMutex; // Don't allow read to be disabled while in the redirectionFn void dispatchMessage(const std::shared_ptr& msg); + void handleInput(Packetizer& p, std::vector& readBytes); private: std::thread readTaskThread;