Communication: Block destruction while inside redirectionFn
parent
218648ae5a
commit
f8355df770
|
|
@ -17,6 +17,12 @@ using namespace icsneo;
|
|||
|
||||
int Communication::messageCallbackIDCounter = 1;
|
||||
|
||||
Communication::~Communication() {
|
||||
if(redirectingRead)
|
||||
clearRedirectRead();
|
||||
close();
|
||||
}
|
||||
|
||||
bool Communication::open() {
|
||||
if(isOpen()) {
|
||||
report(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::Error);
|
||||
|
|
@ -80,6 +86,15 @@ bool Communication::redirectRead(std::function<void(std::vector<uint8_t>&&)> red
|
|||
return true;
|
||||
}
|
||||
|
||||
void Communication::clearRedirectRead() {
|
||||
if(!redirectingRead)
|
||||
return;
|
||||
// The mutex is required to clear the redirection, but not to set it
|
||||
std::lock_guard<std::mutex> lk(redirectingReadMutex);
|
||||
redirectingRead = false;
|
||||
redirectionFn = std::function<void(std::vector<uint8_t>&&)>();
|
||||
}
|
||||
|
||||
bool Communication::getSettingsSync(std::vector<uint8_t>& data, std::chrono::milliseconds timeout) {
|
||||
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
|
||||
return sendCommand(Command::ReadSettings, { 0, 0, 0, 1 /* Get Global Settings */, 0, 1 /* Subversion 1 */ });
|
||||
|
|
@ -189,11 +204,30 @@ void Communication::readTask() {
|
|||
while(!closing) {
|
||||
readBytes.clear();
|
||||
if(driver->readWait(readBytes)) {
|
||||
handleInput(*packetizer, readBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Communication::handleInput(Packetizer& p, std::vector<uint8_t>& readBytes) {
|
||||
if(redirectingRead) {
|
||||
// redirectingRead is an atomic so it can be set without acquiring a mutex
|
||||
// However, we do not clear it without the mutex. The idea is that if another
|
||||
// thread calls clearRedirectRead(), it will block until the redirectionFn
|
||||
// finishes, and after that the redirectionFn will not be called again.
|
||||
std::unique_lock<std::mutex> lk(redirectingReadMutex);
|
||||
// So after we acquire the mutex, we need to check the atomic again, and
|
||||
// if it has become cleared, we *can not* run the redirectionFn.
|
||||
if(redirectingRead) {
|
||||
redirectionFn(std::move(readBytes));
|
||||
} else {
|
||||
if(packetizer->input(readBytes)) {
|
||||
for(const auto& packet : packetizer->output()) {
|
||||
// The redirectionFn got cleared while we were acquiring the lock
|
||||
lk.unlock(); // We don't need the lock anymore
|
||||
handleInput(p, readBytes); // and we might as well process this input ourselves
|
||||
}
|
||||
} else {
|
||||
if(p.input(readBytes)) {
|
||||
for(const auto& packet : p.output()) {
|
||||
std::shared_ptr<Message> msg;
|
||||
if(!decoder->decode(msg, packet))
|
||||
continue;
|
||||
|
|
@ -202,6 +236,4 @@ void Communication::readTask() {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ public:
|
|||
std::unique_ptr<Decoder>&& md) : makeConfiguredPacketizer(makeConfiguredPacketizer), encoder(std::move(e)), decoder(std::move(md)), report(report), driver(std::move(driver)) {
|
||||
packetizer = makeConfiguredPacketizer();
|
||||
}
|
||||
virtual ~Communication() { close(); }
|
||||
virtual ~Communication();
|
||||
|
||||
bool open();
|
||||
bool close();
|
||||
|
|
@ -43,7 +43,7 @@ public:
|
|||
bool rawWrite(const std::vector<uint8_t>& bytes) { return driver->write(bytes); }
|
||||
virtual bool sendPacket(std::vector<uint8_t>& bytes);
|
||||
bool redirectRead(std::function<void(std::vector<uint8_t>&&)> redirectTo);
|
||||
void clearRedirectRead() { redirectingRead = false; }
|
||||
void clearRedirectRead();
|
||||
|
||||
void setWriteBlocks(bool blocks) { driver->writeBlocks = blocks; }
|
||||
|
||||
|
|
@ -80,8 +80,10 @@ protected:
|
|||
std::atomic<bool> closing{false};
|
||||
std::atomic<bool> redirectingRead{false};
|
||||
std::function<void(std::vector<uint8_t>&&)> redirectionFn;
|
||||
std::mutex redirectingReadMutex; // Don't allow read to be disabled while in the redirectionFn
|
||||
|
||||
void dispatchMessage(const std::shared_ptr<Message>& msg);
|
||||
void handleInput(Packetizer& p, std::vector<uint8_t>& readBytes);
|
||||
|
||||
private:
|
||||
std::thread readTaskThread;
|
||||
|
|
|
|||
Loading…
Reference in New Issue