FirmIO: Stable communication

v0.3.0-dev
Paul Hollinsky 2022-03-27 23:57:44 -04:00
parent 5d4ed0f4cd
commit d45d708446
2 changed files with 94 additions and 41 deletions

View File

@ -19,7 +19,7 @@ class FirmIO : public Driver {
public: public:
static void Find(std::vector<FoundDevice>& foundDevices); static void Find(std::vector<FoundDevice>& foundDevices);
FirmIO(device_eventhandler_t err); using Driver::Driver; // Inherit constructor
~FirmIO(); ~FirmIO();
bool open() override; bool open() override;
bool isOpen() override; bool isOpen() override;
@ -76,24 +76,30 @@ private:
class MsgQueue { // mq_t class MsgQueue { // mq_t
public: public:
bool read(Msg* msg) volatile; MsgQueue(void* infoPtr, void* msgsPtr)
bool write(const Msg* msg) volatile; : info(reinterpret_cast<MsgQueueInfo*>(infoPtr)), msgs(reinterpret_cast<Msg*>(msgsPtr)) {}
bool isEmpty() const volatile;
bool isFull() const volatile;
private: // These variables are mmaped, don't change their order or add anything bool read(Msg* msg);
uint32_t head; bool write(const Msg* msg);
uint32_t tail; bool isEmpty() const;
uint32_t size; bool isFull() const;
[[maybe_unused]] uint32_t reserved[4];
Msg* msgs; private:
struct MsgQueueInfo { // These variables are mmaped, don't change their order or add anything
uint32_t head;
uint32_t tail;
uint32_t size;
uint32_t reserved[4];
};
MsgQueueInfo* const info;
Msg* const msgs;
}; };
class Mempool { class Mempool {
public: public:
static constexpr const size_t BlockSize = 4096; static constexpr const size_t BlockSize = 4096;
Mempool(uint8_t* start, uint32_t size, void* virt, uint32_t phys); Mempool(uint8_t* start, uint32_t size, uint8_t* virt, uint32_t phys);
uint8_t* alloc(uint32_t size); uint8_t* alloc(uint32_t size);
bool free(uint8_t* addr); bool free(uint8_t* addr);
uint32_t translate(uint8_t* addr) const; uint32_t translate(uint8_t* addr) const;
@ -108,21 +114,21 @@ private:
uint8_t* addr; uint8_t* addr;
}; };
uint8_t* const startAddress;
const uint32_t totalSize;
std::vector<BlockInfo> blocks; std::vector<BlockInfo> blocks;
std::atomic<uint32_t> usedBlocks; std::atomic<uint32_t> usedBlocks;
void* const virtualAddress; uint8_t* const virtualAddress;
const uint32_t physicalAddress; const uint32_t physicalAddress;
}; };
int fd = 0; int fd = -1;
uint8_t* vbase = nullptr; uint8_t* vbase = nullptr;
volatile ComHeader* header = nullptr; volatile ComHeader* header = nullptr;
volatile MsgQueue* in = nullptr;
optional<MsgQueue> in;
std::mutex outMutex; std::mutex outMutex;
volatile MsgQueue* out = nullptr; optional<MsgQueue> out;
optional<Mempool> outMemory; optional<Mempool> outMemory;
}; };

View File

@ -105,10 +105,45 @@ bool FirmIO::open() {
} }
// Swapping the in and out ptrs here, what the device considers out, we consider in // Swapping the in and out ptrs here, what the device considers out, we consider in
out = reinterpret_cast<MsgQueue*>(header->msgqPtrIn.offset + vbase); out.emplace(header->msgqPtrIn.offset + vbase, header->msgqIn.offset + vbase);
in = reinterpret_cast<MsgQueue*>(header->msgqPtrOut.offset + vbase); in.emplace(header->msgqPtrOut.offset + vbase, header->msgqOut.offset + vbase);
outMemory.emplace(vbase + header->shmIn.offset, header->shmIn.size, vbase, PHY_ADDR_BASE); outMemory.emplace(vbase + header->shmIn.offset, header->shmIn.size, vbase, PHY_ADDR_BASE);
// Flush any messages that are stuck in the pipe
Msg msg;
std::vector<Msg> toFree;
int i = 0;
while(!in->isEmpty() && i++ < 10000) {
if(!in->read(&msg))
break;
switch(msg.command) {
case Msg::Command::ComData: {
if(toFree.empty() || toFree.back().payload.free.refCount == 6) {
toFree.emplace_back();
toFree.back().command = Msg::Command::ComFree;
toFree.back().payload.free.refCount = 0;
}
// Add this ref to the list of payloads to free
// After we process these, we'll send this list back to the device
// so that it can free these entries
toFree.back().payload.free.ref[toFree.back().payload.free.refCount] = msg.payload.data.ref;
toFree.back().payload.free.refCount++;
break;
}
}
}
//std::cout << "Flushed " << std::dec << i << " freeing " << toFree.size() << std::endl;
while(!toFree.empty()) {
std::lock_guard<std::mutex> lk(outMutex);
out->write(&toFree.back());
toFree.pop_back();
}
// Create thread // Create thread
// No thread for writing since we don't need the extra buffer // No thread for writing since we don't need the extra buffer
readThread = std::thread(&FirmIO::readTask, this); readThread = std::thread(&FirmIO::readTask, this);
@ -134,16 +169,17 @@ bool FirmIO::close() {
closing = false; closing = false;
disconnected = false; disconnected = false;
int ret = munmap(vbase, MMAP_LEN); int ret = 0;
vbase = nullptr; if(vbase != nullptr) {
ret |= munmap(vbase, MMAP_LEN);
vbase = nullptr;
}
ret |= ::close(fd); ret |= ::close(fd);
fd = -1; fd = -1;
uint8_t flush; uint8_t flush;
WriteOperation flushop;
while (readQueue.try_dequeue(flush)) {} while (readQueue.try_dequeue(flush)) {}
while (writeQueue.try_dequeue(flushop)) {}
if(ret == 0) { if(ret == 0) {
return true; return true;
@ -155,12 +191,16 @@ bool FirmIO::close() {
void FirmIO::readTask() { void FirmIO::readTask() {
EventManager::GetInstance().downgradeErrorsOnCurrentThread(); EventManager::GetInstance().downgradeErrorsOnCurrentThread();
Msg msg;
std::vector<Msg> toFree;
while(!closing && !isDisconnected()) { while(!closing && !isDisconnected()) {
fd_set rfds = {0}; fd_set rfds = {0};
struct timeval tv = {0}; struct timeval tv = {0};
FD_SET(fd, &rfds); FD_SET(fd, &rfds);
tv.tv_usec = 50000; // 50ms tv.tv_usec = 50000; // 50ms
int ret = ::select(fd + 1, &rfds, NULL, NULL, &tv); int ret = ::select(fd + 1, &rfds, NULL, NULL, &tv);
// std::cout << "select returned " << ret << ' ' << errno << std::endl;
if(ret < 0) if(ret < 0)
report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error); report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);
if(ret <= 0) if(ret <= 0)
@ -170,19 +210,18 @@ void FirmIO::readTask() {
ret = ::read(fd, &interruptCount, sizeof(interruptCount)); ret = ::read(fd, &interruptCount, sizeof(interruptCount));
if(ret < 0) if(ret < 0)
report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error); report(APIEvent::Type::FailedToRead, APIEvent::Severity::Error);
if(ret < sizeof(interruptCount) || interruptCount < 1) if(ret < int(sizeof(interruptCount)) || interruptCount < 1)
continue; continue;
std::vector<Msg> toFree; toFree.clear();
Msg msg;
int i = 0; int i = 0;
while(!in->isEmpty() && i++ < 100) { while(!in->isEmpty() && i++ < 1000) {
if(!in->read(&msg)) if(!in->read(&msg))
break; break;
switch(msg.command) { switch(msg.command) {
case Msg::Command::ComData: { case Msg::Command::ComData: {
if(toFree.empty() || toFree.back().payload.free.refCount == 7) { if(toFree.empty() || toFree.back().payload.free.refCount == 6) {
toFree.emplace_back(); toFree.emplace_back();
toFree.back().command = Msg::Command::ComFree; toFree.back().command = Msg::Command::ComFree;
toFree.back().payload.free.refCount = 0; toFree.back().payload.free.refCount = 0;
@ -194,6 +233,8 @@ void FirmIO::readTask() {
toFree.back().payload.free.ref[toFree.back().payload.free.refCount] = msg.payload.data.ref; toFree.back().payload.free.ref[toFree.back().payload.free.refCount] = msg.payload.data.ref;
toFree.back().payload.free.refCount++; toFree.back().payload.free.refCount++;
// std::cout << "Got some data @ 0x" << std::hex << msg.payload.data.addr << " " << std::dec << msg.payload.data.len << std::endl;
// Translate the physical address back to our virtual address space // Translate the physical address back to our virtual address space
uint8_t* addr = reinterpret_cast<uint8_t*>(msg.payload.data.addr - PHY_ADDR_BASE + vbase); uint8_t* addr = reinterpret_cast<uint8_t*>(msg.payload.data.addr - PHY_ADDR_BASE + vbase);
readQueue.enqueue_bulk(addr, msg.payload.data.len); readQueue.enqueue_bulk(addr, msg.payload.data.len);
@ -201,6 +242,7 @@ void FirmIO::readTask() {
} }
case Msg::Command::ComFree: { case Msg::Command::ComFree: {
std::lock_guard<std::mutex> lk(outMutex); std::lock_guard<std::mutex> lk(outMutex);
// std::cout << "Got some free " << std::hex << msg.payload.free.ref[0] << std::endl;
for(uint32_t i = 0; i < msg.payload.free.refCount; i++) for(uint32_t i = 0; i < msg.payload.free.refCount; i++)
outMemory->free(reinterpret_cast<uint8_t*>(msg.payload.free.ref[i])); outMemory->free(reinterpret_cast<uint8_t*>(msg.payload.free.ref[i]));
break; break;
@ -238,6 +280,7 @@ bool FirmIO::writeInternal(const std::vector<uint8_t>& bytes) {
if(sharedData == nullptr) if(sharedData == nullptr)
return false; return false;
// std::cout << "coping " << bytes.size() << " bytes of data" << std::endl;
memcpy(sharedData, bytes.data(), bytes.size()); memcpy(sharedData, bytes.data(), bytes.size());
Msg msg = { Msg::Command::ComData }; Msg msg = { Msg::Command::ComData };
@ -252,38 +295,38 @@ bool FirmIO::writeInternal(const std::vector<uint8_t>& bytes) {
return ::write(fd, &genInterrupt, sizeof(genInterrupt)) == sizeof(genInterrupt); return ::write(fd, &genInterrupt, sizeof(genInterrupt)) == sizeof(genInterrupt);
} }
bool FirmIO::MsgQueue::read(Msg* msg) volatile { bool FirmIO::MsgQueue::read(Msg* msg) {
if(isEmpty()) // Contains memory_barrier() if(isEmpty()) // Contains memory_barrier()
return false; return false;
memcpy(msg, &msgs[tail], sizeof(*msg)); memcpy(msg, &msgs[info->tail], sizeof(*msg));
tail = (tail + 1) & (size - 1); info->tail = (info->tail + 1) & (info->size - 1);
memory_barrier(); memory_barrier();
return true; return true;
} }
bool FirmIO::MsgQueue::write(const Msg* msg) volatile { bool FirmIO::MsgQueue::write(const Msg* msg) {
if(isFull()) // Contains memory_barrier() if(isFull()) // Contains memory_barrier()
return false; return false;
memcpy(&msgs[head], msg, sizeof(*msg)); memcpy(&msgs[info->head], msg, sizeof(*msg));
head = (head + 1) & (size - 1); info->head = (info->head + 1) & (info->size - 1);
memory_barrier(); memory_barrier();
return true; return true;
} }
bool FirmIO::MsgQueue::isEmpty() const volatile { bool FirmIO::MsgQueue::isEmpty() const {
memory_barrier(); memory_barrier();
return head == tail; return info->head == info->tail;
} }
bool FirmIO::MsgQueue::isFull() const volatile { bool FirmIO::MsgQueue::isFull() const {
memory_barrier(); memory_barrier();
return ((head + 1) & (size - 1)) == tail; return ((info->head + 1) & (info->size - 1)) == info->tail;
} }
FirmIO::Mempool::Mempool(uint8_t* start, uint32_t size, void* virt, uint32_t phys) FirmIO::Mempool::Mempool(uint8_t* start, uint32_t size, uint8_t* virt, uint32_t phys)
: startAddress(start), totalSize(size), blocks(size / BlockSize), usedBlocks(0), : blocks(size / BlockSize), usedBlocks(0),
virtualAddress(virt), physicalAddress(phys) { virtualAddress(virt), physicalAddress(phys) {
size_t idx = 0; size_t idx = 0;
for(BlockInfo& block : blocks) { for(BlockInfo& block : blocks) {
@ -326,3 +369,7 @@ bool FirmIO::Mempool::free(uint8_t* addr) {
found->status = BlockInfo::Status::Free; found->status = BlockInfo::Status::Free;
return true; return true;
} }
uint32_t FirmIO::Mempool::translate(uint8_t* addr) const {
return reinterpret_cast<uint32_t>(addr - virtualAddress + physicalAddress);
}