From c4314b231ff6379d0b834e06b819a0a0108f320f Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 03:19:25 -0400 Subject: [PATCH 1/6] bounded_threadsafe_queue: Use simplified impl of bounded queue Provides a simplified SPSC, MPSC, and MPMC bounded queue implementation using mutexes. --- src/common/bounded_threadsafe_queue.h | 337 ++++++++++++++++---------- src/video_core/gpu_thread.cpp | 7 +- 2 files changed, 216 insertions(+), 128 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 14e887c705..e034275396 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -1,159 +1,246 @@ -// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp -// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: Copyright 2023 yuzu Emulator Project +// SPDX-License-Identifier: GPL-2.0-or-later #pragma once #include -#include #include -#include +#include #include #include -#include -#include #include "common/polyfill_thread.h" namespace Common { -#if defined(__cpp_lib_hardware_interference_size) -constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size; -#else -constexpr size_t hardware_interference_size = 64; -#endif +namespace detail { +constexpr size_t DefaultCapacity = 0x1000; +} // namespace detail + +template +class SPSCQueue { + static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); -template -class MPSCQueue { public: - explicit MPSCQueue() : allocator{std::allocator>()} { - // Allocate one extra slot to prevent false sharing on the last slot - slots = allocator.allocate(capacity + 1); - // Allocators are not required to honor alignment for over-aligned types - // (see http://eel.is/c++draft/allocator.requirements#10) so we verify - // alignment here - if (reinterpret_cast(slots) % alignof(Slot) != 0) { - allocator.deallocate(slots, capacity + 1); - throw std::bad_alloc(); - } - for (size_t i = 0; i < capacity; ++i) { - std::construct_at(&slots[i]); - } - static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2"); - static_assert(alignof(Slot) == hardware_interference_size, - "Slot must be aligned to cache line boundary to prevent false sharing"); - static_assert(sizeof(Slot) % hardware_interference_size == 0, - "Slot size must be a multiple of cache line size to prevent " - "false sharing between adjacent slots"); - static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0, - "Queue size must be a multiple of cache line size to " - "prevent false sharing between adjacent queues"); - } + void Push(T&& t) { + const size_t write_index = m_write_index.load(); - ~MPSCQueue() noexcept { - for (size_t i = 0; i < capacity; ++i) { - std::destroy_at(&slots[i]); - } - allocator.deallocate(slots, capacity + 1); - } - - // The queue must be both non-copyable and non-movable - MPSCQueue(const MPSCQueue&) = delete; - MPSCQueue& operator=(const MPSCQueue&) = delete; - - MPSCQueue(MPSCQueue&&) = delete; - MPSCQueue& operator=(MPSCQueue&&) = delete; - - void Push(const T& v) noexcept { - static_assert(std::is_nothrow_copy_constructible_v, - "T must be nothrow copy constructible"); - emplace(v); - } - - template >> - void Push(P&& v) noexcept { - emplace(std::forward
<P>
(v)); - } - - void Pop(T& v, std::stop_token stop) noexcept { - auto const tail = tail_.fetch_add(1); - auto& slot = slots[idx(tail)]; - if (!slot.turn.test()) { - std::unique_lock lock{cv_mutex}; - Common::CondvarWait(cv, lock, stop, [&slot] { return slot.turn.test(); }); - } - v = slot.move(); - slot.destroy(); - slot.turn.clear(); - slot.turn.notify_one(); - } - -private: - template - struct Slot { - ~Slot() noexcept { - if (turn.test()) { - destroy(); - } + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); } - template - void construct(Args&&... args) noexcept { - static_assert(std::is_nothrow_constructible_v, - "T must be nothrow constructible with Args&&..."); - std::construct_at(reinterpret_cast(&storage), std::forward(args)...); - } + // Determine the position to write to. + const size_t pos = write_index % Capacity; - void destroy() noexcept { - static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); - std::destroy_at(reinterpret_cast(&storage)); - } + // Push into the queue. + m_data[pos] = std::move(t); - U&& move() noexcept { - return reinterpret_cast(storage); - } + // Increment the write index. + ++m_write_index; - // Align to avoid false sharing between adjacent slots - alignas(hardware_interference_size) std::atomic_flag turn{}; - struct aligned_store { - struct type { - alignas(U) unsigned char data[sizeof(U)]; - }; - }; - typename aligned_store::type storage; - }; - - template - void emplace(Args&&... args) noexcept { - static_assert(std::is_nothrow_constructible_v, - "T must be nothrow constructible with Args&&..."); - auto const head = head_.fetch_add(1); - auto& slot = slots[idx(head)]; - slot.turn.wait(true); - slot.construct(std::forward(args)...); - slot.turn.test_and_set(); + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; cv.notify_one(); } - constexpr size_t idx(size_t i) const noexcept { - return i & mask; + template + void Push(Args&&... args) { + const size_t write_index = m_write_index.load(); + + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); } - static constexpr size_t mask = capacity - 1; + bool TryPop(T& t) { + return Pop(t); + } - // Align to avoid false sharing between head_ and tail_ - alignas(hardware_interference_size) std::atomic head_{0}; - alignas(hardware_interference_size) std::atomic tail_{0}; + void PopWait(T& t, std::stop_token stop_token) { + Wait(stop_token); + Pop(t); + } + + T PopWait(std::stop_token stop_token) { + Wait(stop_token); + T t; + Pop(t); + return t; + } + + void Clear() { + while (!Empty()) { + Pop(); + } + } + + bool Empty() const { + return m_read_index.load() == m_write_index.load(); + } + + size_t Size() const { + return m_write_index.load() - m_read_index.load(); + } + +private: + void Pop() { + const size_t read_index = m_read_index.load(); + + // Check if the queue is empty. + if (read_index == m_write_index.load()) { + return; + } + + // Determine the position to read from. 
+ const size_t pos = read_index % Capacity; + + // Pop the data off the queue, deleting it. + std::destroy_at(std::addressof(m_data[pos])); + + // Increment the read index. + ++m_read_index; + } + + bool Pop(T& t) { + const size_t read_index = m_read_index.load(); + + // Check if the queue is empty. + if (read_index == m_write_index.load()) { + return false; + } + + // Determine the position to read from. + const size_t pos = read_index % Capacity; + + // Pop the data off the queue, moving it. + t = std::move(m_data[pos]); + + // Increment the read index. + ++m_read_index; + + return true; + } + + void Wait(std::stop_token stop_token) { + std::unique_lock lock{cv_mutex}; + Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); + } + + alignas(128) std::atomic_size_t m_read_index{0}; + alignas(128) std::atomic_size_t m_write_index{0}; + + std::array m_data; - std::mutex cv_mutex; std::condition_variable_any cv; + std::mutex cv_mutex; +}; - Slot* slots; - [[no_unique_address]] std::allocator> allocator; +template +class MPSCQueue { +public: + void Push(T&& t) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::move(t)); + } - static_assert(std::is_nothrow_copy_assignable_v || std::is_nothrow_move_assignable_v, - "T must be nothrow copy or move assignable"); + template + void Push(Args&&... args) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::forward(args)...); + } - static_assert(std::is_nothrow_destructible_v, "T must be nothrow destructible"); + bool TryPop(T& t) { + return spsc_queue.TryPop(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + spsc_queue.PopWait(t, stop_token); + } + + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + + void Clear() { + spsc_queue.Clear(); + } + + bool Empty() { + return spsc_queue.Empty(); + } + + size_t Size() { + return spsc_queue.Size(); + } + +private: + SPSCQueue spsc_queue; + std::mutex write_mutex; +}; + +template +class MPMCQueue { +public: + void Push(T&& t) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::move(t)); + } + + template + void Push(Args&&... 
args) { + std::scoped_lock lock{write_mutex}; + spsc_queue.Push(std::forward(args)...); + } + + bool TryPop(T& t) { + std::scoped_lock lock{read_mutex}; + return spsc_queue.TryPop(t); + } + + void PopWait(T& t, std::stop_token stop_token) { + std::scoped_lock lock{read_mutex}; + spsc_queue.PopWait(t, stop_token); + } + + T PopWait(std::stop_token stop_token) { + std::scoped_lock lock{read_mutex}; + return spsc_queue.PopWait(stop_token); + } + + void Clear() { + std::scoped_lock lock{read_mutex}; + spsc_queue.Clear(); + } + + bool Empty() { + std::scoped_lock lock{read_mutex}; + return spsc_queue.Empty(); + } + + size_t Size() { + std::scoped_lock lock{read_mutex}; + return spsc_queue.Size(); + } + +private: + SPSCQueue spsc_queue; + std::mutex write_mutex; + std::mutex read_mutex; }; } // namespace Common diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp index f52f9e28fa..469a59cf97 100644 --- a/src/video_core/gpu_thread.cpp +++ b/src/video_core/gpu_thread.cpp @@ -31,9 +31,10 @@ static void RunThread(std::stop_token stop_token, Core::System& system, auto current_context = context.Acquire(); VideoCore::RasterizerInterface* const rasterizer = renderer.ReadRasterizer(); + CommandDataContainer next; + while (!stop_token.stop_requested()) { - CommandDataContainer next; - state.queue.Pop(next, stop_token); + state.queue.PopWait(next, stop_token); if (stop_token.stop_requested()) { break; } @@ -117,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) { std::unique_lock lk(state.write_lock); const u64 fence{++state.last_fence}; - state.queue.Push(CommandDataContainer(std::move(command_data), fence, block)); + state.queue.Push(std::move(command_data), fence, block); if (block) { Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] { From 623cbd908e628081312c20091be0fda3b39653d9 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 03:20:05 -0400 Subject: [PATCH 2/6] logging: Make use of bounded queue --- src/common/logging/backend.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index 2a3bded40e..e1ce9db993 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -28,7 +28,7 @@ #ifdef _WIN32 #include "common/string_util.h" #endif -#include "common/threadsafe_queue.h" +#include "common/bounded_threadsafe_queue.h" namespace Common::Log { @@ -204,11 +204,11 @@ public: void PushEntry(Class log_class, Level log_level, const char* filename, unsigned int line_num, const char* function, std::string&& message) { - if (!filter.CheckMessage(log_class, log_level)) + if (!filter.CheckMessage(log_class, log_level)) { return; - const Entry& entry = - CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)); - message_queue.Push(entry); + } + message_queue.Push( + CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } private: @@ -225,7 +225,7 @@ private: ForEachBackend([&entry](Backend& backend) { backend.Write(entry); }); }; while (!stop_token.stop_requested()) { - entry = message_queue.PopWait(stop_token); + message_queue.PopWait(entry, stop_token); if (entry.filename != nullptr) { write_logs(); } @@ -233,7 +233,7 @@ private: // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a // case where a system is repeatedly spamming logs even on close. 
int max_logs_to_write = filter.IsDebug() ? INT_MAX : 100; - while (max_logs_to_write-- && message_queue.Pop(entry)) { + while (max_logs_to_write-- && message_queue.TryPop(entry)) { write_logs(); } }); @@ -273,7 +273,7 @@ private: ColorConsoleBackend color_console_backend{}; FileBackend file_backend; - MPSCQueue message_queue{}; + MPSCQueue message_queue{}; std::chrono::steady_clock::time_point time_origin{std::chrono::steady_clock::now()}; std::jthread backend_thread; }; From 281e99f09d18b27c4db5fc5cd86c003233f0b543 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 04:01:47 -0400 Subject: [PATCH 3/6] bounded_threadsafe_queue: Add TryPush --- src/common/bounded_threadsafe_queue.h | 71 +++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e034275396..eb88cc1d1b 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,6 +22,55 @@ class SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); public: + bool TryPush(T&& t) { + const size_t write_index = m_write_index.load(); + + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Push into the queue. + m_data[pos] = std::move(t); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + + template + bool TryPush(Args&&... args) { + const size_t write_index = m_write_index.load(); + + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + void Push(T&& t) { const size_t write_index = m_write_index.load(); @@ -153,6 +202,17 @@ private: template class MPSCQueue { public: + bool TryPush(T&& t) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::move(t)); + } + + template + bool TryPush(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::forward(args)...); + } + void Push(T&& t) { std::scoped_lock lock{write_mutex}; spsc_queue.Push(std::move(t)); @@ -196,6 +256,17 @@ private: template class MPMCQueue { public: + bool TryPush(T&& t) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::move(t)); + } + + template + bool TryPush(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::forward(args)...); + } + void Push(T&& t) { std::scoped_lock lock{write_mutex}; spsc_queue.Push(std::move(t)); From 62fd55e5fe23772fd254552cc8273ca2a75f4eb3 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 14:24:18 -0400 Subject: [PATCH 4/6] bounded_threadsafe_queue: Deduplicate and add PushModes Adds the PushModes Try and Wait to allow producers to specify how they want to push their data to the queue if the queue is full. 
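For illustration, a minimal producer-side sketch of the two modes (hypothetical usage, not part of the diff itself; it assumes MPSCQueue<int> with the default capacity and the TryEmplace/EmplaceWait names introduced by this patch):

    // Hypothetical usage sketch of the Try and Wait push modes.
    #include "common/bounded_threadsafe_queue.h"

    void Produce(Common::MPSCQueue<int>& queue) {
        // Wait: blocks until a slot is free, then always pushes.
        // This can stall indefinitely if no consumer is draining the queue.
        queue.EmplaceWait(1);

        // Try: returns immediately; false means the queue was full and
        // the value was not pushed.
        if (!queue.TryEmplace(2)) {
            // Handle the full queue here (retry later, drop the item, ...).
        }
    }
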
If the queue is full: - Try will fail to push to the queue, returning false. Try only returns true if it successfully pushes to the queue. This may result in items not being pushed into the queue. - Wait will wait until a slot is available to push to the queue, resulting in potential for deadlock if a consumer is not running. --- src/common/bounded_threadsafe_queue.h | 188 +++++++++++++------------- src/common/logging/backend.cpp | 2 +- src/video_core/gpu_thread.cpp | 2 +- 3 files changed, 95 insertions(+), 97 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index eb88cc1d1b..975215863f 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -23,97 +23,21 @@ class SPSCQueue { public: bool TryPush(T&& t) { - const size_t write_index = m_write_index.load(); - - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); - - return true; + return Push(std::move(t)); } template - bool TryPush(Args&&... args) { - const size_t write_index = m_write_index.load(); - - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Emplace into the queue. - std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); - - return true; + bool TryEmplace(Args&&... args) { + return Emplace(std::forward(args)...); } - void Push(T&& t) { - const size_t write_index = m_write_index.load(); - - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + void PushWait(T&& t) { + Push(std::move(t)); } template - void Push(Args&&... args) { - const size_t write_index = m_write_index.load(); - - // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Emplace into the queue. - std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + void EmplaceWait(Args&&... 
args) { + Emplace(std::forward(args)...); } bool TryPop(T& t) { @@ -147,6 +71,80 @@ public: } private: + enum class PushMode { + Try, + Wait, + Count, + }; + + template + bool Push(T&& t) { + const size_t write_index = m_write_index.load(); + + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Push into the queue. + m_data[pos] = std::move(t); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + + template + bool Emplace(Args&&... args) { + const size_t write_index = m_write_index.load(); + + if constexpr (Mode == PushMode::Try) { + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + } else if constexpr (Mode == PushMode::Wait) { + // Wait until we have free slots to write to. + while ((write_index - m_read_index.load()) == Capacity) { + std::this_thread::yield(); + } + } else { + static_assert(Mode < PushMode::Count, "Invalid PushMode."); + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + void Pop() { const size_t read_index = m_read_index.load(); @@ -208,20 +206,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { @@ -262,20 +260,20 @@ public: } template - bool TryPush(Args&&... args) { + bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::forward(args)...); + return spsc_queue.TryEmplace(std::forward(args)...); } - void Push(T&& t) { + void PushWait(T&& t) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::move(t)); + spsc_queue.PushWait(std::move(t)); } template - void Push(Args&&... args) { + void EmplaceWait(Args&&... 
args) { std::scoped_lock lock{write_mutex}; - spsc_queue.Push(std::forward(args)...); + spsc_queue.EmplaceWait(std::forward(args)...); } bool TryPop(T& t) { diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index e1ce9db993..f96c7c2221 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -207,7 +207,7 @@ public: if (!filter.CheckMessage(log_class, log_level)) { return; } - message_queue.Push( + message_queue.EmplaceWait( CreateEntry(log_class, log_level, filename, line_num, function, std::move(message))); } diff --git a/src/video_core/gpu_thread.cpp b/src/video_core/gpu_thread.cpp index 469a59cf97..3c5317777f 100644 --- a/src/video_core/gpu_thread.cpp +++ b/src/video_core/gpu_thread.cpp @@ -118,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) { std::unique_lock lk(state.write_lock); const u64 fence{++state.last_fence}; - state.queue.Push(std::move(command_data), fence, block); + state.queue.EmplaceWait(std::move(command_data), fence, block); if (block) { Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] { From 70b1c2e8e09baf7bc1f67c347de367acf4dd28b2 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 14:48:01 -0400 Subject: [PATCH 5/6] bounded_threadsafe_queue: Add producer cv to avoid busy waiting --- src/common/bounded_threadsafe_queue.h | 46 +++++++++++++++++---------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 975215863f..0fb2f42d18 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -45,12 +45,12 @@ public: } void PopWait(T& t, std::stop_token stop_token) { - Wait(stop_token); + ConsumerWait(stop_token); Pop(t); } T PopWait(std::stop_token stop_token) { - Wait(stop_token); + ConsumerWait(stop_token); T t; Pop(t); return t; @@ -88,9 +88,10 @@ private: } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load()) < Capacity; + }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); } @@ -105,8 +106,8 @@ private: ++m_write_index; // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); return true; } @@ -122,9 +123,10 @@ private: } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load()) < Capacity; + }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); } @@ -139,8 +141,8 @@ private: ++m_write_index; // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); return true; } @@ -161,6 +163,10 @@ private: // Increment the read index. ++m_read_index; + + // Notify the producer that we have popped off the queue. 
+ std::unique_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); } bool Pop(T& t) { @@ -180,12 +186,16 @@ private: // Increment the read index. ++m_read_index; + // Notify the producer that we have popped off the queue. + std::scoped_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); + return true; } - void Wait(std::stop_token stop_token) { - std::unique_lock lock{cv_mutex}; - Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); + void ConsumerWait(std::stop_token stop_token) { + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); } alignas(128) std::atomic_size_t m_read_index{0}; @@ -193,8 +203,10 @@ private: std::array m_data; - std::condition_variable_any cv; - std::mutex cv_mutex; + std::condition_variable_any producer_cv; + std::mutex producer_cv_mutex; + std::condition_variable_any consumer_cv; + std::mutex consumer_cv_mutex; }; template From 05ba40b98e6e82d8a4b903ded215caa647e81006 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 15:17:21 -0400 Subject: [PATCH 6/6] bounded_threadsafe_queue: Refactor Pop Introduces PopModes to bring waiting logic into Pop, similar to Push. --- src/common/bounded_threadsafe_queue.h | 206 ++++++++------------------ 1 file changed, 64 insertions(+), 142 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 0fb2f42d18..bd87aa09b5 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,52 +22,38 @@ class SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); public: - bool TryPush(T&& t) { - return Push(std::move(t)); - } - template bool TryEmplace(Args&&... args) { return Emplace(std::forward(args)...); } - void PushWait(T&& t) { - Push(std::move(t)); - } - template void EmplaceWait(Args&&... args) { Emplace(std::forward(args)...); } bool TryPop(T& t) { - return Pop(t); + return Pop(t); + } + + void PopWait(T& t) { + Pop(t); } void PopWait(T& t, std::stop_token stop_token) { - ConsumerWait(stop_token); - Pop(t); + Pop(t, stop_token); } - T PopWait(std::stop_token stop_token) { - ConsumerWait(stop_token); + T PopWait() { T t; - Pop(t); + Pop(t); return t; } - void Clear() { - while (!Empty()) { - Pop(); - } - } - - bool Empty() const { - return m_read_index.load() == m_write_index.load(); - } - - size_t Size() const { - return m_write_index.load() - m_read_index.load(); + T PopWait(std::stop_token stop_token) { + T t; + Pop(t, stop_token); + return t; } private: @@ -77,55 +63,27 @@ private: Count, }; - template - bool Push(T&& t) { - const size_t write_index = m_write_index.load(); - - if constexpr (Mode == PushMode::Try) { - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - } else if constexpr (Mode == PushMode::Wait) { - // Wait until we have free slots to write to. - std::unique_lock lock{producer_cv_mutex}; - producer_cv.wait(lock, [this, write_index] { - return (write_index - m_read_index.load()) < Capacity; - }); - } else { - static_assert(Mode < PushMode::Count, "Invalid PushMode."); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. 
- std::scoped_lock lock{consumer_cv_mutex}; - consumer_cv.notify_one(); - - return true; - } + enum class PopMode { + Try, + Wait, + WaitWithStopToken, + Count, + }; template bool Emplace(Args&&... args) { - const size_t write_index = m_write_index.load(); + const size_t write_index = m_write_index.load(std::memory_order::relaxed); if constexpr (Mode == PushMode::Try) { // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { + if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) { return false; } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. std::unique_lock lock{producer_cv_mutex}; producer_cv.wait(lock, [this, write_index] { - return (write_index - m_read_index.load()) < Capacity; + return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity; }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); @@ -147,34 +105,32 @@ private: return true; } - void Pop() { - const size_t read_index = m_read_index.load(); + template + bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) { + const size_t read_index = m_read_index.load(std::memory_order::relaxed); - // Check if the queue is empty. - if (read_index == m_write_index.load()) { - return; - } - - // Determine the position to read from. - const size_t pos = read_index % Capacity; - - // Pop the data off the queue, deleting it. - std::destroy_at(std::addressof(m_data[pos])); - - // Increment the read index. - ++m_read_index; - - // Notify the producer that we have popped off the queue. - std::unique_lock lock{producer_cv_mutex}; - producer_cv.notify_one(); - } - - bool Pop(T& t) { - const size_t read_index = m_read_index.load(); - - // Check if the queue is empty. - if (read_index == m_write_index.load()) { - return false; + if constexpr (Mode == PopMode::Try) { + // Check if the queue is empty. + if (read_index == m_write_index.load(std::memory_order::acquire)) { + return false; + } + } else if constexpr (Mode == PopMode::Wait) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + consumer_cv.wait(lock, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + } else if constexpr (Mode == PopMode::WaitWithStopToken) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + if (stop_token.stop_requested()) { + return false; + } + } else { + static_assert(Mode < PopMode::Count, "Invalid PopMode."); } // Determine the position to read from. @@ -193,11 +149,6 @@ private: return true; } - void ConsumerWait(std::stop_token stop_token) { - std::unique_lock lock{consumer_cv_mutex}; - Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); - } - alignas(128) std::atomic_size_t m_read_index{0}; alignas(128) std::atomic_size_t m_write_index{0}; @@ -212,22 +163,12 @@ private: template class MPSCQueue { public: - bool TryPush(T&& t) { - std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::move(t)); - } - template bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; return spsc_queue.TryEmplace(std::forward(args)...); } - void PushWait(T&& t) { - std::scoped_lock lock{write_mutex}; - spsc_queue.PushWait(std::move(t)); - } - template void EmplaceWait(Args&&... 
args) { std::scoped_lock lock{write_mutex}; @@ -238,26 +179,22 @@ public: return spsc_queue.TryPop(t); } + void PopWait(T& t) { + spsc_queue.PopWait(t); + } + void PopWait(T& t, std::stop_token stop_token) { spsc_queue.PopWait(t, stop_token); } + T PopWait() { + return spsc_queue.PopWait(); + } + T PopWait(std::stop_token stop_token) { return spsc_queue.PopWait(stop_token); } - void Clear() { - spsc_queue.Clear(); - } - - bool Empty() { - return spsc_queue.Empty(); - } - - size_t Size() { - return spsc_queue.Size(); - } - private: SPSCQueue spsc_queue; std::mutex write_mutex; @@ -266,22 +203,12 @@ private: template class MPMCQueue { public: - bool TryPush(T&& t) { - std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::move(t)); - } - template bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; return spsc_queue.TryEmplace(std::forward(args)...); } - void PushWait(T&& t) { - std::scoped_lock lock{write_mutex}; - spsc_queue.PushWait(std::move(t)); - } - template void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; @@ -293,31 +220,26 @@ public: return spsc_queue.TryPop(t); } + void PopWait(T& t) { + std::scoped_lock lock{read_mutex}; + spsc_queue.PopWait(t); + } + void PopWait(T& t, std::stop_token stop_token) { std::scoped_lock lock{read_mutex}; spsc_queue.PopWait(t, stop_token); } + T PopWait() { + std::scoped_lock lock{read_mutex}; + return spsc_queue.PopWait(); + } + T PopWait(std::stop_token stop_token) { std::scoped_lock lock{read_mutex}; return spsc_queue.PopWait(stop_token); } - void Clear() { - std::scoped_lock lock{read_mutex}; - spsc_queue.Clear(); - } - - bool Empty() { - std::scoped_lock lock{read_mutex}; - return spsc_queue.Empty(); - } - - size_t Size() { - std::scoped_lock lock{read_mutex}; - return spsc_queue.Size(); - } - private: SPSCQueue spsc_queue; std::mutex write_mutex;