From 3f5150bef3cd23989f28b6e167d31719d95c6947 Mon Sep 17 00:00:00 2001 From: Jonathan Schwartz Date: Wed, 7 Jan 2026 18:35:23 +0000 Subject: [PATCH] FirmIO: Fix instability and memory leak issues --- include/icsneo/platform/posix/firmio.h | 22 ++- platform/posix/firmio.cpp | 192 ++++++++++++++++--------- 2 files changed, 141 insertions(+), 73 deletions(-) diff --git a/include/icsneo/platform/posix/firmio.h b/include/icsneo/platform/posix/firmio.h index 82e8522..8396c92 100644 --- a/include/icsneo/platform/posix/firmio.h +++ b/include/icsneo/platform/posix/firmio.h @@ -19,6 +19,9 @@ class FirmIO : public Driver { public: static void Find(std::vector& foundDevices); + FirmIO(const device_eventhandler_t& report) : Driver(report) { + writeQueueSize = 256; + } using Driver::Driver; // Inherit constructor ~FirmIO(); bool open() override; @@ -26,16 +29,16 @@ public: bool close() override; driver_finder_t getFinder() override { return FirmIO::Find; } + // bool writeQueueFull() override; + // bool writeQueueAlmostFull() override; + bool writeInternal(const std::vector& b) override; + private: std::thread readThread, writeThread; void readTask(); void writeTask(); - bool writeQueueFull() override; - bool writeQueueAlmostFull() override; - bool writeInternal(const std::vector& bytes) override; - struct DataInfo { uint32_t type; uint32_t offset; @@ -111,7 +114,11 @@ private: bool free(uint8_t* addr); PhysicalAddress translate(uint8_t* addr) const; - private: + uint32_t getUsedBlocks() const { return usedBlocks; } + size_t getTotalBlocks() const { return blocks.size(); } + bool isFull() const { return usedBlocks == blocks.size(); } + + struct BlockInfo { enum class Status : uint32_t { Free = 0, @@ -121,6 +128,7 @@ private: uint8_t* addr; }; + private: std::vector blocks; std::atomic usedBlocks; @@ -137,6 +145,10 @@ private: std::mutex outMutex; std::optional out; std::optional outMemory; + + std::atomic num_read = 0; + std::atomic num_written = 0; + std::atomic num_freed = 0; }; } diff --git a/platform/posix/firmio.cpp b/platform/posix/firmio.cpp index 6752358..d756472 100644 --- a/platform/posix/firmio.cpp +++ b/platform/posix/firmio.cpp @@ -42,16 +42,16 @@ void FirmIO::Find(std::vector& found) { Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {}); Decoder decoder([](APIEvent::Type, APIEvent::Severity) {}); using namespace std::chrono; - const auto start = steady_clock::now(); // Get an absolute wall clock to compare to - const auto overallTimeout = start + milliseconds(500); - while(!temp.readAvailable()) { - if(steady_clock::now() > overallTimeout) { - // failed to read out a serial number reponse in time - break; - } + const auto overallTimeout = steady_clock::now() + milliseconds(200); + size_t lastBufferSize = 0; + while (steady_clock::now() < overallTimeout) + { + temp.waitForRx(lastBufferSize + 1, milliseconds(100)); + bool havePacket = packetizer.input(temp.getReadBuffer()); + lastBufferSize = temp.getReadBuffer().size(); - if(!packetizer.input(temp.getReadBuffer())) + if(!havePacket) continue; // A full packet has not yet been read out for(const auto& packet : packetizer.output()) { @@ -75,6 +75,7 @@ void FirmIO::Find(std::vector& found) { }; found.push_back(foundDevice); + break; // never going to find two! } } } @@ -141,17 +142,27 @@ bool FirmIO::open() { } } - //std::cout << "Flushed " << std::dec << i << " freeing " << toFree.size() << std::endl; + // std::cout << "Flushed " << std::dec << i << " freeing " << toFree.size() << std::endl; - while(!toFree.empty()) { - std::lock_guard lk(outMutex); - out->write(&toFree.back()); + auto endTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(std::chrono::steady_clock::now() < endTime && !toFree.empty()) { + bool pass = false; + { + std::scoped_lock lk(outMutex); + pass = out->write(&toFree.back()); + } + if (!pass) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + continue; + } toFree.pop_back(); } - // Create thread - // No thread for writing since we don't need the extra buffer + // Create threads readThread = std::thread(&FirmIO::readTask, this); + //logThread = std::thread(&FirmIO::logTask, this); + writeThread = std::thread(&FirmIO::writeTask, this); return true; } @@ -171,6 +182,13 @@ bool FirmIO::close() { if(readThread.joinable()) readThread.join(); + if (writeThread.joinable()) + writeThread.join(); + + // if(logThread.joinable()) + // logThread.join(); + + setIsClosing(false); setIsDisconnected(false); @@ -194,7 +212,8 @@ bool FirmIO::close() { void FirmIO::readTask() { EventManager::GetInstance().downgradeErrorsOnCurrentThread(); Msg msg; - std::vector toFree; + std::vector toFree; + toFree.reserve(outMemory->getTotalBlocks()); // attempt to elevate the thread priority. PRIO_MIN is actually the highest priority but the lowest value. int err = setpriority(PRIO_PROCESS, 0, -1); @@ -208,7 +227,6 @@ void FirmIO::readTask() { FD_SET(fd, &rfds); tv.tv_usec = 50000; // 50ms int ret = ::select(fd + 1, &rfds, NULL, NULL, &tv); - // std::cout << "select returned " << ret << ' ' << errno << std::endl; if(ret < 0) report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error); if(ret <= 0) @@ -221,24 +239,12 @@ void FirmIO::readTask() { if(ret < int(sizeof(interruptCount)) || interruptCount < 1) continue; - toFree.clear(); - int i = 0; - while(in->read(&msg) && i++ < 1000) { + while(in->read(&msg)) { switch(msg.command) { case Msg::Command::ComData: { - if(toFree.empty() || toFree.back().payload.free.refCount == 6) { - toFree.emplace_back(); - toFree.back().command = Msg::Command::ComFree; - toFree.back().payload.free.refCount = 0; - } - // Add this ref to the list of payloads to free - // After we process these, we'll send this list back to the device - // so that it can free these entries - toFree.back().payload.free.ref[toFree.back().payload.free.refCount] = msg.payload.data.ref; - toFree.back().payload.free.refCount++; - - // std::cout << "Got some data @ 0x" << std::hex << msg.payload.data.addr << " " << std::dec << msg.payload.data.len << std::endl; + toFree.push_back(msg.payload.data.ref); + ++num_read; // Translate the physical address back to our virtual address space uint8_t* addr = reinterpret_cast(msg.payload.data.addr - PHY_ADDR_BASE + vbase); @@ -251,58 +257,95 @@ void FirmIO::readTask() { } break; case Msg::Command::ComFree: { - std::lock_guard lk(outMutex); - // std::cout << "Got some free " << std::hex << msg.payload.free.ref[0] << std::endl; + std::scoped_lock lk(outMutex); for(uint32_t i = 0; i < msg.payload.free.refCount; i++) outMemory->free(reinterpret_cast(msg.payload.free.ref[i])); break; } + default: + // std::cout << "invalid command: " << std::hex << static_cast(msg.command) << std::dec << std::endl; + break; } + if (isClosing() || isDisconnected()) + break; } - - while(!toFree.empty()) { - std::lock_guard lk(outMutex); - out->write(&toFree.back()); - toFree.pop_back(); + while (toFree.size()) { + Msg freeMsg = { Msg::Command::ComFree }; + freeMsg.payload.free.refCount = std::min(static_cast(toFree.size()), 6u); + for (size_t i = 0; i < freeMsg.payload.free.refCount; ++i) { + freeMsg.payload.free.ref[i] = toFree[i]; + } + std::scoped_lock lk(outMutex); + if (!out->write(&freeMsg)) { + break; + } + num_freed += freeMsg.payload.free.refCount; + toFree.erase(toFree.begin(), toFree.begin() + freeMsg.payload.free.refCount); } } + while (toFree.size()) + { + Msg freeMsg = { Msg::Command::ComFree }; + freeMsg.payload.free.refCount = std::min(static_cast(toFree.size()), 6u); + for (size_t i = 0; i < freeMsg.payload.free.refCount; ++i) { + freeMsg.payload.free.ref[i] = toFree[i]; + } + std::scoped_lock lk(outMutex); + if (!out->write(&freeMsg)) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + continue; + } + toFree.erase(toFree.begin(), toFree.begin() + freeMsg.payload.free.refCount); + } + // std::cout << "FirmIO readTask exiting: " << "closing=" << isClosing() << " disconnected=" << isDisconnected() << std::endl; } void FirmIO::writeTask() { - return; // We're overriding Driver::writeInternal() and doing the work there -} + constexpr uint32_t genInterrupt = 0x01; + std::pair, uint8_t*> op; + while (!isClosing() && !isDisconnected()) { + if (!op.first) { + writeQueue.wait_dequeue_timed(op.first, std::chrono::milliseconds(100)); + continue; + } -bool FirmIO::writeQueueFull() { - return out->isFull(); -} + if (!op.second) { + op.second = outMemory->alloc(static_cast(op.first->bytes.size())); + if (op.second == nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + continue; + } + memcpy(op.second, op.first->bytes.data(), op.first->bytes.size()); + } -bool FirmIO::writeQueueAlmostFull() { - // TODO: Better implementation here - return writeQueueFull(); + Msg msg = { Msg::Command::ComData }; + msg.payload.data.addr = outMemory->translate(op.second); + msg.payload.data.len = op.first->bytes.size(); + msg.payload.data.ref = reinterpret_cast(op.second); + + + std::scoped_lock lk(outMutex); + if(!out->write(&msg)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + continue; + } + ++num_written; + ::write(fd, &genInterrupt, sizeof(genInterrupt)); + op.first.reset(); + op.second = nullptr; + } + std::cout << "FirmIO writeTask exiting: " << "closing=" << isClosing() << " disconnected=" << isDisconnected() << std::endl; } bool FirmIO::writeInternal(const std::vector& bytes) { if(bytes.empty() || bytes.size() > Mempool::BlockSize) + { + // std::cout << "Invalid write size of " << bytes.size() << std::endl; return false; - - std::lock_guard lk(outMutex); - uint8_t* sharedData = outMemory->alloc(bytes.size()); - if(sharedData == nullptr) - return false; - - // std::cout << "coping " << bytes.size() << " bytes of data" << std::endl; - memcpy(sharedData, bytes.data(), bytes.size()); - - Msg msg = { Msg::Command::ComData }; - msg.payload.data.addr = outMemory->translate(sharedData); - msg.payload.data.len = static_cast(bytes.size()); - msg.payload.data.ref = reinterpret_cast(sharedData); - - if(!out->write(&msg)) - return false; - - uint32_t genInterrupt = 0x01; - return ::write(fd, &genInterrupt, sizeof(genInterrupt)) == sizeof(genInterrupt); + } + + return writeQueue.enqueue(WriteOperation(bytes)); } bool FirmIO::MsgQueue::read(Msg* msg) { @@ -369,13 +412,17 @@ bool FirmIO::Mempool::free(uint8_t* addr) { return b.addr == addr; }); - if(found == blocks.end()) + if(found == blocks.end()) { + // std::cout << "failed to free block address " << std::hex << reinterpret_cast(addr) << std::dec << std::endl; return false; // Invalid address + } - if(found->status != BlockInfo::Status::Used) + if(found->status != BlockInfo::Status::Used) { + // std::cout << "invalid state for free of block address " << std::hex << reinterpret_cast(addr) << std::dec << std::endl; return false; // Double free + } - usedBlocks--; + --usedBlocks; found->status = BlockInfo::Status::Free; return true; } @@ -383,3 +430,12 @@ bool FirmIO::Mempool::free(uint8_t* addr) { FirmIO::Mempool::PhysicalAddress FirmIO::Mempool::translate(uint8_t* addr) const { return reinterpret_cast(addr - virtualAddress + physicalAddress); } + +// void FirmIO::logTask() +// { +// while (!isClosing() && !isDisconnected()) { +// std::cout << "FirmIO Stats: RX Count: " << num_read << " TX Count: " << num_written << " Used Blocks (out): " << outMemory->getUsedBlocks() << " Freed Blocks: " << num_freed << std::endl; +// std::this_thread::sleep_for(std::chrono::seconds(1)); +// } +// std::cout << "FirmIO logTask exiting: " << "closing=" << isClosing() << " disconnected=" << isDisconnected() << std::endl; +// }