Fix race conditions with Communication::waitForMessageSync

pull/25/head
Paul Hollinsky 2020-02-28 20:00:08 -05:00
parent 9fcba2eb13
commit 1cd817a16b
5 changed files with 60 additions and 30 deletions

View File

@ -67,8 +67,9 @@ bool Communication::sendCommand(Command cmd, std::vector<uint8_t> arguments) {
}
bool Communication::getSettingsSync(std::vector<uint8_t>& data, std::chrono::milliseconds timeout) {
sendCommand(Command::ReadSettings, { 0, 0, 0, 1 /* Get Global Settings */, 0, 1 /* Subversion 1 */ });
std::shared_ptr<Message> msg = waitForMessageSync(MessageFilter(Network::NetID::ReadSettings), timeout);
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
return sendCommand(Command::ReadSettings, { 0, 0, 0, 1 /* Get Global Settings */, 0, 1 /* Subversion 1 */ });
}, MessageFilter(Network::NetID::ReadSettings), timeout);
if(!msg)
return false;
@ -89,7 +90,9 @@ bool Communication::getSettingsSync(std::vector<uint8_t>& data, std::chrono::mil
std::shared_ptr<SerialNumberMessage> Communication::getSerialNumberSync(std::chrono::milliseconds timeout) {
sendCommand(Command::RequestSerialNumber);
std::shared_ptr<Message> msg = waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::RequestSerialNumber), timeout);
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
return sendCommand(Command::RequestSerialNumber);
}, Main51MessageFilter(Command::RequestSerialNumber), timeout);
if(!msg) // Did not receive a message
return std::shared_ptr<SerialNumberMessage>();
@ -117,25 +120,32 @@ bool Communication::removeMessageCallback(int id) {
}
}
std::shared_ptr<Message> Communication::waitForMessageSync(std::shared_ptr<MessageFilter> f, std::chrono::milliseconds timeout) {
std::shared_ptr<Message> Communication::waitForMessageSync(std::function<bool(void)> onceWaitingDo, MessageFilter f, std::chrono::milliseconds timeout) {
std::mutex m;
std::condition_variable cv;
std::shared_ptr<Message> returnedMessage;
std::unique_lock<std::mutex> lk(m); // Don't let the callback fire until we're waiting for it
int cb = addMessageCallback(MessageCallback([&m, &returnedMessage, &cv](std::shared_ptr<Message> message) {
{
std::lock_guard<std::mutex> lk(m);
returnedMessage = message;
}
cv.notify_one();
cv.notify_all();
}, f));
// We have now added the callback, wait for it to return from the other thread
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, timeout, [&returnedMessage] { return !!returnedMessage; }); // `!!shared_ptr` checks if the ptr has a value
lk.unlock();
// We have now added the callback, do whatever the caller wanted to do
bool fail = !onceWaitingDo();
if(!fail)
cv.wait_for(lk, timeout, [&returnedMessage] { return !!returnedMessage; }); // `!!shared_ptr` checks if the ptr has a value
lk.unlock(); // Ensure callbacks can complete even if we didn't wait for them
// We don't actually check that we got a message, because either way we want to remove the callback (since it should only happen once)
removeMessageCallback(cb);
// We are now guaranteed that no more callbacks will happen
if(fail) // The caller's function failed, so don't return a message
returnedMessage.reset();
// Then we either will return the message we got or we will return the empty shared_ptr, caller responsible for checking
return returnedMessage;

View File

@ -236,11 +236,17 @@ bool Device::goOnline() {
while((std::chrono::system_clock::now() - startTime) < std::chrono::seconds(5)) {
if(latestResetStatus && latestResetStatus->comEnabled)
break;
if(!com->sendCommand(Command::RequestStatusUpdate))
return false;
com->waitForMessageSync(filter, std::chrono::milliseconds(100));
bool failOut = false;
com->waitForMessageSync([this, &failOut]() {
if(!com->sendCommand(Command::RequestStatusUpdate)) {
failOut = true;
return false;
}
return true;
}, filter, std::chrono::milliseconds(100));
if(failOut)
return false;
}
online = true;

View File

@ -210,8 +210,10 @@ bool IDeviceSettings::apply(bool temporary) {
bytestream[6] = (uint8_t)(gs_checksum >> 8);
memcpy(bytestream.data() + 7, getMutableRawStructurePointer(), settings.size());
com->sendCommand(Command::SetSettings, bytestream);
std::shared_ptr<Message> msg = com->waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::SetSettings), std::chrono::milliseconds(1000));
std::shared_ptr<Message> msg = com->waitForMessageSync([this, &bytestream]() {
return com->sendCommand(Command::SetSettings, bytestream);
}, Main51MessageFilter(Command::SetSettings), std::chrono::milliseconds(1000));
if(!msg || msg->data[0] != 1) { // We did not receive a response
// Attempt to get the settings from the device so we're up to date if possible
@ -230,8 +232,9 @@ bool IDeviceSettings::apply(bool temporary) {
bytestream[6] = (uint8_t)(gs_checksum >> 8);
memcpy(bytestream.data() + 7, getMutableRawStructurePointer(), settings.size());
com->sendCommand(Command::SetSettings, bytestream);
msg = com->waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::SetSettings), std::chrono::milliseconds(1000));
msg = com->waitForMessageSync([this, &bytestream]() {
return com->sendCommand(Command::SetSettings, bytestream);
}, Main51MessageFilter(Command::SetSettings), std::chrono::milliseconds(1000));
if(!msg || msg->data[0] != 1) {
// Attempt to get the settings from the device so we're up to date if possible
if(refresh()) {
@ -242,8 +245,9 @@ bool IDeviceSettings::apply(bool temporary) {
}
if(!temporary) {
com->sendCommand(Command::SaveSettings);
msg = com->waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::SaveSettings), std::chrono::milliseconds(5000));
msg = com->waitForMessageSync([this]() {
return com->sendCommand(Command::SaveSettings);
}, Main51MessageFilter(Command::SaveSettings), std::chrono::milliseconds(5000));
}
refresh(); // Refresh our buffer with what the device has, whether we were successful or not
@ -266,8 +270,9 @@ bool IDeviceSettings::applyDefaults(bool temporary) {
return false;
}
com->sendCommand(Command::SetDefaultSettings);
std::shared_ptr<Message> msg = com->waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::SetDefaultSettings), std::chrono::milliseconds(1000));
std::shared_ptr<Message> msg = com->waitForMessageSync([this]() {
return com->sendCommand(Command::SetDefaultSettings);
}, Main51MessageFilter(Command::SetDefaultSettings), std::chrono::milliseconds(1000));
if(!msg || msg->data[0] != 1) {
// Attempt to get the settings from the device so we're up to date if possible
if(refresh()) {
@ -295,8 +300,9 @@ bool IDeviceSettings::applyDefaults(bool temporary) {
bytestream[6] = (uint8_t)(gs_checksum >> 8);
memcpy(bytestream.data() + 7, getMutableRawStructurePointer(), settings.size());
com->sendCommand(Command::SetSettings, bytestream);
msg = com->waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::SetSettings), std::chrono::milliseconds(1000));
msg = com->waitForMessageSync([this, &bytestream]() {
return com->sendCommand(Command::SetSettings, bytestream);
}, Main51MessageFilter(Command::SetSettings), std::chrono::milliseconds(1000));
if(!msg || msg->data[0] != 1) {
// Attempt to get the settings from the device so we're up to date if possible
if(refresh()) {
@ -307,8 +313,9 @@ bool IDeviceSettings::applyDefaults(bool temporary) {
}
if(!temporary) {
com->sendCommand(Command::SaveSettings);
msg = com->waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::SaveSettings), std::chrono::milliseconds(5000));
msg = com->waitForMessageSync([this]() {
return com->sendCommand(Command::SaveSettings);
}, Main51MessageFilter(Command::SaveSettings), std::chrono::milliseconds(5000));
}
refresh(); // Refresh our buffer with what the device has, whether we were successful or not

View File

@ -47,10 +47,17 @@ public:
int addMessageCallback(const MessageCallback& cb);
bool removeMessageCallback(int id);
std::shared_ptr<Message> waitForMessageSync(MessageFilter f = MessageFilter(), std::chrono::milliseconds timeout = std::chrono::milliseconds(50)) {
return waitForMessageSync(std::make_shared<MessageFilter>(f), timeout);
std::shared_ptr<Message> waitForMessageSync(
MessageFilter f = MessageFilter(),
std::chrono::milliseconds timeout = std::chrono::milliseconds(50)) {
return waitForMessageSync([](){ return true; }, f, timeout);
}
std::shared_ptr<Message> waitForMessageSync(std::shared_ptr<MessageFilter> f, std::chrono::milliseconds timeout = std::chrono::milliseconds(50));
// onceWaitingDo is a way to avoid race conditions.
// Return false to bail early, in case your initial command failed.
std::shared_ptr<Message> waitForMessageSync(
std::function<bool(void)> onceWaitingDo,
MessageFilter f = MessageFilter(),
std::chrono::milliseconds timeout = std::chrono::milliseconds(50));
std::shared_ptr<Packetizer> packetizer; // Ownership is shared with the encoder
std::unique_ptr<Encoder> encoder;

View File

@ -11,11 +11,11 @@ namespace icsneo {
class Main51MessageCallback : public MessageCallback {
public:
Main51MessageCallback(fn_messageCallback cb, std::shared_ptr<Main51MessageFilter> f) : MessageCallback(cb, f) {}
Main51MessageCallback(fn_messageCallback cb, Main51MessageFilter f = Main51MessageFilter()) : MessageCallback(cb, std::make_shared<Main51MessageFilter>(f)) {}
Main51MessageCallback(fn_messageCallback cb, Main51MessageFilter f = Main51MessageFilter()) : MessageCallback(cb, Main51MessageFilter(f)) {}
// Allow the filter to be placed first if the user wants (maybe in the case of a lambda)
Main51MessageCallback(std::shared_ptr<Main51MessageFilter> f, fn_messageCallback cb) : MessageCallback(cb, f) {}
Main51MessageCallback(Main51MessageFilter f, fn_messageCallback cb) : MessageCallback(cb, std::make_shared<Main51MessageFilter>(f)) {}
Main51MessageCallback(Main51MessageFilter f, fn_messageCallback cb) : MessageCallback(cb, Main51MessageFilter(f)) {}
};
};