libicsneo/communication/communication.cpp

334 lines
11 KiB
C++

#include "icsneo/communication/communication.h"
#include <chrono>
#include <iostream>
#include <queue>
#include <iomanip>
#include <cstring>
#include <mutex>
#include <condition_variable>
#include "icsneo/communication/command.h"
#include "icsneo/communication/decoder.h"
#include "icsneo/communication/packetizer.h"
#include "icsneo/communication/message/serialnumbermessage.h"
#include "icsneo/communication/message/filter/main51messagefilter.h"
#include "icsneo/communication/message/readsettingsmessage.h"
#include "icsneo/communication/message/versionmessage.h"
#ifdef ICSNEO_ENABLE_DEVICE_SHARING
#include "icsneo/communication/socket.h"
#endif
using namespace icsneo;
int Communication::messageCallbackIDCounter = 1;
Communication::~Communication() {
if(redirectingRead)
clearRedirectRead();
if(isOpen())
close();
}
bool Communication::open() {
if(isOpen()) {
report(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::Error);
return false;
}
if(!driver->open())
return false;
spawnThreads();
return true;
}
void Communication::spawnThreads() {
readTaskThread = std::thread(&Communication::readTask, this);
}
void Communication::joinThreads() {
closing = true;
if(readTaskThread.joinable())
readTaskThread.join();
closing = false;
}
bool Communication::close() {
joinThreads();
if(!isOpen() && !isDisconnected()) {
report(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::Error);
return false;
}
return driver->close();
}
bool Communication::isOpen() {
return driver->isOpen();
}
bool Communication::isDisconnected() {
return driver->isDisconnected();
}
void Communication::modifyRawCallbacks(std::function<void(std::list<Communication::RawCallback>&)>&& cb) {
std::scoped_lock lk(rawCallbacksMutex);
cb(rawCallbacks);
}
bool Communication::sendPacket(std::vector<uint8_t>& bytes) {
// This is here so that other communication types (like multichannel) can override it
return rawWrite(bytes);
}
bool Communication::sendCommand(Command cmd, std::vector<uint8_t> arguments) {
std::vector<uint8_t> packet;
if(!encoder->encode(*packetizer, packet, cmd, arguments))
return false;
return sendPacket(packet);
}
bool Communication::sendCommand(ExtendedCommand cmd, std::vector<uint8_t> arguments) {
const auto size = arguments.size();
if (size > std::numeric_limits<uint16_t>::max())
return false;
arguments.insert(arguments.begin(), {
uint8_t(uint16_t(cmd) & 0xff),
uint8_t((uint16_t(cmd) >> 8) & 0xff),
uint8_t(size & 0xff),
uint8_t((size >> 8) & 0xff)
});
return sendCommand(Command::Extended, arguments);
}
bool Communication::redirectRead(std::function<void(std::vector<uint8_t>&&)> redirectTo) {
if(redirectingRead)
return false;
redirectionFn = redirectTo;
redirectingRead = true;
return true;
}
void Communication::clearRedirectRead() {
if(!redirectingRead)
return;
// The mutex is required to clear the redirection, but not to set it
std::lock_guard<std::mutex> lk(redirectingReadMutex);
redirectingRead = false;
redirectionFn = std::function<void(std::vector<uint8_t>&&)>();
}
bool Communication::getSettingsSync(std::vector<uint8_t>& data, std::chrono::milliseconds timeout) {
static const std::shared_ptr<MessageFilter> filter = std::make_shared<MessageFilter>(Network::NetID::ReadSettings);
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
return sendCommand(Command::ReadSettings, { 0, 0, 0, 1 /* Get Global Settings */, 0, 1 /* Subversion 1 */ });
}, filter, timeout);
if(!msg)
return false;
std::shared_ptr<ReadSettingsMessage> gsmsg = std::dynamic_pointer_cast<ReadSettingsMessage>(msg);
if(!gsmsg) {
report(APIEvent::Type::Unknown, APIEvent::Severity::Error);
return false;
}
if(gsmsg->response == ReadSettingsMessage::Response::OKDefaultsUsed) {
report(APIEvent::Type::SettingsDefaultsUsed, APIEvent::Severity::EventInfo);
} else if(gsmsg->response != ReadSettingsMessage::Response::OK) {
report(APIEvent::Type::SettingsReadError, APIEvent::Severity::Error);
return false;
}
data = std::move(gsmsg->data);
return true;
}
std::shared_ptr<SerialNumberMessage> Communication::getSerialNumberSync(std::chrono::milliseconds timeout) {
static const std::shared_ptr<MessageFilter> filter = std::make_shared<Main51MessageFilter>(Command::RequestSerialNumber);
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
return sendCommand(Command::RequestSerialNumber);
}, filter, timeout);
if(!msg) // Did not receive a message
return std::shared_ptr<SerialNumberMessage>();
auto m51 = std::dynamic_pointer_cast<Main51Message>(msg);
if(!m51) // Could not upcast for some reason
return std::shared_ptr<SerialNumberMessage>();
return std::dynamic_pointer_cast<SerialNumberMessage>(m51);
}
std::optional< std::vector< std::optional<DeviceAppVersion> > > Communication::getVersionsSync(std::chrono::milliseconds timeout) {
static const std::shared_ptr<MessageFilter> filter = std::make_shared<MessageFilter>(Message::Type::DeviceVersion);
std::vector< std::optional<DeviceAppVersion> > ret;
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
return sendCommand(Command::GetMainVersion);
}, filter, timeout);
if(!msg) // Did not receive a message
return std::nullopt;
auto ver = std::dynamic_pointer_cast<VersionMessage>(msg);
if(!ver) // Could not upcast for some reason
return std::nullopt;
if(ver->ForChip != VersionMessage::MainChip || ver->Versions.size() != 1)
return std::nullopt;
ret.push_back(ver->Versions.front());
msg = waitForMessageSync([this]() {
return sendCommand(Command::GetSecondaryVersions);
}, filter, timeout);
if(msg) { // This one is allowed to fail
ver = std::dynamic_pointer_cast<VersionMessage>(msg);
if(ver && ver->ForChip != VersionMessage::MainChip)
ret.insert(ret.end(), ver->Versions.begin(), ver->Versions.end());
}
return ret;
}
std::shared_ptr<LogicalDiskInfoMessage> Communication::getLogicalDiskInfoSync(std::chrono::milliseconds timeout) {
static const std::shared_ptr<MessageFilter> filter = std::make_shared<MessageFilter>(Message::Type::LogicalDiskInfo);
std::shared_ptr<Message> msg = waitForMessageSync([this]() {
return sendCommand(Command::GetLogicalDiskInfo);
}, filter, timeout);
if(!msg) // Did not receive a message
return {};
return std::dynamic_pointer_cast<LogicalDiskInfoMessage>(msg);
}
int Communication::addMessageCallback(const std::shared_ptr<MessageCallback>& cb) {
std::lock_guard<std::mutex> lk(messageCallbacksLock);
messageCallbacks.insert(std::make_pair(messageCallbackIDCounter, cb));
return messageCallbackIDCounter++;
}
bool Communication::removeMessageCallback(int id) {
std::lock_guard<std::mutex> lk(messageCallbacksLock);
try {
messageCallbacks.erase(id);
return true;
} catch(...) {
report(APIEvent::Type::Unknown, APIEvent::Severity::Error);
return false;
}
}
std::shared_ptr<Message> Communication::waitForMessageSync(std::function<bool(void)> onceWaitingDo,
const std::shared_ptr<MessageFilter>& f, std::chrono::milliseconds timeout) {
std::mutex cvMutex;
std::condition_variable cv;
std::shared_ptr<Message> returnedMessage;
#ifdef ICSNEO_ENABLE_DEVICE_SHARING
auto socket = lockSocket();
int64_t ms = timeout.count();
if(!(socket.writeTyped(RPC::DEVICE_LOCK) && socket.writeString(driver->device.serial) && socket.writeTyped(ms)))
return nullptr;
if(bool ret; !(socket.readTyped(ret) && ret))
return nullptr;
#endif
std::unique_lock<std::mutex> fnLk(syncMessageMutex); // Only allow for one sync message at a time
std::unique_lock<std::mutex> cvLk(cvMutex); // Don't let the callback fire until we're waiting for it
int cb = addMessageCallback(std::make_shared<MessageCallback>([&cvMutex, &returnedMessage, &cv](std::shared_ptr<Message> message) {
{
std::lock_guard<std::mutex> lk(cvMutex);
returnedMessage = message;
}
cv.notify_all();
}, f));
// We have now added the callback, do whatever the caller wanted to do
bool fail = !onceWaitingDo();
if(!fail)
cv.wait_for(cvLk, timeout, [&returnedMessage] { return !!returnedMessage; }); // `!!shared_ptr` checks if the ptr has a value
cvLk.unlock(); // Ensure callbacks can complete even if we didn't wait for them
// We don't actually check that we got a message, because either way we want to remove the callback (since it should only happen once)
removeMessageCallback(cb);
// We are now guaranteed that no more callbacks will happen
if(fail) // The caller's function failed, so don't return a message
returnedMessage.reset();
#ifdef ICSNEO_ENABLE_DEVICE_SHARING
if(!(socket.writeTyped(RPC::DEVICE_UNLOCK) && socket.writeString(driver->device.serial)))
return nullptr;
if(bool ret; !(socket.readTyped(ret) && ret))
return nullptr;
#endif
// Then we either will return the message we got or we will return the empty shared_ptr, caller responsible for checking
return returnedMessage;
}
void Communication::dispatchMessage(const std::shared_ptr<Message>& msg) {
std::lock_guard<std::mutex> lk(messageCallbacksLock);
// We want callbacks to be able to access errors
const bool downgrade = EventManager::GetInstance().isDowngradingErrorsOnCurrentThread();
if(downgrade)
EventManager::GetInstance().cancelErrorDowngradingOnCurrentThread();
for(auto& cb : messageCallbacks) {
if(!closing) { // We might have closed while reading or processing
cb.second->callIfMatch(msg);
}
}
if(downgrade)
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
}
void Communication::readTask() {
std::vector<uint8_t> readBytes;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing) {
readBytes.clear();
if(driver->readWait(readBytes)) {
handleInput(*packetizer, readBytes);
}
}
}
void Communication::handleInput(Packetizer& p, std::vector<uint8_t>& readBytes) {
{
std::lock_guard lk(rawCallbacksMutex);
for(auto& cb : rawCallbacks)
cb(readBytes);
}
if(redirectingRead) {
// redirectingRead is an atomic so it can be set without acquiring a mutex
// However, we do not clear it without the mutex. The idea is that if another
// thread calls clearRedirectRead(), it will block until the redirectionFn
// finishes, and after that the redirectionFn will not be called again.
std::unique_lock<std::mutex> lk(redirectingReadMutex);
// So after we acquire the mutex, we need to check the atomic again, and
// if it has become cleared, we *can not* run the redirectionFn.
if(redirectingRead) {
redirectionFn(std::move(readBytes));
} else {
// The redirectionFn got cleared while we were acquiring the lock
lk.unlock(); // We don't need the lock anymore
handleInput(p, readBytes); // and we might as well process this input ourselves
}
} else {
if(p.input(readBytes)) {
for(const auto& packet : p.output()) {
std::shared_ptr<Message> msg;
if(!decoder->decode(msg, packet))
continue;
dispatchMessage(msg);
}
}
}
}