Windows: PCAP: Fix a race which could cause transmit delays

If you had a chain of packets being sent all at once, the latter
section of packets could be delayed, theoretically infinitely.

If queue1 was filled and enqueued for transmit, then queue2
had packets enqueued in it while queue1 was still transmitting,
we'd try to fill queue2 further rather than waiting for queue1's
transmit to finish.

However, in that case, we wouldn't check if we could transmit
queue2 again until the next packet. If the user application
was waiting for the response from something in queue2
before pushing more packets, it could hang indefinitely.

This also fixes a subtle bug where hitting the "not safe to try
to fit any more packets in this queue" limit would cause a
packet to drop, as it would be dequeued and then tossed.

Closes GH-42
v0.3.0-dev
Paul Hollinsky 2022-02-04 01:14:08 -05:00
parent 2d1bb381f6
commit 6d92b7a03a
1 changed files with 22 additions and 14 deletions

View File

@ -285,26 +285,34 @@ void PCAP::writeTask() {
pcap_send_queue* queue1 = pcap.sendqueue_alloc(128000);
pcap_send_queue* queue2 = pcap.sendqueue_alloc(128000);
pcap_send_queue* queue = queue1;
std::vector<uint8_t> extraData;
while(!closing) {
if(!writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(100)))
continue;
// Potentially, we added frames to a second queue faster than the other thread was able to hand the first
// off to the kernel. In that case, wait for a minimal amount of time before checking whether we can
// transmit it again.
if(writeQueue.wait_dequeue_timed(writeOp, std::chrono::milliseconds(queue->len ? 1 : 100))) {
unsigned int i = 0;
do {
ethPacketizer.inputDown(std::move(writeOp.bytes));
if(i++ >= (queue->maxlen - queue->len) / 1518 / 3)
break; // Not safe to try to fit any more packets in this queue, let it transmit and come around again
} while(writeQueue.try_dequeue(writeOp));
unsigned int i = 0;
do {
ethPacketizer.inputDown(std::move(writeOp.bytes));
} while(writeQueue.try_dequeue(writeOp) && i++ < (queue->maxlen - queue->len) / 1518 / 3);
for(const auto& data : ethPacketizer.outputDown()) {
pcap_pkthdr header = {};
header.caplen = header.len = bpf_u_int32(data.size());
if(pcap.sendqueue_queue(queue, &header, data.data()) == -1)
report(APIEvent::Type::FailedToWrite, APIEvent::Severity::EventWarning);
for(const auto& data : ethPacketizer.outputDown()) {
pcap_pkthdr header = {};
header.caplen = header.len = bpf_u_int32(data.size());
if(pcap.sendqueue_queue(queue, &header, data.data()) != 0)
report(APIEvent::Type::FailedToWrite, APIEvent::Severity::EventWarning);
}
}
std::unique_lock<std::mutex> lk(transmitQueueMutex);
if(!transmitQueue || queue->len + (1518*2) >= queue->maxlen) { // Checking if we want to swap sendqueues with the transmitTask
// Check if we want to transmit our current queue
// If we're not currently transmitting a queue, let this one transmit immediately for good latency
// If our queue is full and we're transmitting the other, we can't accept any more packets out of the writeQueue
// In that case we're putting as many packets into the driver as possible, so wait for it to be free
// This puts the backpressure on the writeQueue
if(queue->len && (!transmitQueue || queue->len + (1518*2) >= queue->maxlen)) {
if(transmitQueue) // Need to wait for the queue to become available
transmitQueueCV.wait(lk, [this] { return !transmitQueue; });