nix-archive-1(type directoryentry(nameincludenode(type directoryentry(name atomic_queuenode(type directoryentry(nameatomic_queue.hnode(typeregularcontents-_/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ #ifndef ATOMIC_QUEUE_ATOMIC_QUEUE_H_INCLUDED #define ATOMIC_QUEUE_ATOMIC_QUEUE_H_INCLUDED // Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. #include "defs.h" #include #include #include #include #include #include #include //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace atomic_queue { using std::uint32_t; using std::uint64_t; using std::uint8_t; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace details { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template struct GetCacheLineIndexBits { static int constexpr value = 0; }; template<> struct GetCacheLineIndexBits<256> { static int constexpr value = 8; }; template<> struct GetCacheLineIndexBits<128> { static int constexpr value = 7; }; template<> struct GetCacheLineIndexBits< 64> { static int constexpr value = 6; }; template<> struct GetCacheLineIndexBits< 32> { static int constexpr value = 5; }; template<> struct GetCacheLineIndexBits< 16> { static int constexpr value = 4; }; template<> struct GetCacheLineIndexBits< 8> { static int constexpr value = 3; }; template<> struct GetCacheLineIndexBits< 4> { static int constexpr value = 2; }; template<> struct GetCacheLineIndexBits< 2> { static int constexpr value = 1; }; template struct GetIndexShuffleBits { static int constexpr bits = GetCacheLineIndexBits::value; static unsigned constexpr min_size = 1u << (bits * 2); static int constexpr value = array_size < min_size ? 0 : bits; }; template struct GetIndexShuffleBits { static int constexpr value = 0; }; // Multiple writers/readers contend on the same cache line when storing/loading elements at // subsequent indexes, aka false sharing. For power of 2 ring buffer size it is possible to re-map // the index in such a way that each subsequent element resides on another cache line, which // minimizes contention. This is done by swapping the lowest order N bits (which are the index of // the element within the cache line) with the next N bits (which are the index of the cache line) // of the element index. template constexpr unsigned remap_index_with_mix(unsigned index, unsigned mix) { return index ^ mix ^ (mix << BITS); } template constexpr unsigned remap_index(unsigned index) noexcept { return remap_index_with_mix(index, (index ^ (index >> BITS)) & ((1u << BITS) - 1)); } template<> constexpr unsigned remap_index<0>(unsigned index) noexcept { return index; } template constexpr T& map(T* elements, unsigned index) noexcept { return elements[remap_index(index)]; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Implement a "bit-twiddling hack" for finding the next power of 2 in either 32 bits or 64 bits // in C++11 compatible constexpr functions. The library no longer maintains C++11 compatibility. // "Runtime" version for 32 bits // --a; // a |= a >> 1; // a |= a >> 2; // a |= a >> 4; // a |= a >> 8; // a |= a >> 16; // ++a; template constexpr T decrement(T x) noexcept { return x - 1; } template constexpr T increment(T x) noexcept { return x + 1; } template constexpr T or_equal(T x, unsigned u) noexcept { return x | x >> u; } template constexpr T or_equal(T x, unsigned u, Args... rest) noexcept { return or_equal(or_equal(x, u), rest...); } constexpr uint32_t round_up_to_power_of_2(uint32_t a) noexcept { return increment(or_equal(decrement(a), 1, 2, 4, 8, 16)); } constexpr uint64_t round_up_to_power_of_2(uint64_t a) noexcept { return increment(or_equal(decrement(a), 1, 2, 4, 8, 16, 32)); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// } // namespace details //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template class AtomicQueueCommon { protected: // Put these on different cache lines to avoid false sharing between readers and writers. alignas(CACHE_LINE_SIZE) std::atomic head_ = {}; alignas(CACHE_LINE_SIZE) std::atomic tail_ = {}; // The special member functions are not thread-safe. AtomicQueueCommon() noexcept = default; AtomicQueueCommon(AtomicQueueCommon const& b) noexcept : head_(b.head_.load(X)) , tail_(b.tail_.load(X)) {} AtomicQueueCommon& operator=(AtomicQueueCommon const& b) noexcept { head_.store(b.head_.load(X), X); tail_.store(b.tail_.load(X), X); return *this; } void swap(AtomicQueueCommon& b) noexcept { unsigned h = head_.load(X); unsigned t = tail_.load(X); head_.store(b.head_.load(X), X); tail_.store(b.tail_.load(X), X); b.head_.store(h, X); b.tail_.store(t, X); } template static T do_pop_atomic(std::atomic& q_element) noexcept { if(Derived::spsc_) { for(;;) { T element = q_element.load(X); if(ATOMIC_QUEUE_LIKELY(element != NIL)) { q_element.store(NIL, R); return element; } if(Derived::maximize_throughput_) spin_loop_pause(); } } else { for(;;) { T element = q_element.exchange(NIL, R); // (2) The store to wait for. if(ATOMIC_QUEUE_LIKELY(element != NIL)) return element; // Do speculative loads while busy-waiting to avoid broadcasting RFO messages. do spin_loop_pause(); while(Derived::maximize_throughput_ && q_element.load(X) == NIL); } } } template static void do_push_atomic(T element, std::atomic& q_element) noexcept { assert(element != NIL); if(Derived::spsc_) { while(ATOMIC_QUEUE_UNLIKELY(q_element.load(X) != NIL)) if(Derived::maximize_throughput_) spin_loop_pause(); q_element.store(element, R); } else { for(T expected = NIL; ATOMIC_QUEUE_UNLIKELY(!q_element.compare_exchange_strong(expected, element, R, X)); expected = NIL) { do spin_loop_pause(); // (1) Wait for store (2) to complete. while(Derived::maximize_throughput_ && q_element.load(X) != NIL); } } } enum State : unsigned char { EMPTY, STORING, STORED, LOADING }; template static T do_pop_any(std::atomic& state, T& q_element) noexcept { if(Derived::spsc_) { while(ATOMIC_QUEUE_UNLIKELY(state.load(A) != STORED)) if(Derived::maximize_throughput_) spin_loop_pause(); T element{std::move(q_element)}; state.store(EMPTY, R); return element; } else { for(;;) { unsigned char expected = STORED; if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_strong(expected, LOADING, A, X))) { T element{std::move(q_element)}; state.store(EMPTY, R); return element; } // Do speculative loads while busy-waiting to avoid broadcasting RFO messages. do spin_loop_pause(); while(Derived::maximize_throughput_ && state.load(X) != STORED); } } } template static void do_push_any(U&& element, std::atomic& state, T& q_element) noexcept { if(Derived::spsc_) { while(ATOMIC_QUEUE_UNLIKELY(state.load(A) != EMPTY)) if(Derived::maximize_throughput_) spin_loop_pause(); q_element = std::forward(element); state.store(STORED, R); } else { for(;;) { unsigned char expected = EMPTY; if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_strong(expected, STORING, A, X))) { q_element = std::forward(element); state.store(STORED, R); return; } // Do speculative loads while busy-waiting to avoid broadcasting RFO messages. do spin_loop_pause(); while(Derived::maximize_throughput_ && state.load(X) != EMPTY); } } } public: template bool try_push(T&& element) noexcept { auto head = head_.load(X); if(Derived::spsc_) { if(static_cast(head - tail_.load(X)) >= static_cast(static_cast(*this).size_)) return false; head_.store(head + 1, X); } else { do { if(static_cast(head - tail_.load(X)) >= static_cast(static_cast(*this).size_)) return false; } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_strong(head, head + 1, A, X))); // This loop is not FIFO. } static_cast(*this).do_push(std::forward(element), head); return true; } template bool try_pop(T& element) noexcept { auto tail = tail_.load(X); if(Derived::spsc_) { if(static_cast(head_.load(X) - tail) <= 0) return false; tail_.store(tail + 1, X); } else { do { if(static_cast(head_.load(X) - tail) <= 0) return false; } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_strong(tail, tail + 1, A, X))); // This loop is not FIFO. } element = static_cast(*this).do_pop(tail); return true; } template void push(T&& element) noexcept { unsigned head; if(Derived::spsc_) { head = head_.load(X); head_.store(head + 1, X); } else { constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_acquire; head = head_.fetch_add(1, memory_order); // FIFO and total order on Intel regardless, as of 2019. } static_cast(*this).do_push(std::forward(element), head); } auto pop() noexcept { unsigned tail; if(Derived::spsc_) { tail = tail_.load(X); tail_.store(tail + 1, X); } else { constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_acquire; tail = tail_.fetch_add(1, memory_order); // FIFO and total order on Intel regardless, as of 2019. } return static_cast(*this).do_pop(tail); } bool was_empty() const noexcept { return !was_size(); } bool was_full() const noexcept { return was_size() >= static_cast(static_cast(*this).size_); } unsigned was_size() const noexcept { // tail_ can be greater than head_ because of consumers doing pop, rather that try_pop, when the queue is empty. return std::max(static_cast(head_.load(X) - tail_.load(X)), 0); } unsigned capacity() const noexcept { return static_cast(*this).size_; } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template class AtomicQueue : public AtomicQueueCommon> { using Base = AtomicQueueCommon>; friend Base; static constexpr unsigned size_ = MINIMIZE_CONTENTION ? details::round_up_to_power_of_2(SIZE) : SIZE; static constexpr int SHUFFLE_BITS = details::GetIndexShuffleBits)>::value; static constexpr bool total_order_ = TOTAL_ORDER; static constexpr bool spsc_ = SPSC; static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; alignas(CACHE_LINE_SIZE) std::atomic elements_[size_] = {}; // Empty elements are NIL. T do_pop(unsigned tail) noexcept { std::atomic& q_element = details::map(elements_, tail % size_); return Base::template do_pop_atomic(q_element); } void do_push(T element, unsigned head) noexcept { std::atomic& q_element = details::map(elements_, head % size_); Base::template do_push_atomic(element, q_element); } public: using value_type = T; AtomicQueue() noexcept { assert(std::atomic{NIL}.is_lock_free()); // This queue is for atomic elements only. AtomicQueue2 is for non-atomic ones. if(T{} != NIL) for(auto& element : elements_) element.store(NIL, X); } AtomicQueue(AtomicQueue const&) = delete; AtomicQueue& operator=(AtomicQueue const&) = delete; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template class AtomicQueue2 : public AtomicQueueCommon> { using Base = AtomicQueueCommon>; using State = typename Base::State; friend Base; static constexpr unsigned size_ = MINIMIZE_CONTENTION ? details::round_up_to_power_of_2(SIZE) : SIZE; static constexpr int SHUFFLE_BITS = details::GetIndexShuffleBits::value; static constexpr bool total_order_ = TOTAL_ORDER; static constexpr bool spsc_ = SPSC; static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; alignas(CACHE_LINE_SIZE) std::atomic states_[size_] = {}; alignas(CACHE_LINE_SIZE) T elements_[size_] = {}; T do_pop(unsigned tail) noexcept { unsigned index = details::remap_index(tail % size_); return Base::template do_pop_any(states_[index], elements_[index]); } template void do_push(U&& element, unsigned head) noexcept { unsigned index = details::remap_index(head % size_); Base::template do_push_any(std::forward(element), states_[index], elements_[index]); } public: using value_type = T; AtomicQueue2() noexcept = default; AtomicQueue2(AtomicQueue2 const&) = delete; AtomicQueue2& operator=(AtomicQueue2 const&) = delete; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template, T NIL = T{}, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false> class AtomicQueueB : public AtomicQueueCommon>, private std::allocator_traits::template rebind_alloc> { using Base = AtomicQueueCommon>; friend Base; static constexpr bool total_order_ = TOTAL_ORDER; static constexpr bool spsc_ = SPSC; static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; using AllocatorElements = typename std::allocator_traits::template rebind_alloc>; static constexpr auto ELEMENTS_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(std::atomic); static_assert(ELEMENTS_PER_CACHE_LINE, "Unexpected ELEMENTS_PER_CACHE_LINE."); static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits::value; static_assert(SHUFFLE_BITS, "Unexpected SHUFFLE_BITS."); // AtomicQueueCommon members are stored into by readers and writers. // Allocate these immutable members on another cache line which never gets invalidated by stores. alignas(CACHE_LINE_SIZE) unsigned size_; std::atomic* elements_; T do_pop(unsigned tail) noexcept { std::atomic& q_element = details::map(elements_, tail & (size_ - 1)); return Base::template do_pop_atomic(q_element); } void do_push(T element, unsigned head) noexcept { std::atomic& q_element = details::map(elements_, head & (size_ - 1)); Base::template do_push_atomic(element, q_element); } public: using value_type = T; // The special member functions are not thread-safe. AtomicQueueB(unsigned size) : size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2))) , elements_(AllocatorElements::allocate(size_)) { assert(std::atomic{NIL}.is_lock_free()); // This queue is for atomic elements only. AtomicQueueB2 is for non-atomic ones. for(auto p = elements_, q = elements_ + size_; p < q; ++p) p->store(NIL, X); } AtomicQueueB(AtomicQueueB&& b) noexcept : Base(static_cast(b)) , AllocatorElements(static_cast(b)) // TODO: This must be noexcept, static_assert that. , size_(b.size_) , elements_(b.elements_) { b.size_ = 0; b.elements_ = 0; } AtomicQueueB& operator=(AtomicQueueB&& b) noexcept { b.swap(*this); return *this; } ~AtomicQueueB() noexcept { if(elements_) AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that. } void swap(AtomicQueueB& b) noexcept { using std::swap; this->Base::swap(b); swap(static_cast(*this), static_cast(b)); swap(size_, b.size_); swap(elements_, b.elements_); } friend void swap(AtomicQueueB& a, AtomicQueueB& b) { a.swap(b); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false> class AtomicQueueB2 : public AtomicQueueCommon>, private A, private std::allocator_traits::template rebind_alloc> { using Base = AtomicQueueCommon>; using State = typename Base::State; friend Base; static constexpr bool total_order_ = TOTAL_ORDER; static constexpr bool spsc_ = SPSC; static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; using AllocatorElements = A; using AllocatorStates = typename std::allocator_traits::template rebind_alloc>; // AtomicQueueCommon members are stored into by readers and writers. // Allocate these immutable members on another cache line which never gets invalidated by stores. alignas(CACHE_LINE_SIZE) unsigned size_; std::atomic* states_; T* elements_; static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(State); static_assert(STATES_PER_CACHE_LINE, "Unexpected STATES_PER_CACHE_LINE."); static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits::value; static_assert(SHUFFLE_BITS, "Unexpected SHUFFLE_BITS."); T do_pop(unsigned tail) noexcept { unsigned index = details::remap_index(tail & (size_ - 1)); return Base::template do_pop_any(states_[index], elements_[index]); } template void do_push(U&& element, unsigned head) noexcept { unsigned index = details::remap_index(head & (size_ - 1)); Base::template do_push_any(std::forward(element), states_[index], elements_[index]); } public: using value_type = T; // The special member functions are not thread-safe. AtomicQueueB2(unsigned size) : size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2))) , states_(AllocatorStates::allocate(size_)) , elements_(AllocatorElements::allocate(size_)) { for(auto p = states_, q = states_ + size_; p < q; ++p) p->store(Base::EMPTY, X); AllocatorElements& ae = *this; for(auto p = elements_, q = elements_ + size_; p < q; ++p) std::allocator_traits::construct(ae, p); } AtomicQueueB2(AtomicQueueB2&& b) noexcept : Base(static_cast(b)) , AllocatorElements(static_cast(b)) // TODO: This must be noexcept, static_assert that. , AllocatorStates(static_cast(b)) // TODO: This must be noexcept, static_assert that. , size_(b.size_) , states_(b.states_) , elements_(b.elements_) { b.size_ = 0; b.states_ = 0; b.elements_ = 0; } AtomicQueueB2& operator=(AtomicQueueB2&& b) noexcept { b.swap(*this); return *this; } ~AtomicQueueB2() noexcept { if(elements_) { AllocatorElements& ae = *this; for(auto p = elements_, q = elements_ + size_; p < q; ++p) std::allocator_traits::destroy(ae, p); AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that. AllocatorStates::deallocate(states_, size_); // TODO: This must be noexcept, static_assert that. } } void swap(AtomicQueueB2& b) noexcept { using std::swap; this->Base::swap(b); swap(static_cast(*this), static_cast(b)); swap(static_cast(*this), static_cast(b)); swap(size_, b.size_); swap(states_, b.states_); swap(elements_, b.elements_); } friend void swap(AtomicQueueB2& a, AtomicQueueB2& b) noexcept { a.swap(b); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template struct RetryDecorator : Queue { using T = typename Queue::value_type; using Queue::Queue; void push(T element) noexcept { while(!this->try_push(element)) spin_loop_pause(); } T pop() noexcept { T element; while(!this->try_pop(element)) spin_loop_pause(); return element; } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// } // namespace atomic_queue //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #endif // ATOMIC_QUEUE_ATOMIC_QUEUE_H_INCLUDED ))entry(nameatomic_queue_mutex.hnode(typeregularcontents° /* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ #ifndef ATOMIC_QUEUE_ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED #define ATOMIC_QUEUE_ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED // Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. #include "atomic_queue.h" #include "spinlock.h" #include #include //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace atomic_queue { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template struct ScopedLockType { using type = typename M::scoped_lock; }; template<> struct ScopedLockType { using type = std::unique_lock; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template class AtomicQueueMutexT { static constexpr unsigned size_ = MINIMIZE_CONTENTION ? details::round_up_to_power_of_2(SIZE) : SIZE; Mutex mutex_; alignas(CACHE_LINE_SIZE) unsigned head_ = 0; alignas(CACHE_LINE_SIZE) unsigned tail_ = 0; alignas(CACHE_LINE_SIZE) T q_[size_] = {}; static constexpr int SHUFFLE_BITS = details::GetIndexShuffleBits::value; using ScopedLock = typename ScopedLockType::type; public: using value_type = T; template bool try_push(U&& element) noexcept { ScopedLock lock(mutex_); if(ATOMIC_QUEUE_LIKELY(head_ - tail_ < size_)) { q_[details::remap_index(head_ % size_)] = std::forward(element); ++head_; return true; } return false; } bool try_pop(T& element) noexcept { ScopedLock lock(mutex_); if(ATOMIC_QUEUE_LIKELY(head_ != tail_)) { element = std::move(q_[details::remap_index(tail_ % size_)]); ++tail_; return true; } return false; } bool was_empty() const noexcept { return static_cast(head_ - tail_) <= 0; } bool was_full() const noexcept { return static_cast(head_ - tail_) >= static_cast(size_); } }; template using AtomicQueueMutex = AtomicQueueMutexT; template using AtomicQueueSpinlock = AtomicQueueMutexT; // template // using AtomicQueueSpinlockHle = AtomicQueueMutexT; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// } // namespace atomic_queue //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #endif // ATOMIC_QUEUE_ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED ))entry(name barrier.hnode(typeregularcontents)/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ #ifndef BARRIER_H_INCLUDED #define BARRIER_H_INCLUDED // Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. #include "defs.h" //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace atomic_queue { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class Barrier { std::atomic counter_ = {}; public: void wait() noexcept { counter_.fetch_add(1, std::memory_order_acquire); while(counter_.load(std::memory_order_relaxed)) spin_loop_pause(); } void release(unsigned expected_counter) noexcept { while(expected_counter != counter_.load(std::memory_order_relaxed)) spin_loop_pause(); counter_.store(0, std::memory_order_release); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// } // namespace atomic_queue //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #endif // BARRIER_H_INCLUDED ))entry(namedefs.hnode(typeregularcontentsí /* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ #ifndef ATOMIC_QUEUE_DEFS_H_INCLUDED #define ATOMIC_QUEUE_DEFS_H_INCLUDED // Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. #include #if defined(__x86_64__) || defined(_M_X64) || defined(__i386__) || defined(_M_IX86) #include namespace atomic_queue { constexpr int CACHE_LINE_SIZE = 64; static inline void spin_loop_pause() noexcept { _mm_pause(); } } // namespace atomic_queue #elif defined(__arm__) || defined(__aarch64__) namespace atomic_queue { constexpr int CACHE_LINE_SIZE = 64; static inline void spin_loop_pause() noexcept { #if (defined(__ARM_ARCH_6K__) || \ defined(__ARM_ARCH_6Z__) || \ defined(__ARM_ARCH_6ZK__) || \ defined(__ARM_ARCH_6T2__) || \ defined(__ARM_ARCH_7__) || \ defined(__ARM_ARCH_7A__) || \ defined(__ARM_ARCH_7R__) || \ defined(__ARM_ARCH_7M__) || \ defined(__ARM_ARCH_7S__) || \ defined(__ARM_ARCH_8A__) || \ defined(__aarch64__)) asm volatile ("yield" ::: "memory"); #else asm volatile ("nop" ::: "memory"); #endif } } // namespace atomic_queue #elif defined(__ppc64__) || defined(__powerpc64__) namespace atomic_queue { constexpr int CACHE_LINE_SIZE = 128; // TODO: Review that this is the correct value. static inline void spin_loop_pause() noexcept { asm volatile("or 31,31,31 # very low priority"); // TODO: Review and benchmark that this is the right instruction. } } // namespace atomic_queue #elif defined(__s390x__) namespace atomic_queue { constexpr int CACHE_LINE_SIZE = 256; // TODO: Review that this is the correct value. static inline void spin_loop_pause() noexcept {} // TODO: Find the right instruction to use here, if any. } // namespace atomic_queue #else #warning "Unknown CPU architecture. Using L1 cache line size of 64 bytes and no spinloop pause instruction." namespace atomic_queue { constexpr int CACHE_LINE_SIZE = 64; // TODO: Review that this is the correct value. static inline void spin_loop_pause() noexcept {} } // namespace atomic_queue #endif //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace atomic_queue { #if defined(__GNUC__) || defined(__clang__) #define ATOMIC_QUEUE_LIKELY(expr) __builtin_expect(static_cast(expr), 1) #define ATOMIC_QUEUE_UNLIKELY(expr) __builtin_expect(static_cast(expr), 0) #define ATOMIC_QUEUE_NOINLINE __attribute__((noinline)) #else #define ATOMIC_QUEUE_LIKELY(expr) (expr) #define ATOMIC_QUEUE_UNLIKELY(expr) (expr) #define ATOMIC_QUEUE_NOINLINE #endif //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// auto constexpr A = std::memory_order_acquire; auto constexpr R = std::memory_order_release; auto constexpr X = std::memory_order_relaxed; auto constexpr C = std::memory_order_seq_cst; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// } // namespace atomic_queue //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #endif // ATOMIC_QUEUE_DEFS_H_INCLUDED ))entry(name spinlock.hnode(typeregularcontentsŇ/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ #ifndef ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED #define ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED // Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. #include "defs.h" #include #include #include #include //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace atomic_queue { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class Spinlock { pthread_spinlock_t s_; public: using scoped_lock = std::lock_guard; Spinlock() noexcept { if(ATOMIC_QUEUE_UNLIKELY(::pthread_spin_init(&s_, 0))) std::abort(); } Spinlock(Spinlock const&) = delete; Spinlock& operator=(Spinlock const&) = delete; ~Spinlock() noexcept { ::pthread_spin_destroy(&s_); } void lock() noexcept { if(ATOMIC_QUEUE_UNLIKELY(::pthread_spin_lock(&s_))) std::abort(); } void unlock() noexcept { if(ATOMIC_QUEUE_UNLIKELY(::pthread_spin_unlock(&s_))) std::abort(); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TicketSpinlock { alignas(CACHE_LINE_SIZE) std::atomic ticket_{0}; alignas(CACHE_LINE_SIZE) std::atomic next_{0}; public: class LockGuard { TicketSpinlock* const m_; unsigned const ticket_; public: LockGuard(TicketSpinlock& m) noexcept : m_(&m) , ticket_(m.lock()) {} LockGuard(LockGuard const&) = delete; LockGuard& operator=(LockGuard const&) = delete; ~LockGuard() noexcept { m_->unlock(ticket_); } }; using scoped_lock = LockGuard; TicketSpinlock() noexcept = default; TicketSpinlock(TicketSpinlock const&) = delete; TicketSpinlock& operator=(TicketSpinlock const&) = delete; ATOMIC_QUEUE_NOINLINE unsigned lock() noexcept { auto ticket = ticket_.fetch_add(1, std::memory_order_relaxed); for(;;) { auto position = ticket - next_.load(std::memory_order_acquire); if(ATOMIC_QUEUE_LIKELY(!position)) break; do spin_loop_pause(); while(--position); } return ticket; } void unlock() noexcept { unlock(next_.load(std::memory_order_relaxed) + 1); } void unlock(unsigned ticket) noexcept { next_.store(ticket + 1, std::memory_order_release); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class UnfairSpinlock { std::atomic lock_{0}; public: using scoped_lock = std::lock_guard; UnfairSpinlock(UnfairSpinlock const&) = delete; UnfairSpinlock& operator=(UnfairSpinlock const&) = delete; void lock() noexcept { for(;;) { if(!lock_.load(std::memory_order_relaxed) && !lock_.exchange(1, std::memory_order_acquire)) return; spin_loop_pause(); } } void unlock() noexcept { lock_.store(0, std::memory_order_release); } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // class SpinlockHle { // int lock_ = 0; // #ifdef __gcc__ // static constexpr int HLE_ACQUIRE = __ATOMIC_HLE_ACQUIRE; // static constexpr int HLE_RELEASE = __ATOMIC_HLE_RELEASE; // #else // static constexpr int HLE_ACQUIRE = 0; // static constexpr int HLE_RELEASE = 0; // #endif // public: // using scoped_lock = std::lock_guard; // SpinlockHle(SpinlockHle const&) = delete; // SpinlockHle& operator=(SpinlockHle const&) = delete; // void lock() noexcept { // for(int expected = 0; // !__atomic_compare_exchange_n(&lock_, &expected, 1, false, __ATOMIC_ACQUIRE | HLE_ACQUIRE, __ATOMIC_RELAXED); // expected = 0) // spin_loop_pause(); // } // void unlock() noexcept { // __atomic_store_n(&lock_, 0, __ATOMIC_RELEASE | HLE_RELEASE); // } // }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // class AdaptiveMutex { // pthread_mutex_t m_; // public: // using scoped_lock = std::lock_guard; // AdaptiveMutex() noexcept { // pthread_mutexattr_t a; // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutexattr_init(&a))) // std::abort(); // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutexattr_settype(&a, PTHREAD_MUTEX_ADAPTIVE_NP))) // std::abort(); // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_init(&m_, &a))) // std::abort(); // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutexattr_destroy(&a))) // std::abort(); // m_.__data.__spins = 32767; // } // AdaptiveMutex(AdaptiveMutex const&) = delete; // AdaptiveMutex& operator=(AdaptiveMutex const&) = delete; // ~AdaptiveMutex() noexcept { // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_destroy(&m_))) // std::abort(); // } // void lock() noexcept { // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_lock(&m_))) // std::abort(); // } // void unlock() noexcept { // if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_unlock(&m_))) // std::abort(); // } // }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// } // namespace atomic_queue //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #endif // ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED ))))))entry(namesharenode(type directoryentry(namedocnode(type directoryentry(nameatomic-queue-1.0node(type directoryentry(nameLICENSEnode(typeregularcontents1MIT License Copyright (c) 2019 Maxim Egorushkin Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. )))))))))