Transmits now block when the buffer fills

pull/13/head
Paul Hollinsky 2019-05-02 16:33:44 -04:00
parent 5a6a1c990a
commit 6ffc364eba
14 changed files with 45 additions and 11 deletions

View File

@ -63,6 +63,7 @@ static constexpr const char* ERROR_FAILED_TO_READ = "A read operation failed.";
static constexpr const char* ERROR_FAILED_TO_WRITE = "A write operation failed.";
static constexpr const char* ERROR_DRIVER_FAILED_TO_OPEN = "The device driver encountered a low-level error while opening the device.";
static constexpr const char* ERROR_PACKET_CHECKSUM_ERROR = "There was a checksum error while decoding a packet. The packet was dropped.";
static constexpr const char* ERROR_TRANSMIT_BUFFER_FULL = "The transmit buffer is full and the device is set to non-blocking.";
static constexpr const char* ERROR_PCAP_COULD_NOT_START = "The PCAP driver could not be started. Ethernet devices will not be found.";
static constexpr const char* ERROR_PCAP_COULD_NOT_FIND_DEVICES = "The PCAP driver failed to find devices. Ethernet devices will not be found.";
@ -110,6 +111,8 @@ const char* APIError::DescriptionForType(ErrorType type) {
return ERROR_DRIVER_FAILED_TO_OPEN;
case PacketChecksumError:
return ERROR_PACKET_CHECKSUM_ERROR;
case TransmitBufferFull:
return ERROR_TRANSMIT_BUFFER_FULL;
case PCAPCouldNotStart:
return ERROR_PCAP_COULD_NOT_START;
case PCAPCouldNotFindDevices:
@ -154,6 +157,7 @@ APIError::Severity APIError::SeverityForType(ErrorType type) {
case FailedToWrite:
case DriverFailedToOpen:
case PacketChecksumError:
case TransmitBufferFull:
// Other Errors
case TooManyErrors:
case Unknown:

View File

@ -39,5 +39,16 @@ bool ICommunication::readWait(std::vector<uint8_t>& bytes, std::chrono::millisec
}
bool ICommunication::write(const std::vector<uint8_t>& bytes) {
if(writeBlocks) {
std::unique_lock<std::mutex> lk(writeMutex);
if(writeQueue.size_approx() > writeQueueSize) {
writeCV.wait(lk);
}
} else {
if(writeQueue.size_approx() > writeQueueSize) {
err(APIError::TransmitBufferFull);
return false;
}
}
return writeQueue.enqueue(WriteOperation(bytes));
}

View File

@ -54,6 +54,7 @@ public:
FailedToWrite = 0x3001,
DriverFailedToOpen = 0x3002,
PacketChecksumError = 0x3003,
TransmitBufferFull = 0x3004,
PCAPCouldNotStart = 0x3102,
PCAPCouldNotFindDevices = 0x3103,

View File

@ -5,12 +5,16 @@
#include <chrono>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "icsneo/api/errormanager.h"
#include "icsneo/third-party/concurrentqueue/blockingconcurrentqueue.h"
namespace icsneo {
class ICommunication {
public:
ICommunication(const device_errorhandler_t& handler) : err(handler) {}
virtual ~ICommunication() {}
virtual bool open() = 0;
virtual bool isOpen() = 0;
@ -18,12 +22,21 @@ public:
virtual bool read(std::vector<uint8_t>& bytes, size_t limit = 0);
virtual bool readWait(std::vector<uint8_t>& bytes, std::chrono::milliseconds timeout = std::chrono::milliseconds(100), size_t limit = 0);
virtual bool write(const std::vector<uint8_t>& bytes);
inline void onWrite() {
if(writeQueue.size_approx() < (writeQueueSize * 3/4))
writeCV.notify_one();
}
device_errorhandler_t err;
size_t writeQueueSize = 50;
bool writeBlocks = true; // Otherwise it just fails when the queue is full
protected:
class WriteOperation {
public:
WriteOperation() {}
WriteOperation(std::vector<uint8_t> b) { bytes = b; }
WriteOperation(const std::vector<uint8_t>& b) : bytes(b) {}
std::vector<uint8_t> bytes;
};
enum IOTaskState {
@ -34,6 +47,8 @@ protected:
virtual void writeTask() = 0;
moodycamel::BlockingConcurrentQueue<uint8_t> readQueue;
moodycamel::BlockingConcurrentQueue<WriteOperation> writeQueue;
std::mutex writeMutex;
std::condition_variable writeCV;
std::thread readThread, writeThread;
std::atomic<bool> closing{false};
};

View File

@ -16,7 +16,7 @@ class FTDI : public ICommunication {
public:
static std::vector<neodevice_t> FindByProduct(int product);
FTDI(device_errorhandler_t err, neodevice_t& forDevice);
FTDI(const device_errorhandler_t& err, neodevice_t& forDevice);
~FTDI() { close(); }
bool open();
bool close();
@ -56,7 +56,6 @@ private:
bool openable; // Set to false in the constructor if the object has not been found in searchResultDevices
neodevice_t& device;
device_errorhandler_t err;
};
}

View File

@ -11,7 +11,7 @@ namespace icsneo {
class STM32 : public ICommunication {
public:
STM32(device_errorhandler_t err, neodevice_t& forDevice) : device(forDevice), err(err) {}
STM32(const device_errorhandler_t& err, neodevice_t& forDevice) : ICommunication(err), device(forDevice) {}
static std::vector<neodevice_t> FindByProduct(int product);
bool open();
@ -20,7 +20,6 @@ public:
private:
neodevice_t& device;
device_errorhandler_t err;
int fd = -1;
static constexpr neodevice_handle_t HANDLE_OFFSET = 10;

View File

@ -7,7 +7,7 @@ namespace icsneo {
class FTDI : public VCP {
public:
FTDI(device_errorhandler_t err, neodevice_t& forDevice) : VCP(err, forDevice) {}
FTDI(const device_errorhandler_t& err, neodevice_t& forDevice) : VCP(err, forDevice) {}
static std::vector<neodevice_t> FindByProduct(int product) { return VCP::FindByProduct(product, { L"serenum" /*, L"ftdibus" */ }); }
};

View File

@ -21,7 +21,7 @@ public:
static std::string GetEthDevSerialFromMacAddress(uint8_t product, uint16_t macSerial);
static bool IsHandleValid(neodevice_handle_t handle);
PCAP(device_errorhandler_t err, neodevice_t& forDevice);
PCAP(const device_errorhandler_t& err, neodevice_t& forDevice);
bool open();
bool isOpen();
bool close();

View File

@ -7,7 +7,7 @@ namespace icsneo {
class STM32 : public VCP {
public:
STM32(device_errorhandler_t err, neodevice_t& forDevice) : VCP(err, forDevice) {}
STM32(const device_errorhandler_t& err, neodevice_t& forDevice) : VCP(err, forDevice) {}
static std::vector<neodevice_t> FindByProduct(int product) { return VCP::FindByProduct(product, { L"usbser" }); }
};

View File

@ -20,7 +20,7 @@ public:
static bool IsHandleValid(neodevice_handle_t handle);
typedef void(*fn_boolCallback)(bool success);
VCP(device_errorhandler_t err, neodevice_t& forDevice) : device(forDevice), err(err) {
VCP(const device_errorhandler_t& err, neodevice_t& forDevice) : ICommunication(err), device(forDevice) {
overlappedRead.hEvent = INVALID_HANDLE_VALUE;
overlappedWrite.hEvent = INVALID_HANDLE_VALUE;
overlappedWait.hEvent = INVALID_HANDLE_VALUE;

View File

@ -39,7 +39,7 @@ std::vector<neodevice_t> FTDI::FindByProduct(int product) {
return found;
}
FTDI::FTDI(device_errorhandler_t err, neodevice_t& forDevice) : device(forDevice), err(err) {
FTDI::FTDI(const device_errorhandler_t& err, neodevice_t& forDevice) : ICommunication(err), device(forDevice) {
openable = strlen(forDevice.serial) > 0 && device.handle >= 0 && device.handle < (neodevice_handle_t)handles.size();
}
@ -183,5 +183,6 @@ void FTDI::writeTask() {
continue;
ftdi.write(writeOp.bytes.data(), (int)writeOp.bytes.size());
onWrite();
}
}

View File

@ -285,5 +285,6 @@ void STM32::writeTask() {
ssize_t actualWritten = ::write(fd, writeOp.bytes.data(), writeSize);
if(actualWritten != writeSize)
err(APIError::FailedToWrite);
onWrite();
}
}

View File

@ -177,7 +177,7 @@ bool PCAP::IsHandleValid(neodevice_handle_t handle) {
return (netifIndex < knownInterfaces.size());
}
PCAP::PCAP(device_errorhandler_t err, neodevice_t& forDevice) : device(forDevice), err(err) {
PCAP::PCAP(const device_errorhandler_t& err, neodevice_t& forDevice) : ICommunication(err), device(forDevice) {
if(IsHandleValid(device.handle)) {
interface = knownInterfaces[(device.handle >> 24) & 0xFF];
interface.fp = nullptr; // We're going to open our own connection to the interface. This should already be nullptr but just in case.
@ -287,6 +287,7 @@ void PCAP::writeTask() {
auto bs = sendPacket.getBytestream();
if(!closing)
pcap.sendpacket(interface.fp, bs.data(), (int)bs.size());
onWrite();
// TODO Handle packet send errors
}
}

View File

@ -389,6 +389,8 @@ void VCP::writeTask() {
bytesWritten = 0;
if(WriteFile(handle, writeOp.bytes.data(), (DWORD)writeOp.bytes.size(), nullptr, &overlappedWrite))
continue;
onWrite();
auto winerr = GetLastError();
if(winerr == ERROR_IO_PENDING) {