21 #ifndef SBIT_MPSC_H_INCLUDED
22 #define SBIT_MPSC_H_INCLUDED
29 #include <memory_resource>
// Fragment (orig. lines 71-74): lock-free producer-side push of `this`
// onto the intrusive atomic list headed by `head` (Treiber-stack style).
// The enclosing function's signature lies outside this excerpt.
71 next = head->load(std::memory_order_relaxed);
// CAS loop: on success memory_order_release publishes this node (and the
// payload written before the push) to whoever later acquires the head;
// on failure relaxed suffices because `next` is refreshed and retried.
73 while (!std::atomic_compare_exchange_weak_explicit(
74 head, &
next,
this, std::memory_order_release, std::memory_order_relaxed))
// MPSC queue template fragment (orig. lines 92-132); only the flush
// statement and the head member are visible in this excerpt.
92 template <
typename Payload>
// Detach the entire producer-built chain in one atomic exchange; the
// caller (the single consumer) then owns the returned list.
// NOTE(review): memory_order_release looks wrong on this consumer-side
// exchange -- the load half of the RMW is effectively relaxed, so it does
// NOT synchronize-with the producers' release-CAS pushes, and payload
// writes may be observed stale. This should almost certainly be
// memory_order_acquire (or acq_rel). Confirm against the full file.
124 Envelope* values = std::atomic_exchange_explicit(&m_head,
nullptr, std::memory_order_release);
// Head of the intrusive lock-free list: producers CAS-push envelopes,
// the consumer swaps the whole list out with nullptr to drain it.
132 std::atomic<Envelope*> m_head{
nullptr};
// Message pool template fragment (orig. lines 141-156). Owns groups of
// Message objects backed by a pmr resource; freed envelopes come back to
// the pool via the atomic m_recyclePool list.
141 template <
typename Payload>
// @param allocationGroupSize  number of Messages allocated per growth step
// @param memoryResource       pmr resource backing m_objectPool
// NOTE(review): m_objectPool is a std::pmr::list, so brace-initializing it
// with {allocationGroupSize, memoryResource} selects list(count, alloc)
// and creates allocationGroupSize default-constructed (empty) vectors up
// front. That looks unintended -- m_objectPool{memoryResource} was likely
// meant. Harmless but wasteful if so; confirm against the full file.
153 MessagePool(
size_t allocationGroupSize, std::pmr::memory_resource* memoryResource) :
154 m_allocationGroupSize{allocationGroupSize},
155 m_recyclePool{
nullptr},
156 m_objectPool{allocationGroupSize, memoryResource}
// allocate() refill fragment (orig. lines 166-182; intervening lines are
// elided in this excerpt). When the thread-local ready list is empty,
// steal the whole recycle list; if that is also empty, grow the pool by
// one allocation group.
166 if (m_readyPool ==
nullptr)
// NOTE(review): as with flushAll, memory_order_release on this consumer-
// side exchange does not synchronize-with the threads that release-pushed
// envelopes into m_recyclePool; memory_order_acquire (or acq_rel) appears
// to be required here. Confirm against the full file before changing.
168 m_readyPool = std::atomic_exchange_explicit(&m_recyclePool,
nullptr, std::memory_order_release);
171 if (m_readyPool ==
nullptr)
// Growth path: allocate one group of m_allocationGroupSize Messages from
// the pool's pmr allocator and thread them onto the ready free-list.
175 auto& elems = m_objectPool.emplace_back(
176 std::pmr::vector<Message>{m_allocationGroupSize, m_objectPool.get_allocator()});
177 for (
auto& msg : elems)
179 msg.envelope.next = m_readyPool;
180 m_readyPool = &msg.envelope;
// Each envelope records the recycle list it must be returned to when the
// consumer is done with it.
182 msg.envelope.ownerPool = &m_recyclePool;
// Pop the head of the ready free-list (orig. lines 191-192). The cast
// presumes Envelope is located at the start of Message (first-member /
// standard-layout assumption) -- TODO confirm Message's layout in the
// full file.
191 auto* msg =
reinterpret_cast<Message*
>(m_readyPool);
192 m_readyPool = m_readyPool->
next;
// Number of Messages added per pool growth step; fixed at construction.
197 const size_t m_allocationGroupSize;
// Lock-free intrusive list of envelopes returned after processing;
// allocate() steals it wholesale with an atomic exchange.
202 std::atomic<Envelope*> m_recyclePool;
// Free-list owned by the allocating thread only -- no atomics needed.
206 Envelope* m_readyPool =
nullptr;
// Stable storage for all Messages: std::pmr::list never relocates its
// elements, so Envelope pointers into earlier groups stay valid as the
// pool grows.
210 std::pmr::list<std::pmr::vector<Message>> m_objectPool;
// Consumer loop fragment (orig. lines 255-295; intervening lines elided).
// Drains the queue in batches until stop() is observed.
255 while (!m_done.load(std::memory_order_acquire))
// Detach everything currently queued in a single operation.
257 auto* envelope = m_queue.
flushAll();
259 if (envelope ==
nullptr)
// Walk the detached chain; `next` is captured before the current
// envelope is handled, since handling it may recycle/invalidate it.
265 while (envelope !=
nullptr)
269 auto* next = envelope->next;
// stop(): the release store pairs with the acquire load at the top of
// the consumer loop.
282 m_done.store(
true, std::memory_order_release);
// Set once by stop(); polled every iteration by the consumer thread.
295 std::atomic<bool> m_done{
false};
// Payload handler template fragment (orig. lines 304-329).
304 template <
typename Payload>
// Pure virtual hook: implementors receive each dequeued payload here.
322 virtual void process(
const Payload& payload) = 0;
// Recover the owning Message from its intrusive Envelope; same
// Envelope-first layout assumption as in MessagePool -- TODO confirm.
329 auto* message =
reinterpret_cast<Message*
>(envelope);
338 #endif // SBIT_MPSC_H_INCLUDED