From d8798acaa75a7fa55e353b6faee5ba6aec6f55af Mon Sep 17 00:00:00 2001 From: Paul Hollinsky Date: Mon, 9 Mar 2020 13:56:18 -0400 Subject: [PATCH] Communication: MultiChannel: Properly mask out communication from non-main VNETs This also makes it possible for Communication to create more instances of Packetizer This is necessary because Packetizer is not thread safe, so when we support more VNETs we will need to create more Packetizers. --- communication/communication.cpp | 4 +- communication/encoder.cpp | 10 +-- communication/multichannelcommunication.cpp | 85 +++++++++++++++---- communication/packetizer.cpp | 2 +- device/device.cpp | 2 +- include/icsneo/communication/communication.h | 13 +-- include/icsneo/communication/encoder.h | 11 +-- .../communication/multichannelcommunication.h | 15 ++-- include/icsneo/communication/packetizer.h | 2 +- include/icsneo/device/device.h | 19 +++-- include/icsneo/device/tree/plasion/plasion.h | 12 ++- 11 files changed, 123 insertions(+), 52 deletions(-) diff --git a/communication/communication.cpp b/communication/communication.cpp index c6e4867..65b1a70 100644 --- a/communication/communication.cpp +++ b/communication/communication.cpp @@ -60,7 +60,7 @@ bool Communication::sendPacket(std::vector& bytes) { bool Communication::sendCommand(Command cmd, std::vector arguments) { std::vector packet; - if(!encoder->encode(packet, cmd, arguments)) + if(!encoder->encode(*packetizer, packet, cmd, arguments)) return false; return sendPacket(packet); @@ -176,7 +176,7 @@ void Communication::readTask() { readBytes.clear(); if(impl->readWait(readBytes)) { if(packetizer->input(readBytes)) { - for(auto& packet : packetizer->output()) { + for(const auto& packet : packetizer->output()) { std::shared_ptr msg; if(!decoder->decode(msg, packet)) continue; diff --git a/communication/encoder.cpp b/communication/encoder.cpp index 4ed41e0..e8e644c 100644 --- a/communication/encoder.cpp +++ b/communication/encoder.cpp @@ -6,7 +6,7 @@ using namespace icsneo; -bool Encoder::encode(std::vector& result, const std::shared_ptr& message) { +bool Encoder::encode(const Packetizer& packetizer, std::vector& result, const std::shared_ptr& message) { bool shortFormat = false; bool useResultAsBuffer = false; // Otherwise it's expected that we use message->data result.clear(); @@ -70,7 +70,7 @@ bool Encoder::encode(std::vector& result, const std::shared_ptr> 8), (uint8_t)m51msg->command }); - result = packetizer->packetWrap(message->data, shortFormat); + result = packetizer.packetWrap(message->data, shortFormat); return true; } else { message->data.insert(message->data.begin(), { uint8_t(m51msg->command) }); @@ -108,11 +108,11 @@ bool Encoder::encode(std::vector& result, const std::shared_ptrpacketWrap(buffer, shortFormat); + result = packetizer.packetWrap(buffer, shortFormat); return true; } -bool Encoder::encode(std::vector& result, Command cmd, std::vector arguments) { +bool Encoder::encode(const Packetizer& packetizer, std::vector& result, Command cmd, std::vector arguments) { std::shared_ptr msg; if(cmd == Command::UpdateLEDState) { /* NetID::Device is a super old command type. @@ -148,5 +148,5 @@ bool Encoder::encode(std::vector& result, Command cmd, std::vectordata.insert(msg->data.end(), std::make_move_iterator(arguments.begin()), std::make_move_iterator(arguments.end())); } - return encode(result, msg); + return encode(packetizer, result, msg); } \ No newline at end of file diff --git a/communication/multichannelcommunication.cpp b/communication/multichannelcommunication.cpp index 8ce2d0e..c3758b0 100644 --- a/communication/multichannelcommunication.cpp +++ b/communication/multichannelcommunication.cpp @@ -2,19 +2,25 @@ #include "icsneo/communication/command.h" #include "icsneo/communication/decoder.h" #include "icsneo/communication/packetizer.h" -#include -#include using namespace icsneo; void MultiChannelCommunication::spawnThreads() { - mainChannelReadThread = std::thread(&MultiChannelCommunication::readTask, this); + for(size_t i = 0; i < NUM_SUPPORTED_VNETS; i++) { + while(vnetQueues[i].pop()) {} // Ensure the queue is empty + vnetThreads[i] = std::thread(&MultiChannelCommunication::vnetReadTask, this, i); + } + hidReadThread = std::thread(&MultiChannelCommunication::hidReadTask, this); } void MultiChannelCommunication::joinThreads() { closing = true; - if(mainChannelReadThread.joinable()) - mainChannelReadThread.join(); + if(hidReadThread.joinable()) + hidReadThread.join(); + for(auto& thread : vnetThreads) { + if(thread.joinable()) + thread.join(); + } closing = false; } @@ -23,7 +29,7 @@ bool MultiChannelCommunication::sendPacket(std::vector& bytes) { return rawWrite(bytes); } -void MultiChannelCommunication::readTask() { +void MultiChannelCommunication::hidReadTask() { bool readMore = true; bool gotPacket = false; // Have we got the first valid packet (don't flag errors otherwise) std::deque usbReadFifo; @@ -51,7 +57,8 @@ void MultiChannelCommunication::readTask() { if(!CommandTypeIsValid(currentCommandType)) { // Device to host bytes discarded - EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); + if(gotPacket) + EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); usbReadFifo.pop_front(); continue; } @@ -105,21 +112,65 @@ void MultiChannelCommunication::readTask() { payloadBytes[i] = usbReadFifo[0]; usbReadFifo.pop_front(); } - - if(packetizer->input(payloadBytes)) { - for(auto& packet : packetizer->output()) { - std::shared_ptr msg; - if(!decoder->decode(msg, packet)) - continue; // Error will have been reported from within decoder - gotPacket = true; - dispatchMessage(msg); - } + moodycamel::BlockingReaderWriterQueue< std::vector >* currentQueue = nullptr; + switch(currentCommandType) { + case CommandType::Vnet1_to_HostPC: + currentQueue = &vnetQueues[0]; + break; + case CommandType::Vnet2_to_HostPC: + if(NUM_SUPPORTED_VNETS >= 2) + currentQueue = &vnetQueues[1]; + break; + case CommandType::Vnet3_to_HostPC: + if(NUM_SUPPORTED_VNETS >= 3) + currentQueue = &vnetQueues[2]; + break; } + if(currentQueue == nullptr) { + state = PreprocessState::SearchForCommand; + break; + } + + if(!currentQueue->enqueue(std::move(payloadBytes)) && gotPacket) + EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); + payloadBytes.clear(); + gotPacket = true; state = PreprocessState::SearchForCommand; + break; + } + } + } +} + +void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) { + moodycamel::BlockingReaderWriterQueue< std::vector >& queue = vnetQueues[vnetIndex]; + std::vector payloadBytes; + std::unique_ptr packetizerLifetime; + Packetizer* vnetPacketizer; + if(vnetIndex == 0) + vnetPacketizer = packetizer.get(); + else { + packetizerLifetime = makeConfiguredPacketizer(); + vnetPacketizer = packetizerLifetime.get(); + } + + EventManager::GetInstance().downgradeErrorsOnCurrentThread(); + + while(!closing) { + if(queue.wait_dequeue_timed(payloadBytes, std::chrono::milliseconds(250))) { + if(closing) + break; + + if(vnetPacketizer->input(payloadBytes)) { + for(const auto& packet : vnetPacketizer->output()) { + std::shared_ptr msg; + if(!decoder->decode(msg, packet)) + continue; // Error will have been reported from within decoder + dispatchMessage(msg); + } } } - } } \ No newline at end of file diff --git a/communication/packetizer.cpp b/communication/packetizer.cpp index ba0519b..8c5d8b8 100644 --- a/communication/packetizer.cpp +++ b/communication/packetizer.cpp @@ -13,7 +13,7 @@ uint8_t Packetizer::ICSChecksum(const std::vector& data) { return (uint8_t)checksum; } -std::vector& Packetizer::packetWrap(std::vector& data, bool shortFormat) { +std::vector& Packetizer::packetWrap(std::vector& data, bool shortFormat) const { if(shortFormat) { // Some devices don't use the checksum, so might as well not calculate it if that's the case // Either way the byte is still expected to be present in the bytestream for short messages diff --git a/device/device.cpp b/device/device.cpp index c07e857..54365ff 100644 --- a/device/device.cpp +++ b/device/device.cpp @@ -315,7 +315,7 @@ bool Device::transmit(std::shared_ptr message) { return transmitStatusFromExtension; std::vector packet; - if(!com->encoder->encode(packet, message)) + if(!com->encoder->encode(*com->packetizer, packet, message)) return false; return com->sendPacket(packet); diff --git a/include/icsneo/communication/communication.h b/include/icsneo/communication/communication.h index c0bfe4a..47b04c0 100644 --- a/include/icsneo/communication/communication.h +++ b/include/icsneo/communication/communication.h @@ -24,10 +24,12 @@ class Communication { public: Communication( device_eventhandler_t report, - std::unique_ptr com, - std::shared_ptr p, - std::unique_ptr e, - std::unique_ptr md) : packetizer(p), encoder(std::move(e)), decoder(std::move(md)), report(report), impl(std::move(com)) {} + std::unique_ptr&& com, + std::function()> makeConfiguredPacketizer, + std::unique_ptr&& e, + std::unique_ptr&& md) : makeConfiguredPacketizer(makeConfiguredPacketizer), encoder(std::move(e)), decoder(std::move(md)), report(report), impl(std::move(com)) { + packetizer = makeConfiguredPacketizer(); + } virtual ~Communication() { close(); } bool open(); @@ -59,7 +61,8 @@ public: MessageFilter f = MessageFilter(), std::chrono::milliseconds timeout = std::chrono::milliseconds(50)); - std::shared_ptr packetizer; // Ownership is shared with the encoder + std::function()> makeConfiguredPacketizer; + std::unique_ptr packetizer; std::unique_ptr encoder; std::unique_ptr decoder; device_eventhandler_t report; diff --git a/include/icsneo/communication/encoder.h b/include/icsneo/communication/encoder.h index 3562678..a382ee8 100644 --- a/include/icsneo/communication/encoder.h +++ b/include/icsneo/communication/encoder.h @@ -15,14 +15,15 @@ namespace icsneo { class Encoder { public: - Encoder(device_eventhandler_t report, std::shared_ptr p) : packetizer(p), report(report) {} - bool encode(std::vector& result, const std::shared_ptr& message); - bool encode(std::vector& result, Command cmd, bool boolean) { return encode(result, cmd, std::vector({ (uint8_t)boolean })); } - bool encode(std::vector& result, Command cmd, std::vector arguments = {}); + Encoder(device_eventhandler_t report) : report(report) {} + bool encode(const Packetizer& packetizer, std::vector& result, const std::shared_ptr& message); + bool encode(const Packetizer& packetizer, std::vector& result, Command cmd, bool boolean) { + return encode(packetizer, result, cmd, std::vector({ (uint8_t)boolean })); + } + bool encode(const Packetizer& packetizer, std::vector& result, Command cmd, std::vector arguments = {}); bool supportCANFD = false; private: - std::shared_ptr packetizer; device_eventhandler_t report; }; diff --git a/include/icsneo/communication/multichannelcommunication.h b/include/icsneo/communication/multichannelcommunication.h index 90a366e..8342cc4 100644 --- a/include/icsneo/communication/multichannelcommunication.h +++ b/include/icsneo/communication/multichannelcommunication.h @@ -5,6 +5,7 @@ #include "icsneo/communication/icommunication.h" #include "icsneo/communication/command.h" #include "icsneo/communication/encoder.h" +#include "icsneo/third-party/readerwriterqueue/readerwriterqueue.h" namespace icsneo { @@ -13,9 +14,9 @@ public: MultiChannelCommunication( device_eventhandler_t err, std::unique_ptr com, - std::shared_ptr p, + std::function()> makeConfiguredPacketizer, std::unique_ptr e, - std::unique_ptr md) : Communication(err, std::move(com), p, std::move(e), std::move(md)) {} + std::unique_ptr md) : Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)) {} void spawnThreads() override; void joinThreads() override; bool sendPacket(std::vector& bytes) override; @@ -24,6 +25,8 @@ protected: bool preprocessPacket(std::deque& usbReadFifo); private: + static constexpr const size_t NUM_SUPPORTED_VNETS = 1; + enum class CommandType : uint8_t { PlasmaReadRequest = 0x10, // Status read request to HSC PlasmaStatusResponse = 0x11, // Status response by HSC @@ -45,7 +48,6 @@ private: Microblaze_to_HostPC = 0x81 // Microblaze processor data to host PC }; - static bool FixSlaveVNETPacketNetID(Packet& packet); enum class CoreMiniNetwork : uint8_t { HSCAN1 = (0), MSCAN1 = (1), @@ -156,8 +158,11 @@ private: CommandType currentCommandType; size_t currentReadIndex = 0; - std::thread mainChannelReadThread; - void readTask(); + std::thread hidReadThread; + std::array vnetThreads; + std::array >, NUM_SUPPORTED_VNETS> vnetQueues; + void hidReadTask(); + void vnetReadTask(size_t vnetIndex); }; } diff --git a/include/icsneo/communication/packetizer.h b/include/icsneo/communication/packetizer.h index 9cea2a9..1f7b456 100644 --- a/include/icsneo/communication/packetizer.h +++ b/include/icsneo/communication/packetizer.h @@ -15,7 +15,7 @@ public: Packetizer(device_eventhandler_t report) : report(report) {} - std::vector& packetWrap(std::vector& data, bool shortFormat); + std::vector& packetWrap(std::vector& data, bool shortFormat) const; bool input(const std::vector& bytes); std::vector> output(); diff --git a/include/icsneo/device/device.h b/include/icsneo/device/device.h index d593942..94e1bfc 100644 --- a/include/icsneo/device/device.h +++ b/include/icsneo/device/device.h @@ -111,13 +111,11 @@ protected: report = makeEventHandler(); auto transport = makeTransport(); setupTransport(*transport); - auto packetizer = makePacketizer(); - setupPacketizer(*packetizer); - auto encoder = makeEncoder(packetizer); + auto encoder = makeEncoder(); setupEncoder(*encoder); auto decoder = makeDecoder(); setupDecoder(*decoder); - com = makeCommunication(std::move(transport), packetizer, std::move(encoder), std::move(decoder)); + com = makeCommunication(std::move(transport), std::bind(&Device::makeConfiguredPacketizer, this), std::move(encoder), std::move(decoder)); setupCommunication(*com); settings = makeSettings(com); setupSettings(*settings); @@ -136,10 +134,15 @@ protected: std::unique_ptr makeTransport() { return std::unique_ptr(new Transport(report, getWritableNeoDevice())); } virtual void setupTransport(ICommunication&) {} - virtual std::shared_ptr makePacketizer() { return std::make_shared(report); } + virtual std::unique_ptr makePacketizer() { return std::unique_ptr(new Packetizer(report)); } virtual void setupPacketizer(Packetizer&) {} + std::unique_ptr makeConfiguredPacketizer() { + auto packetizer = makePacketizer(); + setupPacketizer(*packetizer); + return packetizer; + } - virtual std::unique_ptr makeEncoder(std::shared_ptr p) { return std::unique_ptr(new Encoder(report, p)); } + virtual std::unique_ptr makeEncoder() { return std::unique_ptr(new Encoder(report)); } virtual void setupEncoder(Encoder&) {} virtual std::unique_ptr makeDecoder() { return std::unique_ptr(new Decoder(report)); } @@ -147,9 +150,9 @@ protected: virtual std::shared_ptr makeCommunication( std::unique_ptr t, - std::shared_ptr p, + std::function()> makeConfiguredPacketizer, std::unique_ptr e, - std::unique_ptr d) { return std::make_shared(report, std::move(t), p, std::move(e), std::move(d)); } + std::unique_ptr d) { return std::make_shared(report, std::move(t), makeConfiguredPacketizer, std::move(e), std::move(d)); } virtual void setupCommunication(Communication&) {} template diff --git a/include/icsneo/device/tree/plasion/plasion.h b/include/icsneo/device/tree/plasion/plasion.h index 8613d39..0525093 100644 --- a/include/icsneo/device/tree/plasion/plasion.h +++ b/include/icsneo/device/tree/plasion/plasion.h @@ -12,10 +12,18 @@ class Plasion : public Device { protected: virtual std::shared_ptr makeCommunication( std::unique_ptr transport, - std::shared_ptr packetizer, + std::function()> makeConfiguredPacketizer, std::unique_ptr encoder, std::unique_ptr decoder - ) override { return std::make_shared(report, std::move(transport), packetizer, std::move(encoder), std::move(decoder)); } + ) override { + return std::make_shared( + report, + std::move(transport), + makeConfiguredPacketizer, + std::move(encoder), + std::move(decoder) + ); + } // TODO This is done so that Plasion can still transmit it's basic networks, awaiting slave VNET support virtual bool isSupportedRXNetwork(const Network&) const override { return true; }