Signbit MPSCQ  0.5.0
mpscq.h
Go to the documentation of this file.
1 
7 /*
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef SBIT_MPSC_H_INCLUDED
22 #define SBIT_MPSC_H_INCLUDED
23 
24 #include <sbit/stats.h>
25 
26 #include <atomic>
27 #include <cstddef>
28 #include <list>
29 #include <memory_resource>
30 #include <vector>
31 
32 namespace sbit::mpscq
33 {
34 
/**
 * @brief Intrusive queue whose elements are linked through an embedded Envelope.
 *
 * Elements are pushed onto an atomic head pointer with a compare-exchange loop
 * and are detached all at once by flushAll(). The queue never allocates: the
 * caller owns every element and must keep it alive while it is linked.
 */
class Queue
{
public:
    /**
     * @brief Link header embedded in every queued object.
     */
    struct Envelope
    {
        /// Next element in the queue.
        Envelope* next = nullptr;

        /// Where to return the object after the message is processed.
        std::atomic<Envelope*>* workerPool = nullptr;

        /// Where to return the object when the worker is shut down.
        std::atomic<Envelope*>* ownerPool = nullptr;

        /**
         * @brief Pushes this envelope in front of the list rooted at @p head.
         *
         * @param head the atomic list head to push onto; must not be null
         */
        void appendTo(std::atomic<Envelope*>* head)
        {
            next = head->load(std::memory_order_relaxed);

            // On failure compare_exchange_weak reloads the current head into
            // `next`, so the link is already correct when the loop retries.
            // The release ordering publishes `next` (and anything written to
            // the enclosing object before this call) to whoever detaches the list.
            while (!head->compare_exchange_weak(
                next, this, std::memory_order_release, std::memory_order_relaxed))
            {
                // the body of the loop is empty
            }
        }

        /**
         * @brief Returns this envelope to its worker pool.
         *
         * Does nothing when no worker pool was assigned.
         */
        void recycle()
        {
            if (workerPool != nullptr)
            {
                appendTo(workerPool);
            }
        }
    };

    /**
     * @brief An envelope followed by a user-defined payload.
     *
     * The envelope is the first member; the pool and the processor convert
     * between Envelope* and Message* with reinterpret_cast and rely on that.
     *
     * @tparam Payload the type of the data carried by the message
     */
    template <typename Payload>
    struct Message
    {
        /// The envelope.
        Envelope envelope;

        /// The payload.
        Payload payload;

        /**
         * @brief Returns the message to its worker pool (see Envelope::recycle()).
         */
        void recycle()
        {
            envelope.recycle();
        }
    };

    /**
     * @brief Pushes an element onto the queue.
     *
     * @param elem the envelope to enqueue; must not be null
     */
    void append(Envelope* elem)
    {
        elem->appendTo(&m_head);
    }

    /**
     * @brief Detaches every queued element and leaves the queue empty.
     *
     * @return the head of the detached list, linked through Envelope::next with
     *         the most recently appended element first, or nullptr when the
     *         queue was empty
     */
    Envelope* flushAll()
    {
        // The acquire half pairs with the release CAS in Envelope::appendTo so
        // that the links and payloads written by the producers are visible
        // before the caller walks the list.
        Envelope* values = m_head.exchange(nullptr, std::memory_order_acq_rel);

        // TODO(florin): reverse the links so we return a pointer to the oldest element

        return values;
    }

private:
    /// Most recently appended element, or nullptr when the queue is empty.
    std::atomic<Envelope*> m_head{nullptr};
};
134 
141 template <typename Payload>
143 {
144  using Envelope = Queue::Envelope;
146 
147 public:
153  MessagePool(size_t allocationGroupSize, std::pmr::memory_resource* memoryResource) :
154  m_allocationGroupSize{allocationGroupSize},
155  m_recyclePool{nullptr},
156  m_objectPool{allocationGroupSize, memoryResource}
157  {
158  }
159 
165  {
166  if (m_readyPool == nullptr)
167  {
168  m_readyPool = std::atomic_exchange_explicit(&m_recyclePool, nullptr, std::memory_order_release);
169  }
170 
171  if (m_readyPool == nullptr)
172  {
173  try
174  {
175  auto& elems = m_objectPool.emplace_back(
176  std::pmr::vector<Message>{m_allocationGroupSize, m_objectPool.get_allocator()});
177  for (auto& msg : elems)
178  {
179  msg.envelope.next = m_readyPool;
180  m_readyPool = &msg.envelope;
181  msg.envelope.workerPool = &m_recyclePool;
182  msg.envelope.ownerPool = &m_recyclePool;
183  }
184  }
185  catch (...)
186  {
187  return nullptr;
188  }
189  }
190 
191  auto* msg = reinterpret_cast<Message*>(m_readyPool);
192  m_readyPool = m_readyPool->next;
193  return msg;
194  }
195 
196 private:
197  const size_t m_allocationGroupSize;
198 
202  std::atomic<Envelope*> m_recyclePool;
203 
206  Envelope* m_readyPool = nullptr;
207 
210  std::pmr::list<std::pmr::vector<Message>> m_objectPool;
211 };
212 
216 {
217 public:
222  explicit ProcessorBase(Queue& queue) : m_queue(queue)
223  {
224  }
225 
231  virtual void processElement(Queue::Envelope* envelope) = 0;
232 
237  virtual void afterBatch()
238  {
239  // do nothing
240  }
241 
246  virtual void onIdle()
247  {
248  // do nothing
249  }
250 
254  {
255  while (!m_done.load(std::memory_order_acquire))
256  {
257  auto* envelope = m_queue.flushAll();
258 
259  if (envelope == nullptr)
260  {
261  onIdle();
262  continue;
263  }
264 
265  while (envelope != nullptr)
266  {
267  processElement(envelope);
268 
269  auto* next = envelope->next;
270  envelope->recycle();
271  envelope = next;
272  }
273 
274  afterBatch();
275  }
276  }
277 
280  void interrupt()
281  {
282  m_done.store(true, std::memory_order_release);
283  }
284 
285  ProcessorBase(const ProcessorBase& other) = delete;
286  ProcessorBase& operator=(const ProcessorBase& other) = delete;
287 
288  ProcessorBase(ProcessorBase&& other) = delete;
289  ProcessorBase& operator=(ProcessorBase&& other) = delete;
290 
291  virtual ~ProcessorBase() = default;
292 
293 private:
294  Queue& m_queue;
295  std::atomic<bool> m_done{false};
296 };
297 
304 template <typename Payload>
305 class Processor : public ProcessorBase
306 {
307 public:
312  explicit Processor(Queue& queue) : ProcessorBase(queue)
313  {
314  }
315 
316 protected:
322  virtual void process(const Payload& payload) = 0;
323 
324 private:
325  using Message = Queue::Message<Payload>;
326 
327  void processElement(Queue::Envelope* envelope) override
328  {
329  auto* message = reinterpret_cast<Message*>(envelope);
330  process(message->payload);
331  }
332 };
333 
336 } // namespace sbit::mpscq
337 
338 #endif // SBIT_MPSC_H_INCLUDED
sbit::mpscq::Queue::Envelope::ownerPool
std::atomic< Envelope * > * ownerPool
Where to return the object when the worker is shut down.
Definition: mpscq.h:63
sbit::mpscq::Queue::Envelope::next
Envelope * next
Next element in the queue.
Definition: mpscq.h:57
sbit::mpscq::ProcessorBase::interrupt
void interrupt()
Definition: mpscq.h:280
sbit::mpscq::ProcessorBase
Definition: mpscq.h:215
sbit::mpscq::Queue::append
void append(Envelope *elem)
Definition: mpscq.h:113
sbit::mpscq::Queue::Envelope::appendTo
void appendTo(std::atomic< Envelope * > *head)
Definition: mpscq.h:69
sbit::mpscq::Queue::Message::envelope
Envelope envelope
The envelope.
Definition: mpscq.h:96
sbit::mpscq::MessagePool::allocate
Message * allocate()
Definition: mpscq.h:164
sbit::mpscq::Processor::Processor
Processor(Queue &queue)
Definition: mpscq.h:312
sbit::mpscq::ProcessorBase::startProcessing
void startProcessing()
Definition: mpscq.h:253
sbit::mpscq::ProcessorBase::onIdle
virtual void onIdle()
Definition: mpscq.h:246
sbit::mpscq::ProcessorBase::afterBatch
virtual void afterBatch()
Definition: mpscq.h:237
sbit::mpscq::ProcessorBase::ProcessorBase
ProcessorBase(Queue &queue)
Definition: mpscq.h:222
sbit::mpscq::Queue
Definition: mpscq.h:49
sbit::mpscq::Processor::process
virtual void process(const Payload &payload)=0
stats.h
Running statistics.
sbit::mpscq::MessagePool
Definition: mpscq.h:142
sbit::mpscq::Queue::Envelope::recycle
void recycle()
Definition: mpscq.h:82
sbit::mpscq::Queue::Message
Definition: mpscq.h:93
sbit::mpscq::Queue::flushAll
Envelope * flushAll()
Definition: mpscq.h:122
sbit::mpscq::Processor
Definition: mpscq.h:305
sbit::mpscq::MessagePool::MessagePool
MessagePool(size_t allocationGroupSize, std::pmr::memory_resource *memoryResource)
Definition: mpscq.h:153
sbit::mpscq::Queue::Envelope
Definition: mpscq.h:54
sbit::mpscq::Queue::Envelope::workerPool
std::atomic< Envelope * > * workerPool
Where to return the object after the message is processed.
Definition: mpscq.h:60
sbit::mpscq::Queue::Message::payload
Payload payload
The payload.
Definition: mpscq.h:99
sbit::mpscq::Queue::Message::recycle
void recycle()
Definition: mpscq.h:103
sbit::mpscq::ProcessorBase::processElement
virtual void processElement(Queue::Envelope *envelope)=0