diff --git a/communication/communication.cpp b/communication/communication.cpp index c6e4867..65b1a70 100644 --- a/communication/communication.cpp +++ b/communication/communication.cpp @@ -60,7 +60,7 @@ bool Communication::sendPacket(std::vector& bytes) { bool Communication::sendCommand(Command cmd, std::vector arguments) { std::vector packet; - if(!encoder->encode(packet, cmd, arguments)) + if(!encoder->encode(*packetizer, packet, cmd, arguments)) return false; return sendPacket(packet); @@ -176,7 +176,7 @@ void Communication::readTask() { readBytes.clear(); if(impl->readWait(readBytes)) { if(packetizer->input(readBytes)) { - for(auto& packet : packetizer->output()) { + for(const auto& packet : packetizer->output()) { std::shared_ptr msg; if(!decoder->decode(msg, packet)) continue; diff --git a/communication/encoder.cpp b/communication/encoder.cpp index 4ed41e0..e8e644c 100644 --- a/communication/encoder.cpp +++ b/communication/encoder.cpp @@ -6,7 +6,7 @@ using namespace icsneo; -bool Encoder::encode(std::vector& result, const std::shared_ptr& message) { +bool Encoder::encode(const Packetizer& packetizer, std::vector& result, const std::shared_ptr& message) { bool shortFormat = false; bool useResultAsBuffer = false; // Otherwise it's expected that we use message->data result.clear(); @@ -70,7 +70,7 @@ bool Encoder::encode(std::vector& result, const std::shared_ptr> 8), (uint8_t)m51msg->command }); - result = packetizer->packetWrap(message->data, shortFormat); + result = packetizer.packetWrap(message->data, shortFormat); return true; } else { message->data.insert(message->data.begin(), { uint8_t(m51msg->command) }); @@ -108,11 +108,11 @@ bool Encoder::encode(std::vector& result, const std::shared_ptrpacketWrap(buffer, shortFormat); + result = packetizer.packetWrap(buffer, shortFormat); return true; } -bool Encoder::encode(std::vector& result, Command cmd, std::vector arguments) { +bool Encoder::encode(const Packetizer& packetizer, std::vector& result, Command cmd, std::vector arguments) { std::shared_ptr msg; if(cmd == Command::UpdateLEDState) { /* NetID::Device is a super old command type. @@ -148,5 +148,5 @@ bool Encoder::encode(std::vector& result, Command cmd, std::vectordata.insert(msg->data.end(), std::make_move_iterator(arguments.begin()), std::make_move_iterator(arguments.end())); } - return encode(result, msg); + return encode(packetizer, result, msg); } \ No newline at end of file diff --git a/communication/multichannelcommunication.cpp b/communication/multichannelcommunication.cpp index 8ce2d0e..c3758b0 100644 --- a/communication/multichannelcommunication.cpp +++ b/communication/multichannelcommunication.cpp @@ -2,19 +2,25 @@ #include "icsneo/communication/command.h" #include "icsneo/communication/decoder.h" #include "icsneo/communication/packetizer.h" -#include -#include using namespace icsneo; void MultiChannelCommunication::spawnThreads() { - mainChannelReadThread = std::thread(&MultiChannelCommunication::readTask, this); + for(size_t i = 0; i < NUM_SUPPORTED_VNETS; i++) { + while(vnetQueues[i].pop()) {} // Ensure the queue is empty + vnetThreads[i] = std::thread(&MultiChannelCommunication::vnetReadTask, this, i); + } + hidReadThread = std::thread(&MultiChannelCommunication::hidReadTask, this); } void MultiChannelCommunication::joinThreads() { closing = true; - if(mainChannelReadThread.joinable()) - mainChannelReadThread.join(); + if(hidReadThread.joinable()) + hidReadThread.join(); + for(auto& thread : vnetThreads) { + if(thread.joinable()) + thread.join(); + } closing = false; } @@ -23,7 +29,7 @@ bool MultiChannelCommunication::sendPacket(std::vector& bytes) { return rawWrite(bytes); } -void MultiChannelCommunication::readTask() { +void MultiChannelCommunication::hidReadTask() { bool readMore = true; bool gotPacket = false; // Have we got the first valid packet (don't flag errors otherwise) std::deque usbReadFifo; @@ -51,7 +57,8 @@ void MultiChannelCommunication::readTask() { if(!CommandTypeIsValid(currentCommandType)) { // Device to host bytes discarded - EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); + if(gotPacket) + EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); usbReadFifo.pop_front(); continue; } @@ -105,21 +112,65 @@ void MultiChannelCommunication::readTask() { payloadBytes[i] = usbReadFifo[0]; usbReadFifo.pop_front(); } - - if(packetizer->input(payloadBytes)) { - for(auto& packet : packetizer->output()) { - std::shared_ptr msg; - if(!decoder->decode(msg, packet)) - continue; // Error will have been reported from within decoder - gotPacket = true; - dispatchMessage(msg); - } + moodycamel::BlockingReaderWriterQueue< std::vector >* currentQueue = nullptr; + switch(currentCommandType) { + case CommandType::Vnet1_to_HostPC: + currentQueue = &vnetQueues[0]; + break; + case CommandType::Vnet2_to_HostPC: + if(NUM_SUPPORTED_VNETS >= 2) + currentQueue = &vnetQueues[1]; + break; + case CommandType::Vnet3_to_HostPC: + if(NUM_SUPPORTED_VNETS >= 3) + currentQueue = &vnetQueues[2]; + break; } + if(currentQueue == nullptr) { + state = PreprocessState::SearchForCommand; + break; + } + + if(!currentQueue->enqueue(std::move(payloadBytes)) && gotPacket) + EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error)); + payloadBytes.clear(); + gotPacket = true; state = PreprocessState::SearchForCommand; + break; + } + } + } +} + +void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) { + moodycamel::BlockingReaderWriterQueue< std::vector >& queue = vnetQueues[vnetIndex]; + std::vector payloadBytes; + std::unique_ptr packetizerLifetime; + Packetizer* vnetPacketizer; + if(vnetIndex == 0) + vnetPacketizer = packetizer.get(); + else { + packetizerLifetime = makeConfiguredPacketizer(); + vnetPacketizer = packetizerLifetime.get(); + } + + EventManager::GetInstance().downgradeErrorsOnCurrentThread(); + + while(!closing) { + if(queue.wait_dequeue_timed(payloadBytes, std::chrono::milliseconds(250))) { + if(closing) + break; + + if(vnetPacketizer->input(payloadBytes)) { + for(const auto& packet : vnetPacketizer->output()) { + std::shared_ptr msg; + if(!decoder->decode(msg, packet)) + continue; // Error will have been reported from within decoder + dispatchMessage(msg); + } } } - } } \ No newline at end of file diff --git a/communication/packetizer.cpp b/communication/packetizer.cpp index ba0519b..8c5d8b8 100644 --- a/communication/packetizer.cpp +++ b/communication/packetizer.cpp @@ -13,7 +13,7 @@ uint8_t Packetizer::ICSChecksum(const std::vector& data) { return (uint8_t)checksum; } -std::vector& Packetizer::packetWrap(std::vector& data, bool shortFormat) { +std::vector& Packetizer::packetWrap(std::vector& data, bool shortFormat) const { if(shortFormat) { // Some devices don't use the checksum, so might as well not calculate it if that's the case // Either way the byte is still expected to be present in the bytestream for short messages diff --git a/device/device.cpp b/device/device.cpp index c07e857..54365ff 100644 --- a/device/device.cpp +++ b/device/device.cpp @@ -315,7 +315,7 @@ bool Device::transmit(std::shared_ptr message) { return transmitStatusFromExtension; std::vector packet; - if(!com->encoder->encode(packet, message)) + if(!com->encoder->encode(*com->packetizer, packet, message)) return false; return com->sendPacket(packet); diff --git a/include/icsneo/communication/communication.h b/include/icsneo/communication/communication.h index c0bfe4a..47b04c0 100644 --- a/include/icsneo/communication/communication.h +++ b/include/icsneo/communication/communication.h @@ -24,10 +24,12 @@ class Communication { public: Communication( device_eventhandler_t report, - std::unique_ptr com, - std::shared_ptr p, - std::unique_ptr e, - std::unique_ptr md) : packetizer(p), encoder(std::move(e)), decoder(std::move(md)), report(report), impl(std::move(com)) {} + std::unique_ptr&& com, + std::function()> makeConfiguredPacketizer, + std::unique_ptr&& e, + std::unique_ptr&& md) : makeConfiguredPacketizer(makeConfiguredPacketizer), encoder(std::move(e)), decoder(std::move(md)), report(report), impl(std::move(com)) { + packetizer = makeConfiguredPacketizer(); + } virtual ~Communication() { close(); } bool open(); @@ -59,7 +61,8 @@ public: MessageFilter f = MessageFilter(), std::chrono::milliseconds timeout = std::chrono::milliseconds(50)); - std::shared_ptr packetizer; // Ownership is shared with the encoder + std::function()> makeConfiguredPacketizer; + std::unique_ptr packetizer; std::unique_ptr encoder; std::unique_ptr decoder; device_eventhandler_t report; diff --git a/include/icsneo/communication/encoder.h b/include/icsneo/communication/encoder.h index 3562678..a382ee8 100644 --- a/include/icsneo/communication/encoder.h +++ b/include/icsneo/communication/encoder.h @@ -15,14 +15,15 @@ namespace icsneo { class Encoder { public: - Encoder(device_eventhandler_t report, std::shared_ptr p) : packetizer(p), report(report) {} - bool encode(std::vector& result, const std::shared_ptr& message); - bool encode(std::vector& result, Command cmd, bool boolean) { return encode(result, cmd, std::vector({ (uint8_t)boolean })); } - bool encode(std::vector& result, Command cmd, std::vector arguments = {}); + Encoder(device_eventhandler_t report) : report(report) {} + bool encode(const Packetizer& packetizer, std::vector& result, const std::shared_ptr& message); + bool encode(const Packetizer& packetizer, std::vector& result, Command cmd, bool boolean) { + return encode(packetizer, result, cmd, std::vector({ (uint8_t)boolean })); + } + bool encode(const Packetizer& packetizer, std::vector& result, Command cmd, std::vector arguments = {}); bool supportCANFD = false; private: - std::shared_ptr packetizer; device_eventhandler_t report; }; diff --git a/include/icsneo/communication/multichannelcommunication.h b/include/icsneo/communication/multichannelcommunication.h index 90a366e..8342cc4 100644 --- a/include/icsneo/communication/multichannelcommunication.h +++ b/include/icsneo/communication/multichannelcommunication.h @@ -5,6 +5,7 @@ #include "icsneo/communication/icommunication.h" #include "icsneo/communication/command.h" #include "icsneo/communication/encoder.h" +#include "icsneo/third-party/readerwriterqueue/readerwriterqueue.h" namespace icsneo { @@ -13,9 +14,9 @@ public: MultiChannelCommunication( device_eventhandler_t err, std::unique_ptr com, - std::shared_ptr p, + std::function()> makeConfiguredPacketizer, std::unique_ptr e, - std::unique_ptr md) : Communication(err, std::move(com), p, std::move(e), std::move(md)) {} + std::unique_ptr md) : Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)) {} void spawnThreads() override; void joinThreads() override; bool sendPacket(std::vector& bytes) override; @@ -24,6 +25,8 @@ protected: bool preprocessPacket(std::deque& usbReadFifo); private: + static constexpr const size_t NUM_SUPPORTED_VNETS = 1; + enum class CommandType : uint8_t { PlasmaReadRequest = 0x10, // Status read request to HSC PlasmaStatusResponse = 0x11, // Status response by HSC @@ -45,7 +48,6 @@ private: Microblaze_to_HostPC = 0x81 // Microblaze processor data to host PC }; - static bool FixSlaveVNETPacketNetID(Packet& packet); enum class CoreMiniNetwork : uint8_t { HSCAN1 = (0), MSCAN1 = (1), @@ -156,8 +158,11 @@ private: CommandType currentCommandType; size_t currentReadIndex = 0; - std::thread mainChannelReadThread; - void readTask(); + std::thread hidReadThread; + std::array vnetThreads; + std::array >, NUM_SUPPORTED_VNETS> vnetQueues; + void hidReadTask(); + void vnetReadTask(size_t vnetIndex); }; } diff --git a/include/icsneo/communication/packetizer.h b/include/icsneo/communication/packetizer.h index 9cea2a9..1f7b456 100644 --- a/include/icsneo/communication/packetizer.h +++ b/include/icsneo/communication/packetizer.h @@ -15,7 +15,7 @@ public: Packetizer(device_eventhandler_t report) : report(report) {} - std::vector& packetWrap(std::vector& data, bool shortFormat); + std::vector& packetWrap(std::vector& data, bool shortFormat) const; bool input(const std::vector& bytes); std::vector> output(); diff --git a/include/icsneo/device/device.h b/include/icsneo/device/device.h index d593942..94e1bfc 100644 --- a/include/icsneo/device/device.h +++ b/include/icsneo/device/device.h @@ -111,13 +111,11 @@ protected: report = makeEventHandler(); auto transport = makeTransport(); setupTransport(*transport); - auto packetizer = makePacketizer(); - setupPacketizer(*packetizer); - auto encoder = makeEncoder(packetizer); + auto encoder = makeEncoder(); setupEncoder(*encoder); auto decoder = makeDecoder(); setupDecoder(*decoder); - com = makeCommunication(std::move(transport), packetizer, std::move(encoder), std::move(decoder)); + com = makeCommunication(std::move(transport), std::bind(&Device::makeConfiguredPacketizer, this), std::move(encoder), std::move(decoder)); setupCommunication(*com); settings = makeSettings(com); setupSettings(*settings); @@ -136,10 +134,15 @@ protected: std::unique_ptr makeTransport() { return std::unique_ptr(new Transport(report, getWritableNeoDevice())); } virtual void setupTransport(ICommunication&) {} - virtual std::shared_ptr makePacketizer() { return std::make_shared(report); } + virtual std::unique_ptr makePacketizer() { return std::unique_ptr(new Packetizer(report)); } virtual void setupPacketizer(Packetizer&) {} + std::unique_ptr makeConfiguredPacketizer() { + auto packetizer = makePacketizer(); + setupPacketizer(*packetizer); + return packetizer; + } - virtual std::unique_ptr makeEncoder(std::shared_ptr p) { return std::unique_ptr(new Encoder(report, p)); } + virtual std::unique_ptr makeEncoder() { return std::unique_ptr(new Encoder(report)); } virtual void setupEncoder(Encoder&) {} virtual std::unique_ptr makeDecoder() { return std::unique_ptr(new Decoder(report)); } @@ -147,9 +150,9 @@ protected: virtual std::shared_ptr makeCommunication( std::unique_ptr t, - std::shared_ptr p, + std::function()> makeConfiguredPacketizer, std::unique_ptr e, - std::unique_ptr d) { return std::make_shared(report, std::move(t), p, std::move(e), std::move(d)); } + std::unique_ptr d) { return std::make_shared(report, std::move(t), makeConfiguredPacketizer, std::move(e), std::move(d)); } virtual void setupCommunication(Communication&) {} template diff --git a/include/icsneo/device/tree/plasion/plasion.h b/include/icsneo/device/tree/plasion/plasion.h index 8613d39..0525093 100644 --- a/include/icsneo/device/tree/plasion/plasion.h +++ b/include/icsneo/device/tree/plasion/plasion.h @@ -12,10 +12,18 @@ class Plasion : public Device { protected: virtual std::shared_ptr makeCommunication( std::unique_ptr transport, - std::shared_ptr packetizer, + std::function()> makeConfiguredPacketizer, std::unique_ptr encoder, std::unique_ptr decoder - ) override { return std::make_shared(report, std::move(transport), packetizer, std::move(encoder), std::move(decoder)); } + ) override { + return std::make_shared( + report, + std::move(transport), + makeConfiguredPacketizer, + std::move(encoder), + std::move(decoder) + ); + } // TODO This is done so that Plasion can still transmit it's basic networks, awaiting slave VNET support virtual bool isSupportedRXNetwork(const Network&) const override { return true; }