#include "icsneo/communication/communication.h"
// NOTE(review): the system header names and the template arguments in this file were
// reconstructed from how each name is used below; confirm them against communication.h.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "icsneo/communication/command.h"
#include "icsneo/communication/decoder.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/message/serialnumbermessage.h"
#include "icsneo/communication/message/filter/main51messagefilter.h"
#include "icsneo/communication/message/readsettingsmessage.h"

using namespace icsneo;

// NOTE(review): this counter is static (shared by every Communication instance) but is only
// modified under the per-instance messageCallbacksLock in addMessageCallback(); confirm that
// callbacks are never registered on two instances concurrently, or make the counter atomic.
int Communication::messageCallbackIDCounter = 1;

// Starts the read thread and opens the underlying driver.
// Reports DeviceCurrentlyOpen and returns false if the driver is already open.
// Returns the result of impl->open().
bool Communication::open() {
	if(isOpen()) {
		report(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::Error);
		return false;
	}

	// The read thread is started before the driver is opened, exactly as before
	spawnThreads();
	if(impl->open())
		return true;

	// The driver failed to open. Stop the read thread again, otherwise it would keep running
	// and the next open() would assign to a still-joinable std::thread, which calls std::terminate.
	joinThreads();
	return false;
}

// Starts the thread which runs readTask() on this object.
void Communication::spawnThreads() {
	readTaskThread = std::thread(&Communication::readTask, this);
}

// Asks readTask() to stop by raising `closing`, waits for the thread to finish,
// then lowers the flag again so the threads can be spawned once more.
// NOTE(review): `closing` is written here and read on the read thread; confirm that it is
// declared as std::atomic<bool> in the header.
void Communication::joinThreads() {
	closing = true;
	if(readTaskThread.joinable())
		readTaskThread.join();
	closing = false;
}

// Stops the read thread and closes the underlying driver.
// Reports DeviceCurrentlyClosed and returns false if the driver is not open.
// Returns the result of impl->close().
bool Communication::close() {
	if(!isOpen()) {
		report(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::Error);
		return false;
	}

	joinThreads();

	return impl->close();
}

// Returns whether the underlying driver reports itself as open.
bool Communication::isOpen() {
	return impl->isOpen();
}

// Writes an already encoded packet. Returns the result of rawWrite().
bool Communication::sendPacket(std::vector<uint8_t>& bytes) {
	// This is here so that other communication types (like multichannel) can override it
	return rawWrite(bytes);
}

// Encodes `cmd` with `arguments` into a packet and sends it.
// Returns false if encoding fails, otherwise the result of sendPacket().
bool Communication::sendCommand(Command cmd, std::vector<uint8_t> arguments) {
	std::vector<uint8_t> packet;
	if(!encoder->encode(packet, cmd, arguments))
		return false;

	return sendPacket(packet);
}

// Sends a ReadSettings command and waits up to `timeout` for the reply.
// On success the payload of the reply is moved into `data` and true is returned.
// Returns false if the command could not be sent, no reply arrived in time, or the reply
// was not an OK ReadSettingsMessage (the latter two cases are reported as errors).
// NOTE(review): the reply callback is only registered inside waitForMessageSync(), after the
// command has already been sent, so a very fast reply could arrive before anyone is listening.
bool Communication::getSettingsSync(std::vector<uint8_t>& data, std::chrono::milliseconds timeout) {
	// If the request never went out there is nothing to wait for
	if(!sendCommand(Command::ReadSettings, { 0, 0, 0, 1 /* Get Global Settings */, 0, 1 /* Subversion 1 */ }))
		return false;

	std::shared_ptr<Message> msg = waitForMessageSync(MessageFilter(Network::NetID::ReadSettings), timeout);
	if(!msg)
		return false;

	std::shared_ptr<ReadSettingsMessage> gsmsg = std::dynamic_pointer_cast<ReadSettingsMessage>(msg);
	if(!gsmsg) {
		report(APIEvent::Type::Unknown, APIEvent::Severity::Error);
		return false;
	}

	if(gsmsg->response != ReadSettingsMessage::Response::OK) {
		report(APIEvent::Type::Unknown, APIEvent::Severity::Error);
		return false;
	}

	data = std::move(msg->data);
	return true;
}

// Sends a RequestSerialNumber command and waits up to `timeout` for the reply.
// Returns the reply, or an empty pointer if the command could not be sent, no reply
// arrived in time, or the reply was not of the expected type.
std::shared_ptr<SerialNumberMessage> Communication::getSerialNumberSync(std::chrono::milliseconds timeout) {
	// If the request never went out there is nothing to wait for
	if(!sendCommand(Command::RequestSerialNumber))
		return std::shared_ptr<SerialNumberMessage>();

	std::shared_ptr<Message> msg = waitForMessageSync(std::make_shared<Main51MessageFilter>(Command::RequestSerialNumber), timeout);
	if(!msg) // Did not receive a message
		return std::shared_ptr<SerialNumberMessage>();

	auto m51 = std::dynamic_pointer_cast<Main51Message>(msg);
	if(!m51) // Could not downcast for some reason
		return std::shared_ptr<SerialNumberMessage>();

	return std::dynamic_pointer_cast<SerialNumberMessage>(m51);
}

// Registers a copy of `cb` and returns the id to pass to removeMessageCallback().
int Communication::addMessageCallback(const MessageCallback& cb) {
	std::lock_guard<std::mutex> lk(messageCallbacksLock);
	messageCallbacks.insert(std::make_pair(messageCallbackIDCounter, cb));
	return messageCallbackIDCounter++;
}

// Unregisters the callback previously registered under `id`.
// Returns true if a callback was removed. If no callback has that id, an error is
// reported and false is returned.
bool Communication::removeMessageCallback(int id) {
	std::lock_guard<std::mutex> lk(messageCallbacksLock);
	// Erasing by key does not throw when the key is missing, it returns how many entries were removed
	if(messageCallbacks.erase(id) == 0) {
		report(APIEvent::Type::Unknown, APIEvent::Severity::Error);
		return false;
	}
	return true;
}

// Blocks until a message matching `f` is received or `timeout` elapses.
// Returns the received message, or an empty pointer on timeout; the caller is responsible for checking.
std::shared_ptr<Message> Communication::waitForMessageSync(std::shared_ptr<MessageFilter> f, std::chrono::milliseconds timeout) {
	std::mutex m;
	std::condition_variable cv;
	std::shared_ptr<Message> returnedMessage;
	const int cb = addMessageCallback(MessageCallback([&m, &returnedMessage, &cv](std::shared_ptr<Message> message) {
		{
			std::lock_guard<std::mutex> lk(m);
			returnedMessage = message;
		}
		cv.notify_one();
	}, f));

	// We have now added the callback, wait for it to return from the other thread
	{
		std::unique_lock<std::mutex> lk(m);
		cv.wait_for(lk, timeout, [&returnedMessage]{ return !!returnedMessage; }); // `!!shared_ptr` checks if the ptr has a value
	}
	// `m` must be released before the callback is removed. readTask() holds messageCallbacksLock
	// while it runs the callback (which locks `m`), and removeMessageCallback() takes
	// messageCallbacksLock, so removing while still holding `m` could deadlock the two threads.

	// We don't actually check that we got a message, because either way we want to remove the callback (since it should only happen once)
	removeMessageCallback(cb);

	// readTask() only runs callbacks while holding messageCallbacksLock, so once the removal has
	// returned it can no longer touch the locals captured above.
	// Then we either will return the message we got or we will return the empty shared_ptr, caller responsible for checking
	return returnedMessage;
}

// Body of the read thread. Until `closing` is raised it reads bytes from the driver, feeds them
// to the packetizer, decodes every resulting packet and hands each decoded message to the
// registered callbacks.
// NOTE(review): callbacks run while messageCallbacksLock is held, so a callback which calls
// addMessageCallback() or removeMessageCallback() on this object would block on that lock.
void Communication::readTask() {
	std::vector<uint8_t> readBytes;

	EventManager::GetInstance().downgradeErrorsOnCurrentThread();

	while(!closing) {
		readBytes.clear();
		if(!impl->readWait(readBytes))
			continue;

		if(!packetizer->input(readBytes))
			continue;

		for(auto& packet : packetizer->output()) {
			std::shared_ptr<Message> msg;
			if(!decoder->decode(msg, packet))
				continue;

			std::lock_guard<std::mutex> lk(messageCallbacksLock);

			// We want callbacks to be able to access errors
			EventManager::GetInstance().cancelErrorDowngradingOnCurrentThread();
			for(auto& cb : messageCallbacks) {
				if(!closing) { // We might have closed while reading or processing
					cb.second.callIfMatch(msg);
				}
			}
			EventManager::GetInstance().downgradeErrorsOnCurrentThread();
		}
	}
}