Communication: MultiChannel: Properly mask out communication from non-main VNETs

This also makes it possible for Communication to create more instances of Packetizer
This is necessary because Packetizer is not thread safe,
so when we support more VNETs we will need to create more Packetizers.
pull/25/head
Paul Hollinsky 2020-03-09 13:56:18 -04:00
parent 42780dc610
commit d8798acaa7
11 changed files with 123 additions and 52 deletions

View File

@ -60,7 +60,7 @@ bool Communication::sendPacket(std::vector<uint8_t>& bytes) {
bool Communication::sendCommand(Command cmd, std::vector<uint8_t> arguments) {
std::vector<uint8_t> packet;
if(!encoder->encode(packet, cmd, arguments))
if(!encoder->encode(*packetizer, packet, cmd, arguments))
return false;
return sendPacket(packet);
@ -176,7 +176,7 @@ void Communication::readTask() {
readBytes.clear();
if(impl->readWait(readBytes)) {
if(packetizer->input(readBytes)) {
for(auto& packet : packetizer->output()) {
for(const auto& packet : packetizer->output()) {
std::shared_ptr<Message> msg;
if(!decoder->decode(msg, packet))
continue;

View File

@ -6,7 +6,7 @@
using namespace icsneo;
bool Encoder::encode(std::vector<uint8_t>& result, const std::shared_ptr<Message>& message) {
bool Encoder::encode(const Packetizer& packetizer, std::vector<uint8_t>& result, const std::shared_ptr<Message>& message) {
bool shortFormat = false;
bool useResultAsBuffer = false; // Otherwise it's expected that we use message->data
result.clear();
@ -70,7 +70,7 @@ bool Encoder::encode(std::vector<uint8_t>& result, const std::shared_ptr<Message
(uint8_t)(size >> 8),
(uint8_t)m51msg->command
});
result = packetizer->packetWrap(message->data, shortFormat);
result = packetizer.packetWrap(message->data, shortFormat);
return true;
} else {
message->data.insert(message->data.begin(), { uint8_t(m51msg->command) });
@ -108,11 +108,11 @@ bool Encoder::encode(std::vector<uint8_t>& result, const std::shared_ptr<Message
});
}
result = packetizer->packetWrap(buffer, shortFormat);
result = packetizer.packetWrap(buffer, shortFormat);
return true;
}
bool Encoder::encode(std::vector<uint8_t>& result, Command cmd, std::vector<uint8_t> arguments) {
bool Encoder::encode(const Packetizer& packetizer, std::vector<uint8_t>& result, Command cmd, std::vector<uint8_t> arguments) {
std::shared_ptr<Message> msg;
if(cmd == Command::UpdateLEDState) {
/* NetID::Device is a super old command type.
@ -148,5 +148,5 @@ bool Encoder::encode(std::vector<uint8_t>& result, Command cmd, std::vector<uint
msg->data.insert(msg->data.end(), std::make_move_iterator(arguments.begin()), std::make_move_iterator(arguments.end()));
}
return encode(result, msg);
return encode(packetizer, result, msg);
}

View File

@ -2,19 +2,25 @@
#include "icsneo/communication/command.h"
#include "icsneo/communication/decoder.h"
#include "icsneo/communication/packetizer.h"
#include <iostream>
#include <iomanip>
using namespace icsneo;
void MultiChannelCommunication::spawnThreads() {
mainChannelReadThread = std::thread(&MultiChannelCommunication::readTask, this);
for(size_t i = 0; i < NUM_SUPPORTED_VNETS; i++) {
while(vnetQueues[i].pop()) {} // Ensure the queue is empty
vnetThreads[i] = std::thread(&MultiChannelCommunication::vnetReadTask, this, i);
}
hidReadThread = std::thread(&MultiChannelCommunication::hidReadTask, this);
}
void MultiChannelCommunication::joinThreads() {
closing = true;
if(mainChannelReadThread.joinable())
mainChannelReadThread.join();
if(hidReadThread.joinable())
hidReadThread.join();
for(auto& thread : vnetThreads) {
if(thread.joinable())
thread.join();
}
closing = false;
}
@ -23,7 +29,7 @@ bool MultiChannelCommunication::sendPacket(std::vector<uint8_t>& bytes) {
return rawWrite(bytes);
}
void MultiChannelCommunication::readTask() {
void MultiChannelCommunication::hidReadTask() {
bool readMore = true;
bool gotPacket = false; // Have we got the first valid packet (don't flag errors otherwise)
std::deque<uint8_t> usbReadFifo;
@ -51,7 +57,8 @@ void MultiChannelCommunication::readTask() {
if(!CommandTypeIsValid(currentCommandType)) {
// Device to host bytes discarded
EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error));
if(gotPacket)
EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error));
usbReadFifo.pop_front();
continue;
}
@ -105,21 +112,65 @@ void MultiChannelCommunication::readTask() {
payloadBytes[i] = usbReadFifo[0];
usbReadFifo.pop_front();
}
if(packetizer->input(payloadBytes)) {
for(auto& packet : packetizer->output()) {
std::shared_ptr<Message> msg;
if(!decoder->decode(msg, packet))
continue; // Error will have been reported from within decoder
gotPacket = true;
dispatchMessage(msg);
}
moodycamel::BlockingReaderWriterQueue< std::vector<uint8_t> >* currentQueue = nullptr;
switch(currentCommandType) {
case CommandType::Vnet1_to_HostPC:
currentQueue = &vnetQueues[0];
break;
case CommandType::Vnet2_to_HostPC:
if(NUM_SUPPORTED_VNETS >= 2)
currentQueue = &vnetQueues[1];
break;
case CommandType::Vnet3_to_HostPC:
if(NUM_SUPPORTED_VNETS >= 3)
currentQueue = &vnetQueues[2];
break;
}
if(currentQueue == nullptr) {
state = PreprocessState::SearchForCommand;
break;
}
if(!currentQueue->enqueue(std::move(payloadBytes)) && gotPacket)
EventManager::GetInstance().add(APIEvent(APIEvent::Type::FailedToRead, APIEvent::Severity::Error));
payloadBytes.clear();
gotPacket = true;
state = PreprocessState::SearchForCommand;
break;
}
}
}
}
void MultiChannelCommunication::vnetReadTask(size_t vnetIndex) {
moodycamel::BlockingReaderWriterQueue< std::vector<uint8_t> >& queue = vnetQueues[vnetIndex];
std::vector<uint8_t> payloadBytes;
std::unique_ptr<Packetizer> packetizerLifetime;
Packetizer* vnetPacketizer;
if(vnetIndex == 0)
vnetPacketizer = packetizer.get();
else {
packetizerLifetime = makeConfiguredPacketizer();
vnetPacketizer = packetizerLifetime.get();
}
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing) {
if(queue.wait_dequeue_timed(payloadBytes, std::chrono::milliseconds(250))) {
if(closing)
break;
if(vnetPacketizer->input(payloadBytes)) {
for(const auto& packet : vnetPacketizer->output()) {
std::shared_ptr<Message> msg;
if(!decoder->decode(msg, packet))
continue; // Error will have been reported from within decoder
dispatchMessage(msg);
}
}
}
}
}

View File

@ -13,7 +13,7 @@ uint8_t Packetizer::ICSChecksum(const std::vector<uint8_t>& data) {
return (uint8_t)checksum;
}
std::vector<uint8_t>& Packetizer::packetWrap(std::vector<uint8_t>& data, bool shortFormat) {
std::vector<uint8_t>& Packetizer::packetWrap(std::vector<uint8_t>& data, bool shortFormat) const {
if(shortFormat) {
// Some devices don't use the checksum, so might as well not calculate it if that's the case
// Either way the byte is still expected to be present in the bytestream for short messages

View File

@ -315,7 +315,7 @@ bool Device::transmit(std::shared_ptr<Message> message) {
return transmitStatusFromExtension;
std::vector<uint8_t> packet;
if(!com->encoder->encode(packet, message))
if(!com->encoder->encode(*com->packetizer, packet, message))
return false;
return com->sendPacket(packet);

View File

@ -24,10 +24,12 @@ class Communication {
public:
Communication(
device_eventhandler_t report,
std::unique_ptr<ICommunication> com,
std::shared_ptr<Packetizer> p,
std::unique_ptr<Encoder> e,
std::unique_ptr<Decoder> md) : packetizer(p), encoder(std::move(e)), decoder(std::move(md)), report(report), impl(std::move(com)) {}
std::unique_ptr<ICommunication>&& com,
std::function<std::unique_ptr<Packetizer>()> makeConfiguredPacketizer,
std::unique_ptr<Encoder>&& e,
std::unique_ptr<Decoder>&& md) : makeConfiguredPacketizer(makeConfiguredPacketizer), encoder(std::move(e)), decoder(std::move(md)), report(report), impl(std::move(com)) {
packetizer = makeConfiguredPacketizer();
}
virtual ~Communication() { close(); }
bool open();
@ -59,7 +61,8 @@ public:
MessageFilter f = MessageFilter(),
std::chrono::milliseconds timeout = std::chrono::milliseconds(50));
std::shared_ptr<Packetizer> packetizer; // Ownership is shared with the encoder
std::function<std::unique_ptr<Packetizer>()> makeConfiguredPacketizer;
std::unique_ptr<Packetizer> packetizer;
std::unique_ptr<Encoder> encoder;
std::unique_ptr<Decoder> decoder;
device_eventhandler_t report;

View File

@ -15,14 +15,15 @@ namespace icsneo {
class Encoder {
public:
Encoder(device_eventhandler_t report, std::shared_ptr<Packetizer> p) : packetizer(p), report(report) {}
bool encode(std::vector<uint8_t>& result, const std::shared_ptr<Message>& message);
bool encode(std::vector<uint8_t>& result, Command cmd, bool boolean) { return encode(result, cmd, std::vector<uint8_t>({ (uint8_t)boolean })); }
bool encode(std::vector<uint8_t>& result, Command cmd, std::vector<uint8_t> arguments = {});
Encoder(device_eventhandler_t report) : report(report) {}
bool encode(const Packetizer& packetizer, std::vector<uint8_t>& result, const std::shared_ptr<Message>& message);
bool encode(const Packetizer& packetizer, std::vector<uint8_t>& result, Command cmd, bool boolean) {
return encode(packetizer, result, cmd, std::vector<uint8_t>({ (uint8_t)boolean }));
}
bool encode(const Packetizer& packetizer, std::vector<uint8_t>& result, Command cmd, std::vector<uint8_t> arguments = {});
bool supportCANFD = false;
private:
std::shared_ptr<Packetizer> packetizer;
device_eventhandler_t report;
};

View File

@ -5,6 +5,7 @@
#include "icsneo/communication/icommunication.h"
#include "icsneo/communication/command.h"
#include "icsneo/communication/encoder.h"
#include "icsneo/third-party/readerwriterqueue/readerwriterqueue.h"
namespace icsneo {
@ -13,9 +14,9 @@ public:
MultiChannelCommunication(
device_eventhandler_t err,
std::unique_ptr<ICommunication> com,
std::shared_ptr<Packetizer> p,
std::function<std::unique_ptr<Packetizer>()> makeConfiguredPacketizer,
std::unique_ptr<Encoder> e,
std::unique_ptr<Decoder> md) : Communication(err, std::move(com), p, std::move(e), std::move(md)) {}
std::unique_ptr<Decoder> md) : Communication(err, std::move(com), makeConfiguredPacketizer, std::move(e), std::move(md)) {}
void spawnThreads() override;
void joinThreads() override;
bool sendPacket(std::vector<uint8_t>& bytes) override;
@ -24,6 +25,8 @@ protected:
bool preprocessPacket(std::deque<uint8_t>& usbReadFifo);
private:
static constexpr const size_t NUM_SUPPORTED_VNETS = 1;
enum class CommandType : uint8_t {
PlasmaReadRequest = 0x10, // Status read request to HSC
PlasmaStatusResponse = 0x11, // Status response by HSC
@ -45,7 +48,6 @@ private:
Microblaze_to_HostPC = 0x81 // Microblaze processor data to host PC
};
static bool FixSlaveVNETPacketNetID(Packet& packet);
enum class CoreMiniNetwork : uint8_t {
HSCAN1 = (0),
MSCAN1 = (1),
@ -156,8 +158,11 @@ private:
CommandType currentCommandType;
size_t currentReadIndex = 0;
std::thread mainChannelReadThread;
void readTask();
std::thread hidReadThread;
std::array<std::thread, NUM_SUPPORTED_VNETS> vnetThreads;
std::array<moodycamel::BlockingReaderWriterQueue< std::vector<uint8_t> >, NUM_SUPPORTED_VNETS> vnetQueues;
void hidReadTask();
void vnetReadTask(size_t vnetIndex);
};
}

View File

@ -15,7 +15,7 @@ public:
Packetizer(device_eventhandler_t report) : report(report) {}
std::vector<uint8_t>& packetWrap(std::vector<uint8_t>& data, bool shortFormat);
std::vector<uint8_t>& packetWrap(std::vector<uint8_t>& data, bool shortFormat) const;
bool input(const std::vector<uint8_t>& bytes);
std::vector<std::shared_ptr<Packet>> output();

View File

@ -111,13 +111,11 @@ protected:
report = makeEventHandler();
auto transport = makeTransport<Transport>();
setupTransport(*transport);
auto packetizer = makePacketizer();
setupPacketizer(*packetizer);
auto encoder = makeEncoder(packetizer);
auto encoder = makeEncoder();
setupEncoder(*encoder);
auto decoder = makeDecoder();
setupDecoder(*decoder);
com = makeCommunication(std::move(transport), packetizer, std::move(encoder), std::move(decoder));
com = makeCommunication(std::move(transport), std::bind(&Device::makeConfiguredPacketizer, this), std::move(encoder), std::move(decoder));
setupCommunication(*com);
settings = makeSettings<Settings>(com);
setupSettings(*settings);
@ -136,10 +134,15 @@ protected:
std::unique_ptr<ICommunication> makeTransport() { return std::unique_ptr<ICommunication>(new Transport(report, getWritableNeoDevice())); }
virtual void setupTransport(ICommunication&) {}
virtual std::shared_ptr<Packetizer> makePacketizer() { return std::make_shared<Packetizer>(report); }
virtual std::unique_ptr<Packetizer> makePacketizer() { return std::unique_ptr<Packetizer>(new Packetizer(report)); }
virtual void setupPacketizer(Packetizer&) {}
std::unique_ptr<Packetizer> makeConfiguredPacketizer() {
auto packetizer = makePacketizer();
setupPacketizer(*packetizer);
return packetizer;
}
virtual std::unique_ptr<Encoder> makeEncoder(std::shared_ptr<Packetizer> p) { return std::unique_ptr<Encoder>(new Encoder(report, p)); }
virtual std::unique_ptr<Encoder> makeEncoder() { return std::unique_ptr<Encoder>(new Encoder(report)); }
virtual void setupEncoder(Encoder&) {}
virtual std::unique_ptr<Decoder> makeDecoder() { return std::unique_ptr<Decoder>(new Decoder(report)); }
@ -147,9 +150,9 @@ protected:
virtual std::shared_ptr<Communication> makeCommunication(
std::unique_ptr<ICommunication> t,
std::shared_ptr<Packetizer> p,
std::function<std::unique_ptr<Packetizer>()> makeConfiguredPacketizer,
std::unique_ptr<Encoder> e,
std::unique_ptr<Decoder> d) { return std::make_shared<Communication>(report, std::move(t), p, std::move(e), std::move(d)); }
std::unique_ptr<Decoder> d) { return std::make_shared<Communication>(report, std::move(t), makeConfiguredPacketizer, std::move(e), std::move(d)); }
virtual void setupCommunication(Communication&) {}
template<typename Settings>

View File

@ -12,10 +12,18 @@ class Plasion : public Device {
protected:
virtual std::shared_ptr<Communication> makeCommunication(
std::unique_ptr<ICommunication> transport,
std::shared_ptr<Packetizer> packetizer,
std::function<std::unique_ptr<Packetizer>()> makeConfiguredPacketizer,
std::unique_ptr<Encoder> encoder,
std::unique_ptr<Decoder> decoder
) override { return std::make_shared<MultiChannelCommunication>(report, std::move(transport), packetizer, std::move(encoder), std::move(decoder)); }
) override {
return std::make_shared<MultiChannelCommunication>(
report,
std::move(transport),
makeConfiguredPacketizer,
std::move(encoder),
std::move(decoder)
);
}
// TODO This is done so that Plasion can still transmit it's basic networks, awaiting slave VNET support
virtual bool isSupportedRXNetwork(const Network&) const override { return true; }