Added event callback functionality. EventManager now uses multiple mutexes to lock events, errors, callbacks, and downgradedThreads separately. Wrote single-threaded test for event callbacks.
parent
f9712a4bcd
commit
4f735a651c
|
|
@ -18,6 +18,7 @@ void EventManager::ResetInstance() {
|
||||||
// If this thread is not in the map, add it to be ignored
|
// If this thread is not in the map, add it to be ignored
|
||||||
// If it is, set it to be ignored
|
// If it is, set it to be ignored
|
||||||
void EventManager::downgradeErrorsOnCurrentThread() {
|
void EventManager::downgradeErrorsOnCurrentThread() {
|
||||||
|
std::lock_guard<std::mutex> lk(downgradedThreadsMutex);
|
||||||
auto i = downgradedThreads.find(std::this_thread::get_id());
|
auto i = downgradedThreads.find(std::this_thread::get_id());
|
||||||
if(i != downgradedThreads.end()) {
|
if(i != downgradedThreads.end()) {
|
||||||
i->second = true;
|
i->second = true;
|
||||||
|
|
@ -28,14 +29,33 @@ void EventManager::downgradeErrorsOnCurrentThread() {
|
||||||
|
|
||||||
// If this thread exists in the map, turn off downgrading
|
// If this thread exists in the map, turn off downgrading
|
||||||
void EventManager::cancelErrorDowngradingOnCurrentThread() {
|
void EventManager::cancelErrorDowngradingOnCurrentThread() {
|
||||||
|
std::lock_guard<std::mutex> lk(downgradedThreadsMutex);
|
||||||
auto i = downgradedThreads.find(std::this_thread::get_id());
|
auto i = downgradedThreads.find(std::this_thread::get_id());
|
||||||
if(i != downgradedThreads.end()) {
|
if(i != downgradedThreads.end()) {
|
||||||
i->second = false;
|
i->second = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int EventManager::addEventCallback(const EventCallback &cb) {
|
||||||
|
std::lock_guard<std::mutex> lk(callbacksMutex);
|
||||||
|
callbacks.insert({callbackID, cb});
|
||||||
|
return callbackID++;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool EventManager::removeEventCallback(int id) {
|
||||||
|
std::lock_guard<std::mutex> lk(callbacksMutex);
|
||||||
|
|
||||||
|
auto iter = callbacks.find(id);
|
||||||
|
|
||||||
|
if(iter != callbacks.end()) {
|
||||||
|
callbacks.erase(iter);
|
||||||
|
return true;
|
||||||
|
} else
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void EventManager::get(std::vector<APIEvent>& eventOutput, size_t max, EventFilter filter) {
|
void EventManager::get(std::vector<APIEvent>& eventOutput, size_t max, EventFilter filter) {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
|
|
||||||
if(max == 0) // A limit of 0 indicates no limit
|
if(max == 0) // A limit of 0 indicates no limit
|
||||||
max = (size_t)-1;
|
max = (size_t)-1;
|
||||||
|
|
@ -60,7 +80,7 @@ void EventManager::get(std::vector<APIEvent>& eventOutput, size_t max, EventFilt
|
||||||
* If no error was found, return a NoErrorFound Info event
|
* If no error was found, return a NoErrorFound Info event
|
||||||
*/
|
*/
|
||||||
APIEvent EventManager::getLastError() {
|
APIEvent EventManager::getLastError() {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::lock_guard<std::mutex> lk(errorsMutex);
|
||||||
|
|
||||||
auto it = lastUserErrors.find(std::this_thread::get_id());
|
auto it = lastUserErrors.find(std::this_thread::get_id());
|
||||||
if(it == lastUserErrors.end()) {
|
if(it == lastUserErrors.end()) {
|
||||||
|
|
@ -73,7 +93,7 @@ APIEvent EventManager::getLastError() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void EventManager::discard(EventFilter filter) {
|
void EventManager::discard(EventFilter filter) {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
events.remove_if([&filter](const APIEvent& event) {
|
events.remove_if([&filter](const APIEvent& event) {
|
||||||
return filter.match(event);
|
return filter.match(event);
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -79,6 +79,7 @@ It is recommended to either enable message polling or manually register callback
|
||||||
|
|
||||||
Write Blocking Status
|
Write Blocking Status
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
The write blocking status of the device determines the behavior of attempting to transmit to the device (likely via sending messages) with a large backlog of messages.
|
The write blocking status of the device determines the behavior of attempting to transmit to the device (likely via sending messages) with a large backlog of messages.
|
||||||
If write blocking is enabled, then the transmitting thread will wait for the entire buffer to be transmitted.
|
If write blocking is enabled, then the transmitting thread will wait for the entire buffer to be transmitted.
|
||||||
If write blocking is disabled, then the attempt to transmit will simply fail and an error will be logged on the calling thread.
|
If write blocking is disabled, then the attempt to transmit will simply fail and an error will be logged on the calling thread.
|
||||||
|
|
@ -0,0 +1,39 @@
|
||||||
|
#ifndef __EVENTCALLBACK_H_
|
||||||
|
#define __EVENTCALLBACK_H_
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
#include "event.h"
|
||||||
|
|
||||||
|
namespace icsneo {
|
||||||
|
|
||||||
|
class EventCallback {
|
||||||
|
public:
|
||||||
|
typedef std::function< void( std::shared_ptr<APIEvent> ) > fn_eventCallback;
|
||||||
|
|
||||||
|
EventCallback(fn_eventCallback cb, std::shared_ptr<EventFilter> f) : callback(cb), filter(f) {}
|
||||||
|
EventCallback(fn_eventCallback cb, EventFilter f = EventFilter()) : callback(cb), filter(std::make_shared<EventFilter>(f)) {}
|
||||||
|
|
||||||
|
// Allow the filter to be placed first if the user wants (maybe in the case of a lambda)
|
||||||
|
EventCallback(std::shared_ptr<EventFilter> f, fn_eventCallback cb) : callback(cb), filter(f) {}
|
||||||
|
EventCallback(EventFilter f, fn_eventCallback cb) : callback(cb), filter(std::make_shared<EventFilter>(f)) {}
|
||||||
|
|
||||||
|
virtual bool callIfMatch(const std::shared_ptr<APIEvent>& event) const {
|
||||||
|
bool ret = filter->match(*event);
|
||||||
|
if(ret)
|
||||||
|
callback(event);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
const EventFilter& getFilter() const { return *filter; }
|
||||||
|
const fn_eventCallback& getCallback() const { return callback; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
fn_eventCallback callback;
|
||||||
|
std::shared_ptr<EventFilter> filter;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
@ -9,6 +9,7 @@
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include "icsneo/api/event.h"
|
#include "icsneo/api/event.h"
|
||||||
|
#include "icsneo/api/eventcallback.h"
|
||||||
|
|
||||||
namespace icsneo {
|
namespace icsneo {
|
||||||
|
|
||||||
|
|
@ -24,8 +25,12 @@ public:
|
||||||
|
|
||||||
void cancelErrorDowngradingOnCurrentThread();
|
void cancelErrorDowngradingOnCurrentThread();
|
||||||
|
|
||||||
|
int addEventCallback(const EventCallback &cb);
|
||||||
|
|
||||||
|
bool removeEventCallback(int id);
|
||||||
|
|
||||||
size_t eventCount(EventFilter filter = EventFilter()) const {
|
size_t eventCount(EventFilter filter = EventFilter()) const {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
return count_internal(filter);
|
return count_internal(filter);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -41,17 +46,34 @@ public:
|
||||||
APIEvent getLastError();
|
APIEvent getLastError();
|
||||||
|
|
||||||
void add(APIEvent event) {
|
void add(APIEvent event) {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
if(event.getSeverity() == APIEvent::Severity::Error) {
|
||||||
add_internal(event);
|
// if the error was added on a thread that downgrades errors (non-user thread)
|
||||||
|
std::lock_guard<std::mutex> lk(downgradedThreadsMutex);
|
||||||
|
auto i = downgradedThreads.find(std::this_thread::get_id());
|
||||||
|
if(i != downgradedThreads.end() && i->second) {
|
||||||
|
event.downgradeFromError();
|
||||||
|
runCallbacks(event);
|
||||||
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
|
add_internal_event(event);
|
||||||
|
} else {
|
||||||
|
std::lock_guard<std::mutex> lk(errorsMutex);
|
||||||
|
add_internal_error(event);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
runCallbacks(event);
|
||||||
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
|
add_internal_event(event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
void add(APIEvent::Type type, APIEvent::Severity severity, const Device* forDevice = nullptr) {
|
void add(APIEvent::Type type, APIEvent::Severity severity, const Device* forDevice = nullptr) {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
add(APIEvent(type, severity, forDevice));
|
||||||
add_internal(APIEvent(type, severity, forDevice));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void discard(EventFilter filter = EventFilter());
|
void discard(EventFilter filter = EventFilter());
|
||||||
|
|
||||||
void setEventLimit(size_t newLimit) {
|
void setEventLimit(size_t newLimit) {
|
||||||
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
|
|
||||||
if(newLimit == eventLimit)
|
if(newLimit == eventLimit)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
@ -59,27 +81,33 @@ public:
|
||||||
add(APIEvent::Type::ParameterOutOfRange, APIEvent::Severity::Error);
|
add(APIEvent::Type::ParameterOutOfRange, APIEvent::Severity::Error);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
|
||||||
eventLimit = newLimit;
|
eventLimit = newLimit;
|
||||||
if(enforceLimit())
|
if(enforceLimit())
|
||||||
add_internal(APIEvent(APIEvent::Type::TooManyEvents, APIEvent::Severity::EventWarning));
|
add_internal_event(APIEvent(APIEvent::Type::TooManyEvents, APIEvent::Severity::EventWarning));
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t getEventLimit() const {
|
size_t getEventLimit() const {
|
||||||
std::lock_guard<std::mutex> lk(mutex);
|
std::lock_guard<std::mutex> lk(eventsMutex);
|
||||||
return eventLimit;
|
return eventLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
EventManager() : mutex(), downgradedThreads(), events(), lastUserErrors(), eventLimit(10000) {}
|
EventManager() : eventsMutex(), errorsMutex(), downgradedThreadsMutex(), callbacksMutex(), downgradedThreads(), callbacks(), events(), lastUserErrors(), eventLimit(10000) {}
|
||||||
EventManager(const EventManager &other);
|
EventManager(const EventManager &other);
|
||||||
EventManager& operator=(const EventManager &other);
|
EventManager& operator=(const EventManager &other);
|
||||||
|
|
||||||
// Used by functions for threadsafety
|
// Used by functions for threadsafety
|
||||||
mutable std::mutex mutex;
|
mutable std::mutex eventsMutex;
|
||||||
|
mutable std::mutex errorsMutex;
|
||||||
|
mutable std::mutex downgradedThreadsMutex;
|
||||||
|
mutable std::mutex callbacksMutex;
|
||||||
|
|
||||||
std::map<std::thread::id, bool> downgradedThreads;
|
std::map<std::thread::id, bool> downgradedThreads;
|
||||||
|
|
||||||
|
std::map<int, EventCallback> callbacks;
|
||||||
|
|
||||||
|
int callbackID = 0;
|
||||||
|
|
||||||
// Stores all events
|
// Stores all events
|
||||||
std::list<APIEvent> events;
|
std::list<APIEvent> events;
|
||||||
|
|
@ -88,17 +116,11 @@ private:
|
||||||
|
|
||||||
size_t count_internal(EventFilter filter = EventFilter()) const;
|
size_t count_internal(EventFilter filter = EventFilter()) const;
|
||||||
|
|
||||||
void add_internal(APIEvent event) {
|
void runCallbacks(APIEvent event) {
|
||||||
if(event.getSeverity() == APIEvent::Severity::Error) {
|
std::lock_guard<std::mutex> lk(callbacksMutex);
|
||||||
// if the error was added on a thread that downgrades errors (non-user thread)
|
for(auto &i : callbacks) {
|
||||||
auto i = downgradedThreads.find(std::this_thread::get_id());
|
i.second.callIfMatch(std::make_shared<APIEvent>(event));
|
||||||
if(i != downgradedThreads.end() && i->second) {
|
}
|
||||||
event.downgradeFromError();
|
|
||||||
add_internal_event(event);
|
|
||||||
} else
|
|
||||||
add_internal_error(event);
|
|
||||||
} else
|
|
||||||
add_internal_event(event);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
#include "icsneo/icsneocpp.h"
|
#include "icsneo/icsneocpp.h"
|
||||||
#include "gtest/gtest.h"
|
#include "gtest/gtest.h"
|
||||||
|
|
@ -13,6 +14,75 @@ protected:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
TEST_F(EventManagerTest, SingleThreadCallbacksTest) {
|
||||||
|
int callCounter = 0;
|
||||||
|
|
||||||
|
// increments counter when baudrate events show up
|
||||||
|
int id1 = EventManager::GetInstance().addEventCallback(EventCallback([&callCounter](std::shared_ptr<APIEvent>){
|
||||||
|
callCounter++;
|
||||||
|
}, EventFilter(APIEvent::Type::BaudrateNotFound)));
|
||||||
|
|
||||||
|
// increments counter when infos show up
|
||||||
|
int id2 = EventManager::GetInstance().addEventCallback(EventCallback([&callCounter](std::shared_ptr<APIEvent>) {
|
||||||
|
callCounter++;
|
||||||
|
}, EventFilter(APIEvent::Severity::EventInfo)));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 0);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::EventWarning));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 0);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::BaudrateNotFound, APIEvent::Severity::EventWarning));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 1);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 2);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::BaudrateNotFound, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 4);
|
||||||
|
|
||||||
|
EXPECT_EQ(EventManager::GetInstance().removeEventCallback(id2), true);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::DeviceCurrentlyClosed, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 4);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::BaudrateNotFound, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 5);
|
||||||
|
|
||||||
|
// increments counter when device currently open shows up
|
||||||
|
int id3 = EventManager::GetInstance().addEventCallback(EventCallback([&callCounter](std::shared_ptr<APIEvent>) {
|
||||||
|
callCounter++;
|
||||||
|
}, EventFilter(APIEvent::Type::DeviceCurrentlyOpen)));
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 6);
|
||||||
|
|
||||||
|
EXPECT_EQ(EventManager::GetInstance().removeEventCallback(id2), false);
|
||||||
|
EXPECT_EQ(EventManager::GetInstance().removeEventCallback(id1), true);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::BaudrateNotFound, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 6);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 7);
|
||||||
|
|
||||||
|
EXPECT_EQ(EventManager::GetInstance().removeEventCallback(id3), true);
|
||||||
|
|
||||||
|
EventManager::GetInstance().add(APIEvent(APIEvent::Type::DeviceCurrentlyOpen, APIEvent::Severity::EventInfo));
|
||||||
|
|
||||||
|
EXPECT_EQ(callCounter, 7);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(EventManagerTest, ErrorDowngradingTest) {
|
TEST_F(EventManagerTest, ErrorDowngradingTest) {
|
||||||
// Check that main thread has no errors
|
// Check that main thread has no errors
|
||||||
EXPECT_EQ(GetLastError().getType(), APIEvent::Type::NoErrorFound);
|
EXPECT_EQ(GetLastError().getType(), APIEvent::Type::NoErrorFound);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue