FirmIO: Fix instability and memory leak issues
parent
2a2d55f20d
commit
3f5150bef3
|
|
@ -19,6 +19,9 @@ class FirmIO : public Driver {
|
||||||
public:
|
public:
|
||||||
static void Find(std::vector<FoundDevice>& foundDevices);
|
static void Find(std::vector<FoundDevice>& foundDevices);
|
||||||
|
|
||||||
|
FirmIO(const device_eventhandler_t& report) : Driver(report) {
|
||||||
|
writeQueueSize = 256;
|
||||||
|
}
|
||||||
using Driver::Driver; // Inherit constructor
|
using Driver::Driver; // Inherit constructor
|
||||||
~FirmIO();
|
~FirmIO();
|
||||||
bool open() override;
|
bool open() override;
|
||||||
|
|
@ -26,16 +29,16 @@ public:
|
||||||
bool close() override;
|
bool close() override;
|
||||||
driver_finder_t getFinder() override { return FirmIO::Find; }
|
driver_finder_t getFinder() override { return FirmIO::Find; }
|
||||||
|
|
||||||
|
// bool writeQueueFull() override;
|
||||||
|
// bool writeQueueAlmostFull() override;
|
||||||
|
bool writeInternal(const std::vector<uint8_t>& b) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::thread readThread, writeThread;
|
std::thread readThread, writeThread;
|
||||||
|
|
||||||
void readTask();
|
void readTask();
|
||||||
void writeTask();
|
void writeTask();
|
||||||
|
|
||||||
bool writeQueueFull() override;
|
|
||||||
bool writeQueueAlmostFull() override;
|
|
||||||
bool writeInternal(const std::vector<uint8_t>& bytes) override;
|
|
||||||
|
|
||||||
struct DataInfo {
|
struct DataInfo {
|
||||||
uint32_t type;
|
uint32_t type;
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
|
|
@ -111,7 +114,11 @@ private:
|
||||||
bool free(uint8_t* addr);
|
bool free(uint8_t* addr);
|
||||||
PhysicalAddress translate(uint8_t* addr) const;
|
PhysicalAddress translate(uint8_t* addr) const;
|
||||||
|
|
||||||
private:
|
uint32_t getUsedBlocks() const { return usedBlocks; }
|
||||||
|
size_t getTotalBlocks() const { return blocks.size(); }
|
||||||
|
bool isFull() const { return usedBlocks == blocks.size(); }
|
||||||
|
|
||||||
|
|
||||||
struct BlockInfo {
|
struct BlockInfo {
|
||||||
enum class Status : uint32_t {
|
enum class Status : uint32_t {
|
||||||
Free = 0,
|
Free = 0,
|
||||||
|
|
@ -121,6 +128,7 @@ private:
|
||||||
uint8_t* addr;
|
uint8_t* addr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private:
|
||||||
std::vector<BlockInfo> blocks;
|
std::vector<BlockInfo> blocks;
|
||||||
std::atomic<uint32_t> usedBlocks;
|
std::atomic<uint32_t> usedBlocks;
|
||||||
|
|
||||||
|
|
@ -137,6 +145,10 @@ private:
|
||||||
std::mutex outMutex;
|
std::mutex outMutex;
|
||||||
std::optional<MsgQueue> out;
|
std::optional<MsgQueue> out;
|
||||||
std::optional<Mempool> outMemory;
|
std::optional<Mempool> outMemory;
|
||||||
|
|
||||||
|
std::atomic<size_t> num_read = 0;
|
||||||
|
std::atomic<size_t> num_written = 0;
|
||||||
|
std::atomic<size_t> num_freed = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,16 +42,16 @@ void FirmIO::Find(std::vector<FoundDevice>& found) {
|
||||||
Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {});
|
Packetizer packetizer([](APIEvent::Type, APIEvent::Severity) {});
|
||||||
Decoder decoder([](APIEvent::Type, APIEvent::Severity) {});
|
Decoder decoder([](APIEvent::Type, APIEvent::Severity) {});
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
const auto start = steady_clock::now();
|
|
||||||
// Get an absolute wall clock to compare to
|
// Get an absolute wall clock to compare to
|
||||||
const auto overallTimeout = start + milliseconds(500);
|
const auto overallTimeout = steady_clock::now() + milliseconds(200);
|
||||||
while(!temp.readAvailable()) {
|
size_t lastBufferSize = 0;
|
||||||
if(steady_clock::now() > overallTimeout) {
|
while (steady_clock::now() < overallTimeout)
|
||||||
// failed to read out a serial number reponse in time
|
{
|
||||||
break;
|
temp.waitForRx(lastBufferSize + 1, milliseconds(100));
|
||||||
}
|
bool havePacket = packetizer.input(temp.getReadBuffer());
|
||||||
|
lastBufferSize = temp.getReadBuffer().size();
|
||||||
|
|
||||||
if(!packetizer.input(temp.getReadBuffer()))
|
if(!havePacket)
|
||||||
continue; // A full packet has not yet been read out
|
continue; // A full packet has not yet been read out
|
||||||
|
|
||||||
for(const auto& packet : packetizer.output()) {
|
for(const auto& packet : packetizer.output()) {
|
||||||
|
|
@ -75,6 +75,7 @@ void FirmIO::Find(std::vector<FoundDevice>& found) {
|
||||||
};
|
};
|
||||||
|
|
||||||
found.push_back(foundDevice);
|
found.push_back(foundDevice);
|
||||||
|
break; // never going to find two!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -143,15 +144,25 @@ bool FirmIO::open() {
|
||||||
|
|
||||||
// std::cout << "Flushed " << std::dec << i << " freeing " << toFree.size() << std::endl;
|
// std::cout << "Flushed " << std::dec << i << " freeing " << toFree.size() << std::endl;
|
||||||
|
|
||||||
while(!toFree.empty()) {
|
auto endTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
|
||||||
std::lock_guard<std::mutex> lk(outMutex);
|
while(std::chrono::steady_clock::now() < endTime && !toFree.empty()) {
|
||||||
out->write(&toFree.back());
|
bool pass = false;
|
||||||
|
{
|
||||||
|
std::scoped_lock lk(outMutex);
|
||||||
|
pass = out->write(&toFree.back());
|
||||||
|
}
|
||||||
|
if (!pass)
|
||||||
|
{
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
toFree.pop_back();
|
toFree.pop_back();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create thread
|
// Create threads
|
||||||
// No thread for writing since we don't need the extra buffer
|
|
||||||
readThread = std::thread(&FirmIO::readTask, this);
|
readThread = std::thread(&FirmIO::readTask, this);
|
||||||
|
//logThread = std::thread(&FirmIO::logTask, this);
|
||||||
|
writeThread = std::thread(&FirmIO::writeTask, this);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
@ -171,6 +182,13 @@ bool FirmIO::close() {
|
||||||
if(readThread.joinable())
|
if(readThread.joinable())
|
||||||
readThread.join();
|
readThread.join();
|
||||||
|
|
||||||
|
if (writeThread.joinable())
|
||||||
|
writeThread.join();
|
||||||
|
|
||||||
|
// if(logThread.joinable())
|
||||||
|
// logThread.join();
|
||||||
|
|
||||||
|
|
||||||
setIsClosing(false);
|
setIsClosing(false);
|
||||||
setIsDisconnected(false);
|
setIsDisconnected(false);
|
||||||
|
|
||||||
|
|
@ -194,7 +212,8 @@ bool FirmIO::close() {
|
||||||
void FirmIO::readTask() {
|
void FirmIO::readTask() {
|
||||||
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
|
EventManager::GetInstance().downgradeErrorsOnCurrentThread();
|
||||||
Msg msg;
|
Msg msg;
|
||||||
std::vector<Msg> toFree;
|
std::vector<Msg::Ref> toFree;
|
||||||
|
toFree.reserve(outMemory->getTotalBlocks());
|
||||||
|
|
||||||
// attempt to elevate the thread priority. PRIO_MIN is actually the highest priority but the lowest value.
|
// attempt to elevate the thread priority. PRIO_MIN is actually the highest priority but the lowest value.
|
||||||
int err = setpriority(PRIO_PROCESS, 0, -1);
|
int err = setpriority(PRIO_PROCESS, 0, -1);
|
||||||
|
|
@ -208,7 +227,6 @@ void FirmIO::readTask() {
|
||||||
FD_SET(fd, &rfds);
|
FD_SET(fd, &rfds);
|
||||||
tv.tv_usec = 50000; // 50ms
|
tv.tv_usec = 50000; // 50ms
|
||||||
int ret = ::select(fd + 1, &rfds, NULL, NULL, &tv);
|
int ret = ::select(fd + 1, &rfds, NULL, NULL, &tv);
|
||||||
// std::cout << "select returned " << ret << ' ' << errno << std::endl;
|
|
||||||
if(ret < 0)
|
if(ret < 0)
|
||||||
report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);
|
report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);
|
||||||
if(ret <= 0)
|
if(ret <= 0)
|
||||||
|
|
@ -221,24 +239,12 @@ void FirmIO::readTask() {
|
||||||
if(ret < int(sizeof(interruptCount)) || interruptCount < 1)
|
if(ret < int(sizeof(interruptCount)) || interruptCount < 1)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
toFree.clear();
|
while(in->read(&msg)) {
|
||||||
int i = 0;
|
|
||||||
while(in->read(&msg) && i++ < 1000) {
|
|
||||||
switch(msg.command) {
|
switch(msg.command) {
|
||||||
case Msg::Command::ComData: {
|
case Msg::Command::ComData: {
|
||||||
if(toFree.empty() || toFree.back().payload.free.refCount == 6) {
|
|
||||||
toFree.emplace_back();
|
|
||||||
toFree.back().command = Msg::Command::ComFree;
|
|
||||||
toFree.back().payload.free.refCount = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add this ref to the list of payloads to free
|
toFree.push_back(msg.payload.data.ref);
|
||||||
// After we process these, we'll send this list back to the device
|
++num_read;
|
||||||
// so that it can free these entries
|
|
||||||
toFree.back().payload.free.ref[toFree.back().payload.free.refCount] = msg.payload.data.ref;
|
|
||||||
toFree.back().payload.free.refCount++;
|
|
||||||
|
|
||||||
// std::cout << "Got some data @ 0x" << std::hex << msg.payload.data.addr << " " << std::dec << msg.payload.data.len << std::endl;
|
|
||||||
|
|
||||||
// Translate the physical address back to our virtual address space
|
// Translate the physical address back to our virtual address space
|
||||||
uint8_t* addr = reinterpret_cast<uint8_t*>(msg.payload.data.addr - PHY_ADDR_BASE + vbase);
|
uint8_t* addr = reinterpret_cast<uint8_t*>(msg.payload.data.addr - PHY_ADDR_BASE + vbase);
|
||||||
|
|
@ -251,58 +257,95 @@ void FirmIO::readTask() {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Msg::Command::ComFree: {
|
case Msg::Command::ComFree: {
|
||||||
std::lock_guard<std::mutex> lk(outMutex);
|
std::scoped_lock lk(outMutex);
|
||||||
// std::cout << "Got some free " << std::hex << msg.payload.free.ref[0] << std::endl;
|
|
||||||
for(uint32_t i = 0; i < msg.payload.free.refCount; i++)
|
for(uint32_t i = 0; i < msg.payload.free.refCount; i++)
|
||||||
outMemory->free(reinterpret_cast<uint8_t*>(msg.payload.free.ref[i]));
|
outMemory->free(reinterpret_cast<uint8_t*>(msg.payload.free.ref[i]));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
// std::cout << "invalid command: " << std::hex << static_cast<uint32_t>(msg.command) << std::dec << std::endl;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (isClosing() || isDisconnected())
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
while (toFree.size()) {
|
||||||
|
Msg freeMsg = { Msg::Command::ComFree };
|
||||||
|
freeMsg.payload.free.refCount = std::min(static_cast<uint32_t>(toFree.size()), 6u);
|
||||||
|
for (size_t i = 0; i < freeMsg.payload.free.refCount; ++i) {
|
||||||
|
freeMsg.payload.free.ref[i] = toFree[i];
|
||||||
|
}
|
||||||
|
std::scoped_lock lk(outMutex);
|
||||||
|
if (!out->write(&freeMsg)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
num_freed += freeMsg.payload.free.refCount;
|
||||||
|
toFree.erase(toFree.begin(), toFree.begin() + freeMsg.payload.free.refCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
while (toFree.size())
|
||||||
while(!toFree.empty()) {
|
{
|
||||||
std::lock_guard<std::mutex> lk(outMutex);
|
Msg freeMsg = { Msg::Command::ComFree };
|
||||||
out->write(&toFree.back());
|
freeMsg.payload.free.refCount = std::min(static_cast<uint32_t>(toFree.size()), 6u);
|
||||||
toFree.pop_back();
|
for (size_t i = 0; i < freeMsg.payload.free.refCount; ++i) {
|
||||||
|
freeMsg.payload.free.ref[i] = toFree[i];
|
||||||
}
|
}
|
||||||
|
std::scoped_lock lk(outMutex);
|
||||||
|
if (!out->write(&freeMsg)) {
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
toFree.erase(toFree.begin(), toFree.begin() + freeMsg.payload.free.refCount);
|
||||||
|
}
|
||||||
|
// std::cout << "FirmIO readTask exiting: " << "closing=" << isClosing() << " disconnected=" << isDisconnected() << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FirmIO::writeTask() {
|
void FirmIO::writeTask() {
|
||||||
return; // We're overriding Driver::writeInternal() and doing the work there
|
constexpr uint32_t genInterrupt = 0x01;
|
||||||
|
std::pair<std::optional<WriteOperation>, uint8_t*> op;
|
||||||
|
while (!isClosing() && !isDisconnected()) {
|
||||||
|
if (!op.first) {
|
||||||
|
writeQueue.wait_dequeue_timed(op.first, std::chrono::milliseconds(100));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FirmIO::writeQueueFull() {
|
if (!op.second) {
|
||||||
return out->isFull();
|
op.second = outMemory->alloc(static_cast<uint32_t>(op.first->bytes.size()));
|
||||||
|
if (op.second == nullptr) {
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
memcpy(op.second, op.first->bytes.data(), op.first->bytes.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FirmIO::writeQueueAlmostFull() {
|
Msg msg = { Msg::Command::ComData };
|
||||||
// TODO: Better implementation here
|
msg.payload.data.addr = outMemory->translate(op.second);
|
||||||
return writeQueueFull();
|
msg.payload.data.len = op.first->bytes.size();
|
||||||
|
msg.payload.data.ref = reinterpret_cast<Msg::Ref>(op.second);
|
||||||
|
|
||||||
|
|
||||||
|
std::scoped_lock lk(outMutex);
|
||||||
|
if(!out->write(&msg))
|
||||||
|
{
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
++num_written;
|
||||||
|
::write(fd, &genInterrupt, sizeof(genInterrupt));
|
||||||
|
op.first.reset();
|
||||||
|
op.second = nullptr;
|
||||||
|
}
|
||||||
|
std::cout << "FirmIO writeTask exiting: " << "closing=" << isClosing() << " disconnected=" << isDisconnected() << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FirmIO::writeInternal(const std::vector<uint8_t>& bytes) {
|
bool FirmIO::writeInternal(const std::vector<uint8_t>& bytes) {
|
||||||
if(bytes.empty() || bytes.size() > Mempool::BlockSize)
|
if(bytes.empty() || bytes.size() > Mempool::BlockSize)
|
||||||
|
{
|
||||||
|
// std::cout << "Invalid write size of " << bytes.size() << std::endl;
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(outMutex);
|
return writeQueue.enqueue(WriteOperation(bytes));
|
||||||
uint8_t* sharedData = outMemory->alloc(bytes.size());
|
|
||||||
if(sharedData == nullptr)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
// std::cout << "coping " << bytes.size() << " bytes of data" << std::endl;
|
|
||||||
memcpy(sharedData, bytes.data(), bytes.size());
|
|
||||||
|
|
||||||
Msg msg = { Msg::Command::ComData };
|
|
||||||
msg.payload.data.addr = outMemory->translate(sharedData);
|
|
||||||
msg.payload.data.len = static_cast<uint32_t>(bytes.size());
|
|
||||||
msg.payload.data.ref = reinterpret_cast<Msg::Ref>(sharedData);
|
|
||||||
|
|
||||||
if(!out->write(&msg))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
uint32_t genInterrupt = 0x01;
|
|
||||||
return ::write(fd, &genInterrupt, sizeof(genInterrupt)) == sizeof(genInterrupt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FirmIO::MsgQueue::read(Msg* msg) {
|
bool FirmIO::MsgQueue::read(Msg* msg) {
|
||||||
|
|
@ -369,13 +412,17 @@ bool FirmIO::Mempool::free(uint8_t* addr) {
|
||||||
return b.addr == addr;
|
return b.addr == addr;
|
||||||
});
|
});
|
||||||
|
|
||||||
if(found == blocks.end())
|
if(found == blocks.end()) {
|
||||||
|
// std::cout << "failed to free block address " << std::hex << reinterpret_cast<uintptr_t>(addr) << std::dec << std::endl;
|
||||||
return false; // Invalid address
|
return false; // Invalid address
|
||||||
|
}
|
||||||
|
|
||||||
if(found->status != BlockInfo::Status::Used)
|
if(found->status != BlockInfo::Status::Used) {
|
||||||
|
// std::cout << "invalid state for free of block address " << std::hex << reinterpret_cast<uintptr_t>(addr) << std::dec << std::endl;
|
||||||
return false; // Double free
|
return false; // Double free
|
||||||
|
}
|
||||||
|
|
||||||
usedBlocks--;
|
--usedBlocks;
|
||||||
found->status = BlockInfo::Status::Free;
|
found->status = BlockInfo::Status::Free;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
@ -383,3 +430,12 @@ bool FirmIO::Mempool::free(uint8_t* addr) {
|
||||||
FirmIO::Mempool::PhysicalAddress FirmIO::Mempool::translate(uint8_t* addr) const {
|
FirmIO::Mempool::PhysicalAddress FirmIO::Mempool::translate(uint8_t* addr) const {
|
||||||
return reinterpret_cast<PhysicalAddress>(addr - virtualAddress + physicalAddress);
|
return reinterpret_cast<PhysicalAddress>(addr - virtualAddress + physicalAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// void FirmIO::logTask()
|
||||||
|
// {
|
||||||
|
// while (!isClosing() && !isDisconnected()) {
|
||||||
|
// std::cout << "FirmIO Stats: RX Count: " << num_read << " TX Count: " << num_written << " Used Blocks (out): " << outMemory->getUsedBlocks() << " Freed Blocks: " << num_freed << std::endl;
|
||||||
|
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
// }
|
||||||
|
// std::cout << "FirmIO logTask exiting: " << "closing=" << isClosing() << " disconnected=" << isDisconnected() << std::endl;
|
||||||
|
// }
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue