Device: Fix heartbeat thread join location

pull/73/head
Yasser Yassine 2025-02-19 05:15:23 +00:00 committed by Kyle Schwarz
parent fb5c4babce
commit d4995fa2a9
2 changed files with 25 additions and 24 deletions

View File

@ -37,29 +37,28 @@ bool Communication::open() {
} }
void Communication::spawnThreads() { void Communication::spawnThreads() {
closing = false;
readTaskThread = std::thread(&Communication::readTask, this); readTaskThread = std::thread(&Communication::readTask, this);
} }
void Communication::joinThreads() { void Communication::joinThreads() {
closing = true;
if(pauseReadTask) { if(pauseReadTask) {
resumeReads(); resumeReads();
} }
closing = true;
if(readTaskThread.joinable()) if(readTaskThread.joinable())
readTaskThread.join(); readTaskThread.join();
closing = false;
} }
bool Communication::close() { bool Communication::close() {
joinThreads();
if(!isOpen() && !isDisconnected()) { if(!isOpen() && !isDisconnected()) {
report(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::Error); report(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::Error);
return false; return false;
} }
joinThreads();
return driver->close(); return driver->close();
} }

View File

@ -88,6 +88,10 @@ Device::~Device() {
disableMessagePolling(); disableMessagePolling();
if(isOpen()) if(isOpen())
close(); close();
if(heartbeatThread.joinable()) {
stopHeartbeatThread = true;
heartbeatThread.join();
}
} }
uint16_t Device::getTimestampResolution() const { uint16_t Device::getTimestampResolution() const {
@ -245,6 +249,12 @@ bool Device::open(OpenFlags flags, OpenStatusHandler handler) {
handleInternalMessage(message); handleInternalMessage(message);
})); }));
// Clear the previous heartbeat thread, in case open() was called on this instance more than once
if(heartbeatThread.joinable())
heartbeatThread.join();
stopHeartbeatThread = false;
heartbeatThread = std::thread([this]() { heartbeatThread = std::thread([this]() {
EventManager::GetInstance().downgradeErrorsOnCurrentThread(); EventManager::GetInstance().downgradeErrorsOnCurrentThread();
@ -270,12 +280,12 @@ bool Device::open(OpenFlags flags, OpenStatusHandler handler) {
} }
while(!stopHeartbeatThread) { while(!stopHeartbeatThread) {
// Wait for 110ms for a possible heartbeat
std::this_thread::sleep_for(std::chrono::milliseconds(110));
std::unique_lock<std::mutex> recvLk(receivedMessageMutex); std::unique_lock<std::mutex> recvLk(receivedMessageMutex);
if(receivedMessage) { // Wait for 110ms for a possible heartbeat
if(heartbeatCV.wait_for(recvLk, std::chrono::milliseconds(110), [&]() { return receivedMessage; })) {
receivedMessage = false; receivedMessage = false;
} else { } else if(!stopHeartbeatThread) { // Add this condition here in case the thread was stopped while waiting for the last message
// Some communication, such as the bootloader and extractor interfaces, must // Some communication, such as the bootloader and extractor interfaces, must
// redirect the input stream from the device as it will no longer be in the // redirect the input stream from the device as it will no longer be in the
// packet format we expect here. As a result, status updates will not reach // packet format we expect here. As a result, status updates will not reach
@ -284,9 +294,8 @@ bool Device::open(OpenFlags flags, OpenStatusHandler handler) {
// otherwise quiet stream. This lock makes sure suppressDisconnects() will // otherwise quiet stream. This lock makes sure suppressDisconnects() will
// block until we've either gotten our status update or disconnected from // block until we've either gotten our status update or disconnected from
// the device. // the device.
std::lock_guard<std::mutex> lk(heartbeatMutex); std::unique_lock<std::mutex> lk(heartbeatMutex);
if(heartbeatSuppressed()) if(heartbeatSuppressed()) continue;
continue;
// No heartbeat received, request a status // No heartbeat received, request a status
com->sendCommand(Command::RequestStatusUpdate); com->sendCommand(Command::RequestStatusUpdate);
@ -296,8 +305,8 @@ bool Device::open(OpenFlags flags, OpenStatusHandler handler) {
receivedMessage = false; receivedMessage = false;
} else { } else {
if(!stopHeartbeatThread && !isDisconnected()) { if(!stopHeartbeatThread && !isDisconnected()) {
close();
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error); report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
com->driver->close();
} }
break; break;
} }
@ -379,10 +388,6 @@ bool Device::close() {
internalHandlerCallbackID = 0; internalHandlerCallbackID = 0;
if(heartbeatThread.joinable())
heartbeatThread.join();
stopHeartbeatThread = false;
forEachExtension([](const std::shared_ptr<DeviceExtension>& ext) { ext->onDeviceClose(); return true; }); forEachExtension([](const std::shared_ptr<DeviceExtension>& ext) { ext->onDeviceClose(); return true; });
return com->close(); return com->close();
} }
@ -814,7 +819,6 @@ std::shared_ptr<HardwareInfo> Device::getHardwareInfo(std::chrono::milliseconds
report(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::Error); report(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::Error);
return nullptr; return nullptr;
} }
auto filter = std::make_shared<MessageFilter>(Message::Type::HardwareInfo); auto filter = std::make_shared<MessageFilter>(Message::Type::HardwareInfo);
auto response = com->waitForMessageSync([this]() { auto response = com->waitForMessageSync([this]() {
@ -1694,7 +1698,10 @@ void Device::stopScriptStatusThreadIfNecessary(std::unique_lock<std::mutex> lk)
Lifetime Device::suppressDisconnects() { Lifetime Device::suppressDisconnects() {
std::lock_guard<std::mutex> lk(heartbeatMutex); std::lock_guard<std::mutex> lk(heartbeatMutex);
heartbeatSuppressedByUser++; heartbeatSuppressedByUser++;
return Lifetime([this] { std::lock_guard<std::mutex> lk2(heartbeatMutex); heartbeatSuppressedByUser--; }); return Lifetime([this] {
std::lock_guard<std::mutex> lk2(heartbeatMutex);
heartbeatSuppressedByUser--;
});
} }
void Device::addExtension(std::shared_ptr<DeviceExtension>&& extension) { void Device::addExtension(std::shared_ptr<DeviceExtension>&& extension) {
@ -1987,11 +1994,6 @@ std::optional<size_t> Device::getGenericBinarySize(uint16_t binaryIndex) {
return std::nullopt; return std::nullopt;
} }
if(!isOnline()) {
report(APIEvent::Type::DeviceCurrentlyOffline, APIEvent::Severity::Error);
return std::nullopt;
}
std::vector<uint8_t> args = GenericBinaryStatusPacket::EncodeArguments(binaryIndex); std::vector<uint8_t> args = GenericBinaryStatusPacket::EncodeArguments(binaryIndex);
std::shared_ptr<Message> response = com->waitForMessageSync( std::shared_ptr<Message> response = com->waitForMessageSync(