// ©2014 Cameron Desrochers
// moodycamel::ConcurrentQueue contains many inner data structures which
// are difficult to test in isolation. So, this file contains copies of
// them, extracted and isolated so as to be independently testable.

#pragma once

#include <cstdlib>
#include <cstddef>
#include <cstdint>
#include <cassert>
#include <atomic>
#include <thread>
#include <functional>
#include <vector>
#include <type_traits>
#include <new>

// Define corealgos_allocator before including this header in order to override the
// default malloc/free functions
#ifndef corealgos_allocator
struct corealgos_allocator
{
    static inline void* malloc(std::size_t size) { return std::malloc(size); }
    static inline void free(void* ptr) { std::free(ptr); }
};
#endif


////////////////////////////////////////////////////////////////////////////////
// Lock-free add-only list (e.g. used to track producers)
////////////////////////////////////////////////////////////////////////////////

namespace moodycamel { namespace corealgos {

struct ListItem
{
    ListItem() : concurrentListPrev(nullptr) { }

public:
    std::atomic<ListItem*> concurrentListPrev;
};

template<typename T>        // T should inherit ListItem or implement the same interface
struct ConcurrentAddOnlyList
{
    ConcurrentAddOnlyList() : tail_(nullptr) { }

    inline T* tail() { return tail_.load(std::memory_order_acquire); }

    void add(T* element)
    {
        assert(element != nullptr);
        // Add it to the lock-free list
        auto prevTail = tail_.load(std::memory_order_relaxed);
        do {
            element->concurrentListPrev = prevTail;
        } while (!tail_.compare_exchange_weak(prevTail, element, std::memory_order_release, std::memory_order_relaxed));
    }

private:
    std::atomic<T*> tail_;
};

} }
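
// Usage sketch (illustrative only, not part of the original data structures): a minimal example of
// how ConcurrentAddOnlyList is typically used. The element type and function below are hypothetical.
namespace corealgos_examples {
    // Elements only need the concurrentListPrev field that ListItem provides.
    struct ExampleNode : moodycamel::corealgos::ListItem
    {
        explicit ExampleNode(int v) : value(v) { }
        int value;
    };

    static inline int add_only_list_usage_sketch()
    {
        moodycamel::corealgos::ConcurrentAddOnlyList<ExampleNode> list;
        ExampleNode a(1), b(2);
        list.add(&a);       // add() may be called concurrently from any number of threads
        list.add(&b);       // the list never removes or owns elements
        // Traversal walks from the most recently added element backwards, exactly like
        // ThreadLocal's destructor walks its items list below.
        int sum = 0;
        for (auto node = list.tail(); node != nullptr;
                node = static_cast<ExampleNode*>(node->concurrentListPrev.load(std::memory_order_relaxed))) {
            sum += node->value;
        }
        return sum;     // == 3
    }
}
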
////////////////////////////////////////////////////////////////////////////////
// Thread local hash map
////////////////////////////////////////////////////////////////////////////////

#if defined(__APPLE__)
#include "TargetConditionals.h"     // Needed for TARGET_OS_IPHONE
#endif

// Platform-specific definitions of a numeric thread ID type and an invalid value
#if defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
// No sense pulling in windows.h in a header, we'll manually declare the function
// we use and rely on backwards-compatibility for this not to break
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
namespace moodycamel { namespace corealgos { namespace details {
    static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
    typedef std::uint32_t thread_id_t;
    static const thread_id_t invalid_thread_id = 0;     // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
    static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
} } }
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
namespace moodycamel { namespace corealgos { namespace details {
    typedef std::uintptr_t thread_id_t;
    static const thread_id_t invalid_thread_id = 0;
    static inline thread_id_t thread_id() { return std::hash<std::thread::id>()(std::this_thread::get_id()); }
} } }
#else
// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
// static variable's address as a thread identifier :-)
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_COREALGO_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_COREALGO_THREADLOCAL __declspec(thread)
#else
// Assume C++11 compliant compiler
#define MOODYCAMEL_COREALGO_THREADLOCAL thread_local
#endif
namespace moodycamel { namespace corealgos { namespace details {
    typedef std::uintptr_t thread_id_t;
    static const thread_id_t invalid_thread_id = 0;     // Address can't be nullptr
    static inline thread_id_t thread_id() { static MOODYCAMEL_COREALGO_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
} } }
#endif

namespace moodycamel { namespace corealgos { namespace details {
    template<bool use32> struct _hash_32_or_64 {
        static inline std::size_t hash(std::uint32_t h)
        {
            // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
            // Since the thread ID is already unique, all we really want to do is propagate that
            // uniqueness evenly across all the bits, so that we can use a subset of the bits while
            // reducing collisions significantly
            h ^= h >> 16;
            h *= 0x85ebca6b;
            h ^= h >> 13;
            h *= 0xc2b2ae35;
            return static_cast<std::size_t>(h ^ (h >> 16));
        }
    };
    template<> struct _hash_32_or_64<1> {
        static inline std::size_t hash(std::uint64_t h)
        {
            h ^= h >> 33;
            h *= 0xff51afd7ed558ccd;
            h ^= h >> 33;
            h *= 0xc4ceb9fe1a85ec53;
            return static_cast<std::size_t>(h ^ (h >> 33));
        }
    };
    template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {  };

    static inline std::size_t hash_thread_id(thread_id_t id)
    {
        static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
        return hash_32_or_64<sizeof(thread_id_t)>::hash(id);
    }

    template<typename U>
    static inline char* align_for(char* ptr)
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
}   // namespace details (moodycamel::corealgos remains open until the end of the file)
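
// Illustrative sketch (added commentary, not part of the original header): this is the allocation
// pattern that ThreadLocal and SPMCSequentialHashMap below use with align_for -- malloc one raw
// block with (alignment - 1) bytes of slack, then carve a properly aligned array out of it.
// The Record type and function name are hypothetical.
namespace example {
    struct Record
    {
        std::atomic<std::uint64_t> key;
        void* value;
    };

    static inline Record* allocate_aligned_records(std::size_t count, char*& rawOut)
    {
        auto raw = static_cast<char*>(corealgos_allocator::malloc(
            std::alignment_of<Record>::value - 1 + sizeof(Record) * count));
        if (raw == nullptr) {
            return nullptr;
        }
        rawOut = raw;       // the caller needs the original pointer in order to free the block later
        auto records = reinterpret_cast<Record*>(details::align_for<Record>(raw));
        for (std::size_t i = 0; i != count; ++i) {
            new (records + i) Record();     // placement-construct each element in the aligned region
        }
        return records;
    }
}
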
template<typename T>        // T should inherit ListItem or implement the same interface
struct ThreadLocal
{
    explicit ThreadLocal(std::size_t initialHashSize)
        : initialHashEntries(initialHashSize)
    {
        assert(initialHashSize > 0 && (initialHashSize & (initialHashSize - 1)) == 0);
        resizeInProgress.clear();
        currentHashCount.store(0, std::memory_order_relaxed);

        auto hash = &initialHash;
        hash->capacity = initialHashSize;
        hash->entries = &initialHashEntries[0];
        for (std::size_t i = 0; i != initialHashSize; ++i) {
            initialHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
        }
        hash->prev = nullptr;
        currentHash.store(hash, std::memory_order_relaxed);
    }

    ~ThreadLocal()
    {
        // Destroy items
        auto ptr = items.tail();
        while (ptr != nullptr) {
            auto prev = static_cast<T*>(ptr->concurrentListPrev.load(std::memory_order_relaxed));
            ptr->~T();
            corealgos_allocator::free(ptr);
            ptr = prev;
        }

        // Destroy hash tables
        auto hash = currentHash.load(std::memory_order_relaxed);
        while (hash != nullptr) {
            auto prev = hash->prev;
            if (prev != nullptr) {      // The last hash is part of this object and was not allocated dynamically
                for (std::size_t i = 0; i != hash->capacity; ++i) {
                    hash->entries[i].~KeyValuePair();
                }
                hash->~InnerHash();
                corealgos_allocator::free(hash);
            }
            hash = prev;
        }
    }

    // Only fails (returns nullptr) if memory allocation fails
    T* get_or_create()
    {
        // Note that since the data is essentially thread-local (key is thread ID),
        // there's a reduced need for fences (memory ordering is already consistent
        // for any individual thread), except for the current table itself.

        // Start by looking for the thread ID in the current and all previous hash tables.
        // If it's not found, it must not be in there yet, since this same thread would
        // have added it previously to one of the tables that we traversed.

        // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table

        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);

        auto mainHash = currentHash.load(std::memory_order_acquire);
        for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
            // Look for the id in this hash
            auto index = hashedId;
            while (true) {      // Not an infinite loop because at least one slot is free in the hash table
                index &= hash->capacity - 1;

                auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    // Found it! If we had to search several hashes deep, though, we should lazily add it
                    // to the current main hash table to avoid the extended search next time.
                    // Note there's guaranteed to be room in the current hash table since every subsequent
                    // table implicitly reserves space for all previous tables (there's only one
                    // currentHashCount).
                    auto value = hash->entries[index].value;
                    if (hash != mainHash) {
                        index = hashedId;
                        while (true) {
                            index &= mainHash->capacity - 1;
                            probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                            auto expected = details::invalid_thread_id;
                            if (probedKey == expected && mainHash->entries[index].key.compare_exchange_strong(expected, id, std::memory_order_relaxed)) {
                                mainHash->entries[index].value = value;
                                break;
                            }
                            ++index;
                        }
                    }

                    return value;
                }
                if (probedKey == details::invalid_thread_id) {
                    break;      // Not in this hash table
                }
                ++index;
            }
        }

        // Insert!
        auto newCount = 1 + currentHashCount.fetch_add(1, std::memory_order_relaxed);
        while (true) {
            if (newCount >= (mainHash->capacity >> 1) && !resizeInProgress.test_and_set(std::memory_order_acquire)) {
                // We've acquired the resize lock, try to allocate a bigger hash table.
                // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
                // we reload currentHash it must be the most recent version (it only gets changed within this
                // locked block).
                mainHash = currentHash.load(std::memory_order_acquire);
                auto newCapacity = mainHash->capacity << 1;
                while (newCount >= (newCapacity >> 1)) {
                    newCapacity <<= 1;
                }
                auto raw = static_cast<char*>(corealgos_allocator::malloc(
                    sizeof(InnerHash) + std::alignment_of<KeyValuePair>::value - 1 + sizeof(KeyValuePair) * newCapacity));
                if (raw == nullptr) {
                    // Allocation failed
                    currentHashCount.fetch_sub(1, std::memory_order_relaxed);
                    resizeInProgress.clear(std::memory_order_relaxed);
                    return nullptr;
                }

                auto newHash = new (raw) InnerHash;
                newHash->capacity = newCapacity;
                newHash->entries = reinterpret_cast<KeyValuePair*>(details::align_for<KeyValuePair>(raw + sizeof(InnerHash)));
                for (std::size_t i = 0; i != newCapacity; ++i) {
                    new (newHash->entries + i) KeyValuePair;
                    newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                }
                newHash->prev = mainHash;
                currentHash.store(newHash, std::memory_order_release);
                resizeInProgress.clear(std::memory_order_release);
                mainHash = newHash;
            }

            // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
            // to finish being allocated by another thread (and if we just finished allocating above, the condition will
            // always be true)
            if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
                auto element = (T*)corealgos_allocator::malloc(sizeof(T));
                if (element == nullptr) {
                    return nullptr;
                }
                new (element) T();
                items.add(element);     // Track items so they can be destructed later

                auto index = hashedId;
                while (true) {
                    index &= mainHash->capacity - 1;
                    auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                    auto expected = details::invalid_thread_id;
                    if (probedKey == expected && mainHash->entries[index].key.compare_exchange_strong(expected, id, std::memory_order_relaxed)) {
                        mainHash->entries[index].value = element;
                        break;
                    }
                    ++index;
                }
                return element;
            }

            // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
            // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
            // we try to allocate ourselves).
            mainHash = currentHash.load(std::memory_order_acquire);
        }
    }

private:
    struct KeyValuePair
    {
        std::atomic<details::thread_id_t> key;
        T* value;       // No need for atomicity since it's only read by the thread that sets it in the first place

        KeyValuePair() { }
        KeyValuePair(KeyValuePair const& other) : key(other.key.load()), value(other.value) { }
        KeyValuePair& operator=(KeyValuePair const& other)
        {
            key.store(other.key.load());
            value = other.value;
            return *this;
        }
    };

    struct InnerHash
    {
        std::size_t capacity;
        KeyValuePair* entries;
        InnerHash* prev;
    };

    std::atomic_flag resizeInProgress;
    std::atomic<InnerHash*> currentHash;
    std::atomic<std::size_t> currentHashCount;      // Number of slots logically used
    InnerHash initialHash;
    std::vector<KeyValuePair> initialHashEntries;
    ConcurrentAddOnlyList<T> items;
};
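
// Usage sketch (illustrative only, not part of the original header): each thread calling
// get_or_create() receives its own default-constructed instance, created on first use and reused
// on subsequent calls from the same thread. The element type below is hypothetical.
namespace example {
    struct PerThreadCounter : ListItem
    {
        PerThreadCounter() : count(0) { }
        int count;
    };

    static inline void thread_local_usage_sketch()
    {
        ThreadLocal<PerThreadCounter> tls(4);       // initial hash size must be a power of two
        auto mine = tls.get_or_create();            // nullptr only if allocation failed
        if (mine != nullptr) {
            ++mine->count;                          // no synchronization needed; the object belongs to this thread
        }
        // All created objects are destroyed (and their memory freed) when `tls` is destructed.
    }
}
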
////////////////////////////////////////////////////////////////////////////////
// Lock-free free list
////////////////////////////////////////////////////////////////////////////////

template<typename N>
struct FreeListNode
{
    FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }

    std::atomic<std::uint32_t> freeListRefs;
    std::atomic<N*> freeListNext;

    FreeListNode(FreeListNode const& other) : freeListRefs(other.freeListRefs.load()), freeListNext(other.freeListNext.load()) { }
    FreeListNode& operator=(FreeListNode const& other)
    {
        freeListRefs.store(other.freeListRefs.load());
        freeListNext.store(other.freeListNext.load());
        return *this;
    }
};
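
// Added commentary (not in the original source): freeListRefs packs two pieces of state into one
// 32-bit word. The low 31 bits (REFS_MASK in FreeList below) count how many try_get() calls are
// currently holding a reference to the node, and the high bit (SHOULD_BE_ON_FREELIST) records that
// the node is waiting to be (re)linked onto the list once that count drops back to zero.
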
// A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention,
// but simple and correct (assuming nodes are never freed until after the free list is destroyed),
// and fairly speedy under low contention.
template<typename N>        // N must inherit FreeListNode or have the same fields (and initialization)
struct FreeList
{
    FreeList() : freeListHead(nullptr) { }

    inline void add(N* node)
    {
        // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
        // set it using a fetch_add
        if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
            // Oh look! We were the last ones referencing this node, and we know
            // we want to add it to the free list, so let's do it!
            add_knowing_refcount_is_zero(node);
        }
    }

    inline N* try_get()
    {
        auto head = freeListHead.load(std::memory_order_acquire);
        while (head != nullptr) {
            auto prevHead = head;
            auto refs = head->freeListRefs.load(std::memory_order_relaxed);
            if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
                head = freeListHead.load(std::memory_order_acquire);
                continue;
            }

            // Good, reference count has been incremented (it wasn't at zero), which means
            // we can read the next and not worry about it changing between now and the time
            // we do the CAS
            auto next = head->freeListNext.load(std::memory_order_relaxed);
            if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
                // Yay, got the node. This means it was on the list, which means
                // shouldBeOnFreeList must be false no matter the refcount (because
                // nobody else knows it's been taken off yet, it can't have been put back on).
                assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

                // Decrease refcount twice, once for our ref, and once for the list's ref
                head->freeListRefs.fetch_add(-2u, std::memory_order_release);

                return head;
            }

            // OK, the head must have changed on us, but we still need to decrease the refcount we
            // increased.
            // Note that we don't need to release any memory effects, but we do need to ensure that the reference
            // count decrement happens-after the CAS on the head.
            refs = prevHead->freeListRefs.fetch_add(-1u, std::memory_order_acq_rel);
            if (refs == SHOULD_BE_ON_FREELIST + 1) {
                add_knowing_refcount_is_zero(prevHead);
            }
        }

        return nullptr;
    }

    // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
    N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

private:
    inline void add_knowing_refcount_is_zero(N* node)
    {
        // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we
        // run only one copy of this method per node at a time, i.e. the single thread case), then we
        // know we can safely change the next pointer of the node; however, once the refcount is back
        // above zero, then other threads could increase it (happens under heavy contention, when the
        // refcount goes to zero in between a load and a refcount increment of a node in try_get, then
        // back up to something non-zero, then the refcount increment is done by the other thread) --
        // so, if the CAS to add the node to the actual list fails, decrease the refcount and leave
        // the add operation to the next thread who puts the refcount back at zero (which could be us,
        // hence the loop).
        auto head = freeListHead.load(std::memory_order_relaxed);
        while (true) {
            node->freeListNext.store(head, std::memory_order_relaxed);
            node->freeListRefs.store(1, std::memory_order_release);
            if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
                // Hmm, the add failed, but we can only try again when the refcount goes back to zero
                if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
                    continue;
                }
            }
            return;
        }
    }

private:
    static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
    static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;

    // Implemented like a stack, but where node order doesn't matter (nodes are
    // inserted out of order under contention)
    std::atomic<N*> freeListHead;
};
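
// Usage sketch (illustrative only, not part of the original header): nodes live outside the free
// list and are merely linked into and out of it; the list never allocates or frees anything.
// The node type below is hypothetical.
namespace example {
    struct PooledBuffer : FreeListNode<PooledBuffer>
    {
        char data[64];
    };

    static inline void free_list_usage_sketch()
    {
        FreeList<PooledBuffer> pool;
        PooledBuffer buffer;                // nodes are owned by the caller, never by the list
        pool.add(&buffer);                  // make the node available; safe to call from any thread
        auto taken = pool.try_get();        // nullptr if the list was empty
        assert(taken == &buffer);
        // `buffer` must not be destroyed while the free list can still reference it.
    }
}
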
////////////////////////////////////////////////////////////////////////////////
// Lock-free (single-producer, multi-consumer) numeric-key hash map of sorts;
// there are many conditions that must be met, i.e. items have to be inserted
// in increasing order by key (wrap-around is OK), and items cannot be searched
// for or removed unless they are known to be in the map in the first place.
////////////////////////////////////////////////////////////////////////////////

template<typename TValue>
struct SPMCSequentialHashMap
{
    explicit SPMCSequentialHashMap(std::size_t initialSize)
        : nextCapacity(initialSize), index(nullptr)
    {
        new_index();
    }

    ~SPMCSequentialHashMap()
    {
        auto ptr = index.load(std::memory_order_relaxed);
        if (ptr != nullptr) {
            for (std::size_t i = 0; i != ptr->capacity; ++i) {
                ptr->index[i]->~IndexEntry();
            }
            do {
                auto prev = ptr->prev;
                ptr->~IndexHeader();
                corealgos_allocator::free(ptr);
                ptr = prev;
            } while (ptr != nullptr);
        }
    }

    // Not thread safe. Only call from single producer thread.
    // Note: key must *not* be in hash already, and must be exactly
    // one larger than the previously inserted key value.
    void insert(std::uint64_t key, TValue* value)
    {
        IndexEntry* idxEntry;
        insert_index_entry(idxEntry, key);
        idxEntry->value.store(value, std::memory_order_release);
    }

    // Thread-safe, but if somebody can remove the key while find() is
    // in progress, then any returned value is not guaranteed to correspond
    // to that key. This also applies if the key was not already present but
    // once was. Elements can be found in any order.
    TValue* find(std::uint64_t key)
    {
        auto idxEntry = get_entry_for_key(key);
        if (idxEntry == nullptr) return nullptr;
        return idxEntry->value.load(std::memory_order_acquire);
    }

    // Thread-safe, but if somebody else can remove the same key while remove()
    // is in progress, then any removed value is not guaranteed to correspond
    // to that key. This also applies if the key was not already present but
    // once was. Elements can be removed in any order.
    TValue* remove(std::uint64_t key)
    {
        auto idxEntry = get_entry_for_key(key);
        if (idxEntry == nullptr) return nullptr;

        TValue* val = nullptr;
        while (!idxEntry->value.compare_exchange_weak(val, nullptr, std::memory_order_acquire, std::memory_order_relaxed))
            continue;
        return val;
    }

private:
    struct IndexEntry
    {
        std::atomic<std::uint64_t> key;
        std::atomic<TValue*> value;
    };

    struct IndexHeader
    {
        std::size_t capacity;
        std::atomic<std::size_t> tail;
        IndexEntry* entries;
        IndexEntry** index;
        IndexHeader* prev;
    };

    inline void insert_index_entry(IndexEntry*& idxEntry, std::uint64_t key)
    {
        auto localIndex = index.load(std::memory_order_relaxed);        // We're the only writer thread, relaxed is OK
        auto newTail = (localIndex->tail.load(std::memory_order_relaxed) + 1) & (localIndex->capacity - 1);
        idxEntry = localIndex->index[newTail];
        if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_KEY ||
            idxEntry->value.load(std::memory_order_relaxed) == nullptr) {

            idxEntry->key.store(key, std::memory_order_relaxed);
            localIndex->tail.store(newTail, std::memory_order_release);
            return;
        }

        // No room in the old index, try to allocate another one!
        new_index();
        localIndex = index.load(std::memory_order_relaxed);
        newTail = (localIndex->tail.load(std::memory_order_relaxed) + 1) & (localIndex->capacity - 1);
        idxEntry = localIndex->index[newTail];
        assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_KEY);
        idxEntry->key.store(key, std::memory_order_relaxed);
        localIndex->tail.store(newTail, std::memory_order_release);
    }

    inline IndexEntry* get_entry_for_key(std::uint64_t key) const
    {
        auto localIndex = index.load(std::memory_order_acquire);
        auto tail = localIndex->tail.load(std::memory_order_acquire);
        auto tailBase = localIndex->index[tail]->key.load(std::memory_order_relaxed);
        if (tailBase == INVALID_KEY) {
            return nullptr;
        }
        auto offset = static_cast<std::size_t>(key - tailBase);
        std::size_t idx = (tail + offset) & (localIndex->capacity - 1);
        auto entry = localIndex->index[idx];
        return entry->key.load(std::memory_order_relaxed) == key ? entry : nullptr;
    }
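
    // Added commentary (not in the original source): new_index() doubles the capacity and is only
    // ever called from the single producer thread (from the constructor and from
    // insert_index_entry above). The new header gets a fresh entry array plus an index array of
    // pointers; the pointers to all previously allocated entries are copied over first (in tail
    // order), so keys inserted before the resize remain reachable through the newest index.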
    bool new_index()
    {
        auto prev = index.load(std::memory_order_relaxed);
        std::size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
        auto entryCount = prev == nullptr ? nextCapacity : prevCapacity;
        auto raw = static_cast<char*>(corealgos_allocator::malloc(
            sizeof(IndexHeader) +
            std::alignment_of<IndexEntry>::value - 1 + sizeof(IndexEntry) * entryCount +
            std::alignment_of<IndexEntry*>::value - 1 + sizeof(IndexEntry*) * nextCapacity));
        if (raw == nullptr) {
            return false;
        }

        auto header = new (raw) IndexHeader;
        auto entries = reinterpret_cast<IndexEntry*>(details::align_for<IndexEntry>(raw + sizeof(IndexHeader)));
        auto idx = reinterpret_cast<IndexEntry**>(details::align_for<IndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(IndexEntry) * entryCount));
        if (prev != nullptr) {
            auto prevTail = prev->tail.load(std::memory_order_relaxed);
            auto prevPos = prevTail;
            std::size_t i = 0;
            do {
                prevPos = (prevPos + 1) & (prev->capacity - 1);
                idx[i++] = prev->index[prevPos];
            } while (prevPos != prevTail);
            assert(i == prevCapacity);
        }
        for (std::size_t i = 0; i != entryCount; ++i) {
            new (entries + i) IndexEntry;
            entries[i].key.store(INVALID_KEY, std::memory_order_relaxed);
            entries[i].value.store(nullptr, std::memory_order_relaxed);
            idx[prevCapacity + i] = entries + i;
        }
        header->prev = prev;
        header->entries = entries;
        header->index = idx;
        header->capacity = nextCapacity;
        header->tail.store((prevCapacity - 1) & (nextCapacity - 1), std::memory_order_relaxed);

        index.store(header, std::memory_order_release);

        nextCapacity <<= 1;

        return true;
    }

private:
    std::size_t nextCapacity;
    std::atomic<IndexHeader*> index;

    static const std::uint64_t INVALID_KEY = ~(std::uint64_t)0;
};

} }
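
// Usage sketch (illustrative only, not part of the original header): one producer thread inserts
// values under strictly increasing keys; any thread may find() or remove() keys it knows were
// inserted. The value type and names below are hypothetical.
namespace corealgos_examples {
    static inline void spmc_map_usage_sketch()
    {
        int values[3] = { 10, 20, 30 };

        moodycamel::corealgos::SPMCSequentialHashMap<int> map(4);   // initial size; grows by doubling
        for (std::uint64_t key = 0; key != 3; ++key) {
            map.insert(key, &values[key]);      // producer thread only; each key is exactly one larger than the last
        }

        assert(map.find(1) == &values[1]);      // any thread, for keys known to be present
        assert(map.remove(2) == &values[2]);    // removal hands back the stored pointer
        assert(map.find(2) == nullptr);         // deterministic here only because this sketch is single-threaded
    }
}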