Driver: Block between read attempts

Driver:

* Refactored to limit accessibility of member fields;

Communication:

* readTask() now calls for a blocking wait;
ks-refactor-docs
Kurt Wachowski 2024-08-13 13:55:12 +00:00 committed by Kyle Schwarz
parent f25a0a4a81
commit 75af3220b0
20 changed files with 116 additions and 104 deletions

View File

@ -276,7 +276,7 @@ void Communication::readTask() {
std::unique_lock<std::mutex> lk(pauseReadTaskMutex);
pauseReadTaskCv.wait(lk, [this]() { return !pauseReadTask; });
}
if(driver->readAvailable()) {
if(driver->waitForRx(readTaskWakeLimit, readTaskWakeTimeout)) {
if(pauseReadTask) {
/**
* Reads could have paused while the driver was not available

View File

@ -11,13 +11,21 @@ using namespace icsneo;
bool Driver::pushRx(const uint8_t* buf, size_t numReceived) {
bool ret = readBuffer.write(buf, numReceived);
if(hasRxWaitRequest) {
rxWaitRequestCv.notify_one();
}
rxWaitCv.notify_all();
return ret;
}
void Driver::clearBuffers()
{
WriteOperation flushop;
readBuffer.clear();
rxWaitCv.notify_all();
while (writeQueue.try_dequeue(flushop)) {}
}
bool Driver::waitForRx(size_t limit, std::chrono::milliseconds timeout) {
return waitForRx([limit, this]() {
return readBuffer.size() >= limit;
@ -26,13 +34,7 @@ bool Driver::waitForRx(size_t limit, std::chrono::milliseconds timeout) {
bool Driver::waitForRx(std::function<bool()> predicate, std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lk(rxWaitMutex);
hasRxWaitRequest = true;
auto ret = rxWaitRequestCv.wait_for(lk, timeout, predicate);
hasRxWaitRequest = false;
return ret;
return rxWaitCv.wait_for(lk, timeout, predicate);
}
bool Driver::readWait(std::vector<uint8_t>& bytes, std::chrono::milliseconds timeout, size_t limit) {
@ -92,4 +94,4 @@ bool Driver::write(const std::vector<uint8_t>& bytes) {
report(APIEvent::Type::Unknown, APIEvent::Severity::Error);
return ret;
}
}

View File

@ -87,6 +87,9 @@ public:
std::unique_ptr<Driver> driver;
device_eventhandler_t report;
size_t readTaskWakeLimit = 1;
std::chrono::milliseconds readTaskWakeTimeout = std::chrono::milliseconds(1000);
protected:
static int messageCallbackIDCounter;
std::mutex messageCallbacksLock;

View File

@ -24,9 +24,11 @@ public:
virtual bool isOpen() = 0;
virtual void modeChangeIncoming() {}
virtual void awaitModeChangeComplete() {}
virtual bool isDisconnected() { return disconnected; };
virtual bool close() = 0;
inline bool isDisconnected() const { return disconnected; };
inline bool isClosing() const { return closing; }
bool waitForRx(size_t limit, std::chrono::milliseconds timeout);
bool waitForRx(std::function<bool()> predicate, std::chrono::milliseconds timeout);
bool readWait(std::vector<uint8_t>& bytes, std::chrono::milliseconds timeout = std::chrono::milliseconds(100), size_t limit = 0);
@ -52,8 +54,8 @@ protected:
WAIT
};
virtual void readTask() = 0;
virtual void writeTask() = 0;
inline void setIsClosing(bool isClosing) { closing = isClosing; }
inline void setIsDisconnected(bool isDisconnected) { disconnected = isDisconnected; }
// Overridable in case the driver doesn't want to use writeTask and writeQueue
virtual bool writeQueueFull() { return writeQueue.size_approx() > writeQueueSize; }
@ -61,13 +63,15 @@ protected:
virtual bool writeInternal(const std::vector<uint8_t>& b) { return writeQueue.enqueue(WriteOperation(b)); }
bool pushRx(const uint8_t* buf, size_t numReceived);
RingBuffer readBuffer = RingBuffer(ICSNEO_DRIVER_RINGBUFFER_SIZE);
std::atomic<bool> hasRxWaitRequest = false;
std::condition_variable rxWaitRequestCv;
std::mutex rxWaitMutex;
void clearBuffers();
moodycamel::BlockingConcurrentQueue<WriteOperation> writeQueue;
std::thread readThread, writeThread;
private:
RingBuffer readBuffer = RingBuffer(ICSNEO_DRIVER_RINGBUFFER_SIZE);
std::condition_variable rxWaitCv;
std::mutex rxWaitMutex;
std::atomic<bool> closing{false};
std::atomic<bool> disconnected{false};
};

View File

@ -22,8 +22,10 @@ public:
private:
neodevice_t& device;
std::optional<void*> handle;
void readTask() override;
void writeTask() override;
std::thread readThread, writeThread;
void readTask();
void writeTask();
};
}

View File

@ -47,8 +47,10 @@ private:
static std::string HandleToTTY(neodevice_handle_t handle);
void readTask() override;
void writeTask() override;
std::thread readThread, writeThread;
void readTask();
void writeTask();
bool fdIsValid();
};

View File

@ -25,8 +25,11 @@ public:
bool isOpen() override;
bool close() override;
private:
void readTask() override;
void writeTask() override;
std::thread readThread, writeThread;
void readTask();
void writeTask();
bool writeQueueFull() override;
bool writeQueueAlmostFull() override;
bool writeInternal(const std::vector<uint8_t>& bytes) override;

View File

@ -57,8 +57,11 @@ private:
static std::vector<std::string> handles;
static bool ErrorIsDisconnection(int errorCode);
std::thread readThread, writeThread;
void readTask();
void writeTask();
bool openable; // Set to false in the constructor if the object has not been found in searchResultDevices
neodevice_t& device;

View File

@ -30,8 +30,10 @@ private:
uint8_t deviceMAC[6];
bool openable = true;
EthernetPacketizer ethPacketizer;
void readTask() override;
void writeTask() override;
std::thread readThread, writeThread;
void readTask();
void writeTask();
class NetworkInterface {
public:

View File

@ -47,8 +47,10 @@ private:
uint32_t dstIP;
uint16_t dstPort;
std::unique_ptr<Socket> socket;
void readTask() override;
void writeTask() override;
std::thread readThread, writeThread;
void readTask();
void writeTask();
};
}

View File

@ -32,6 +32,7 @@ private:
bool openable = true;
EthernetPacketizer ethPacketizer;
std::thread readThread, writeThread;
std::thread transmitThread;
pcap_send_queue* transmitQueue = nullptr;
std::condition_variable transmitQueueCV;

View File

@ -37,6 +37,7 @@ private:
std::shared_ptr<Detail> detail;
std::vector<std::shared_ptr<std::thread>> threads;
std::thread readThread, writeThread;
void readTask();
void writeTask();
};

View File

@ -64,7 +64,7 @@ bool FTD3XX::open() {
}
handle.emplace(tmpHandle);
closing = false;
setIsClosing(false);
readThread = std::thread(&FTD3XX::readTask, this);
writeThread = std::thread(&FTD3XX::writeTask, this);
@ -81,23 +81,21 @@ bool FTD3XX::close() {
return false;
}
closing = true;
disconnected = false;
setIsClosing(true);
setIsDisconnected(false);
if(readThread.joinable())
readThread.join();
if(writeThread.joinable())
writeThread.join();
WriteOperation flushop;
readBuffer.pop(readBuffer.size());
while(writeQueue.try_dequeue(flushop)) {}
clearBuffers();
if(const auto ret = FT_Close(*handle); ret != FT_OK) {
addEvent(ret, APIEvent::Severity::EventWarning);
}
closing = false;
setIsClosing(false);
return true;
}
@ -110,7 +108,7 @@ void FTD3XX::readTask() {
FT_SetStreamPipe(*handle, false, false, READ_PIPE_ID, bufferSize);
FT_SetPipeTimeout(*handle, READ_PIPE_ID, 1);
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
ULONG received = 0;
OVERLAPPED overlap = {};
FT_InitializeOverlapped(*handle, &overlap);
@ -119,13 +117,13 @@ void FTD3XX::readTask() {
#else
FT_ReadPipeAsync(*handle, 0, buffer, bufferSize, &received, &overlap);
#endif
while(!closing) {
while(!isClosing()) {
const auto ret = FT_GetOverlappedResult(*handle, &overlap, &received, true);
if(ret == FT_IO_PENDING)
continue;
if(ret != FT_OK) {
if(ret == FT_IO_ERROR) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
} else {
addEvent(ret, APIEvent::Severity::Error);
@ -146,7 +144,7 @@ void FTD3XX::writeTask() {
FT_SetPipeTimeout(*handle, WRITE_PIPE_ID, 100);
WriteOperation writeOp;
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
continue;
@ -160,13 +158,13 @@ void FTD3XX::writeTask() {
#else
FT_WritePipeAsync(*handle, 0, writeOp.bytes.data(), size, &sent, &overlap);
#endif
while(!closing) {
while(!isClosing()) {
const auto ret = FT_GetOverlappedResult(*handle, &overlap, &sent, true);
if(ret == FT_IO_PENDING)
continue;
if(ret != FT_OK) {
if(ret == FT_IO_ERROR) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
} else {
addEvent(ret, APIEvent::Severity::Error);

View File

@ -117,7 +117,7 @@ bool CDCACM::close() {
return false;
}
closing = true;
setIsClosing(true);
if(readThread.joinable())
readThread.join();
@ -125,8 +125,8 @@ bool CDCACM::close() {
if(writeThread.joinable())
writeThread.join();
closing = false;
disconnected = false;
setIsClosing(false);
setIsDisconnected(false);
if(modeChanging) {
// We're expecting this inode to go away after we close the device
@ -140,9 +140,7 @@ bool CDCACM::close() {
int ret = ::close(fd);
fd = -1;
WriteOperation flushop;
readBuffer.clear();
while (writeQueue.try_dequeue(flushop)) {}
clearBuffers();
if(modeChanging) {
modeChanging = false;
@ -173,7 +171,7 @@ void CDCACM::readTask() {
constexpr size_t READ_BUFFER_SIZE = 2048;
uint8_t readbuf[READ_BUFFER_SIZE];
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
fd_set rfds = {0};
struct timeval tv = {0};
FD_SET(fd, &rfds);
@ -199,8 +197,8 @@ void CDCACM::readTask() {
// Requesting thread is responsible for calling close. This allows for more flexibility
});
break;
} else if(!closing && !fdIsValid() && !isDisconnected()) {
disconnected = true;
} else if(!isClosing() && !fdIsValid() && !isDisconnected()) {
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
}
}
@ -210,7 +208,7 @@ void CDCACM::readTask() {
void CDCACM::writeTask() {
WriteOperation writeOp;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
continue;
@ -233,7 +231,7 @@ void CDCACM::writeTask() {
} else if (actualWritten < 0) {
if(!fdIsValid()) {
if(!isDisconnected()) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
}
} else

View File

@ -166,13 +166,13 @@ bool FirmIO::close() {
return false;
}
closing = true;
setIsClosing(true);
if(readThread.joinable())
readThread.join();
closing = false;
disconnected = false;
setIsClosing(false);
setIsDisconnected(false);
int ret = 0;
if(vbase != nullptr) {
@ -202,7 +202,7 @@ void FirmIO::readTask() {
std::cerr << "FirmIO::readTask setpriority failed : " << strerror(errno) << std::endl;
}
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
fd_set rfds = {0};
struct timeval tv = {0};
FD_SET(fd, &rfds);
@ -244,7 +244,7 @@ void FirmIO::readTask() {
uint8_t* addr = reinterpret_cast<uint8_t*>(msg.payload.data.addr - PHY_ADDR_BASE + vbase);
while (!pushRx(addr, msg.payload.data.len)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // back-off so reading thread can empty the buffer
if (closing || isDisconnected()) {
if (isClosing() || isDisconnected()) {
break;
}
}

View File

@ -81,7 +81,7 @@ bool FTDI::open() {
ftdi.flush();
// Create threads
closing = false;
setIsClosing(false);
readThread = std::thread(&FTDI::readTask, this);
writeThread = std::thread(&FTDI::writeTask, this);
@ -94,7 +94,7 @@ bool FTDI::close() {
return false;
}
closing = true;
setIsClosing(true);
if(readThread.joinable())
readThread.join();
@ -109,12 +109,10 @@ bool FTDI::close() {
report(APIEvent::Type::DriverFailedToClose, APIEvent::Severity::Error);
}
WriteOperation flushop;
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
clearBuffers();
closing = false;
disconnected = false;
setIsClosing(false);
setIsDisconnected(false);
return ret;
}
@ -202,12 +200,12 @@ void FTDI::readTask() {
constexpr size_t READ_BUFFER_SIZE = 8;
uint8_t readbuf[READ_BUFFER_SIZE];
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
auto readBytes = ftdi.read(readbuf, READ_BUFFER_SIZE);
if(readBytes < 0) {
if(ErrorIsDisconnection(readBytes)) {
if(!isDisconnected()) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
}
} else
@ -220,7 +218,7 @@ void FTDI::readTask() {
void FTDI::writeTask() {
WriteOperation writeOp;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
continue;
@ -230,7 +228,7 @@ void FTDI::writeTask() {
if(writeBytes < 0) {
if(ErrorIsDisconnection(writeBytes)) {
if(!isDisconnected()) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
}
break;

View File

@ -258,28 +258,26 @@ bool PCAP::close() {
if(!isOpen())
return false;
closing = true; // Signal the threads that we are closing
setIsClosing(true); // Signal the threads that we are closing
pcap_breakloop(iface.fp);
#ifndef __linux__
pthread_cancel(readThread.native_handle());
#endif
readThread.join();
writeThread.join();
closing = false;
setIsClosing(false);
pcap_close(iface.fp);
iface.fp = nullptr;
WriteOperation flushop;
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
clearBuffers();
return true;
}
void PCAP::readTask() {
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while (!closing) {
while (!isClosing()) {
pcap_dispatch(iface.fp, -1, [](uint8_t* obj, const struct pcap_pkthdr* header, const uint8_t* data) {
PCAP* driver = reinterpret_cast<PCAP*>(obj);
if(driver->ethPacketizer.inputUp({data, data + header->caplen})) {
@ -294,7 +292,7 @@ void PCAP::writeTask() {
WriteOperation writeOp;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing) {
while(!isClosing()) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
continue;

View File

@ -511,20 +511,18 @@ bool TCP::close() {
return false;
}
closing = true;
disconnected = false;
setIsClosing(true);
setIsDisconnected(false);
if(readThread.joinable())
readThread.join();
if(writeThread.joinable())
writeThread.join();
WriteOperation flushop;
readBuffer.pop(readBuffer.size());
while(writeQueue.try_dequeue(flushop)) {}
clearBuffers();
socket.reset();
closing = false;
setIsClosing(false);
return true;
}
@ -534,7 +532,7 @@ void TCP::readTask() {
constexpr size_t READ_BUFFER_SIZE = 2048;
uint8_t readbuf[READ_BUFFER_SIZE];
while(!closing) {
while(!isClosing()) {
if(const auto received = ::recv(*socket, (char*)readbuf, READ_BUFFER_SIZE, 0); received > 0) {
pushRx(readbuf, received);
} else {
@ -547,11 +545,11 @@ void TCP::writeTask() {
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
WriteOperation writeOp;
while(!closing) {
while(!isClosing()) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
continue;
while(!closing) {
while(!isClosing()) {
if(::send(*socket, (char*)writeOp.bytes.data(), WIN_INT(writeOp.bytes.size()), 0) > 0)
break;
socket->poll(POLLOUT, 100);

View File

@ -246,18 +246,17 @@ bool PCAP::close() {
return false;
}
closing = true; // Signal the threads that we are closing
setIsClosing(true); // Signal the threads that we are closing
readThread.join();
writeThread.join();
transmitThread.join();
closing = false;
setIsClosing(false);
pcap.close(iface.fp);
iface.fp = nullptr;
WriteOperation flushop;
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
clearBuffers();
transmitQueue = nullptr;
return true;
@ -267,7 +266,7 @@ void PCAP::readTask() {
struct pcap_pkthdr* header;
const uint8_t* data;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing) {
while(!isClosing()) {
auto readBytes = pcap.next_ex(iface.fp, &header, &data);
if(readBytes < 0) {
report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);
@ -291,7 +290,7 @@ void PCAP::writeTask() {
pcap_send_queue* queue2 = pcap.sendqueue_alloc(128000);
pcap_send_queue* queue = queue1;
while(!closing) {
while(!isClosing()) {
// Potentially, we added frames to a second queue faster than the other thread was able to hand the first
// off to the kernel. In that case, wait for a minimal amount of time before checking whether we can
// transmit it again.
@ -342,9 +341,9 @@ void PCAP::writeTask() {
}
void PCAP::transmitTask() {
while(!closing) {
while(!isClosing()) {
std::unique_lock<std::mutex> lk(transmitQueueMutex);
if(transmitQueueCV.wait_for(lk, std::chrono::milliseconds(100), [this] { return !!transmitQueue; }) && !closing && transmitQueue) {
if(transmitQueueCV.wait_for(lk, std::chrono::milliseconds(100), [this] { return !!transmitQueue; }) && !isClosing() && transmitQueue) {
pcap_send_queue* current = transmitQueue;
lk.unlock();
pcap.sendqueue_transmit(iface.fp, current, 0);

View File

@ -326,12 +326,12 @@ bool VCP::close() {
return false;
}
closing = true; // Signal the threads that we are closing
setIsClosing(true); // Signal the threads that we are closing
for(auto& t : threads)
t->join(); // Wait for the threads to close
readThread.join();
writeThread.join();
closing = false;
setIsClosing(false);
if(!CloseHandle(detail->handle)) {
report(APIEvent::Type::DriverFailedToClose, APIEvent::Severity::Error);
@ -357,9 +357,7 @@ bool VCP::close() {
detail->overlappedWait.hEvent = INVALID_HANDLE_VALUE;
}
WriteOperation flushop;
readBuffer.clear();
while(writeQueue.try_dequeue(flushop)) {}
clearBuffers();
if(!ret)
report(APIEvent::Type::DriverFailedToClose, APIEvent::Severity::Error);
@ -379,7 +377,7 @@ void VCP::readTask() {
IOTaskState state = LAUNCH;
DWORD bytesRead = 0;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
switch(state) {
case LAUNCH: {
COMSTAT comStatus;
@ -401,7 +399,7 @@ void VCP::readTask() {
else if(lastError != ERROR_SUCCESS) {
if(lastError == ERROR_ACCESS_DENIED) {
if(!isDisconnected()) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
}
} else
@ -432,7 +430,7 @@ void VCP::writeTask() {
VCP::WriteOperation writeOp;
DWORD bytesWritten = 0;
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
while(!closing && !isDisconnected()) {
while(!isClosing() && !isDisconnected()) {
switch(state) {
case LAUNCH: {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
@ -448,7 +446,7 @@ void VCP::writeTask() {
}
else if(winerr == ERROR_ACCESS_DENIED) {
if(!isDisconnected()) {
disconnected = true;
setIsDisconnected(true);
report(APIEvent::Type::DeviceDisconnected, APIEvent::Severity::Error);
}
} else