Communication: Atomic sync messages

If waitForMessageSync() is called in two threads for the same message
the callback for both will be invoked with the first send.
add-device-sharing
Kyle Schwarz 2022-10-20 17:26:21 -04:00
parent 17b3018499
commit 0101467154
2 changed files with 8 additions and 6 deletions

View File

@ -212,14 +212,15 @@ bool Communication::removeMessageCallback(int id) {
std::shared_ptr<Message> Communication::waitForMessageSync(std::function<bool(void)> onceWaitingDo,
const std::shared_ptr<MessageFilter>& f, std::chrono::milliseconds timeout) {
std::mutex m;
std::mutex cvMutex;
std::condition_variable cv;
std::shared_ptr<Message> returnedMessage;
std::unique_lock<std::mutex> lk(m); // Don't let the callback fire until we're waiting for it
int cb = addMessageCallback(std::make_shared<MessageCallback>([&m, &returnedMessage, &cv](std::shared_ptr<Message> message) {
std::unique_lock<std::mutex> fnLk(syncMessageMutex); // Only allow for one sync message at a time
std::unique_lock<std::mutex> cvLk(cvMutex); // Don't let the callback fire until we're waiting for it
int cb = addMessageCallback(std::make_shared<MessageCallback>([&cvMutex, &returnedMessage, &cv](std::shared_ptr<Message> message) {
{
std::lock_guard<std::mutex> lk(m);
std::lock_guard<std::mutex> lk(cvMutex);
returnedMessage = message;
}
cv.notify_all();
@ -228,8 +229,8 @@ std::shared_ptr<Message> Communication::waitForMessageSync(std::function<bool(vo
// We have now added the callback, do whatever the caller wanted to do
bool fail = !onceWaitingDo();
if(!fail)
cv.wait_for(lk, timeout, [&returnedMessage] { return !!returnedMessage; }); // `!!shared_ptr` checks if the ptr has a value
lk.unlock(); // Ensure callbacks can complete even if we didn't wait for them
cv.wait_for(cvLk, timeout, [&returnedMessage] { return !!returnedMessage; }); // `!!shared_ptr` checks if the ptr has a value
cvLk.unlock(); // Ensure callbacks can complete even if we didn't wait for them
// We don't actually check that we got a message, because either way we want to remove the callback (since it should only happen once)
removeMessageCallback(cb);

View File

@ -88,6 +88,7 @@ protected:
std::atomic<bool> redirectingRead{false};
std::function<void(std::vector<uint8_t>&&)> redirectionFn;
std::mutex redirectingReadMutex; // Don't allow read to be disabled while in the redirectionFn
std::mutex syncMessageMutex;
void dispatchMessage(const std::shared_ptr<Message>& msg);
void handleInput(Packetizer& p, std::vector<uint8_t>& readBytes);