PMDK C++ bindings 1.13.0
This is the C++ bindings documentation for PMDK's libpmemobj.
mpsc_queue.hpp
Go to the documentation of this file.
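Before the listing itself, a minimal usage sketch may help orient the reader; it exercises only the public API declared below (pmem_log_type, register_worker(), worker::try_produce() and try_consume_batch()). The pool path, layout name and log size are hypothetical placeholders, error handling is omitted, and the pool is assumed to already exist.

#include <libpmemobj++/experimental/mpsc_queue.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp>

#include <iostream>
#include <string>

using queue_type = pmem::obj::experimental::mpsc_queue;

/* The log must live in persistent memory, e.g. hanging off the pool root. */
struct root {
	pmem::obj::persistent_ptr<queue_type::pmem_log_type> log;
};

int main()
{
	/* "/path/to/pool" and "mpsc_queue_example" are placeholders. */
	auto pop = pmem::obj::pool<root>::open("/path/to/pool", "mpsc_queue_example");
	auto proot = pop.root();

	/* Allocate the persistent log (here: 10 kB) on first use. */
	if (proot->log == nullptr) {
		pmem::obj::transaction::run(pop, [&] {
			proot->log = pmem::obj::make_persistent<
				queue_type::pmem_log_type>(10000);
		});
	}

	/* Runtime queue object; it restores offsets from data left in the log. */
	queue_type queue(*proot->log, 1 /* max_workers */);

	/* Each producing thread registers its own worker. */
	queue_type::worker w = queue.register_worker();
	w.try_produce("Hello World!");

	/* The single consumer drains whatever is ready, one batch at a time. */
	queue.try_consume_batch([&](queue_type::batch_type batch) {
		for (pmem::obj::string_view v : batch)
			std::cout << std::string(v.data(), v.size()) << std::endl;
	});

	pop.close();
	return 0;
}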
1// SPDX-License-Identifier: BSD-3-Clause
2/* Copyright 2021, Intel Corporation */
3
9#ifndef LIBPMEMOBJ_MPSC_QUEUE_HPP
10#define LIBPMEMOBJ_MPSC_QUEUE_HPP
11
14#include <libpmemobj++/detail/ringbuf.hpp>
19
20#include <atomic>
21#include <cstddef>
22#include <cstring>
23#include <iterator>
24#include <memory>
25
26namespace pmem
27{
28
29namespace obj
30{
31
32namespace experimental
33{
34
50class mpsc_queue {
51public:
52 class worker;
53 class pmem_log_type;
54 class batch_type;
55
56 mpsc_queue(pmem_log_type &pmem, size_t max_workers = 1);
57
58 worker register_worker();
59
60 template <typename Function>
61 bool try_consume_batch(Function &&f);
62
63private:
64 struct first_block {
65 static constexpr size_t CAPACITY =
66 pmem::detail::CACHELINE_SIZE - sizeof(size_t);
67 static constexpr size_t DIRTY_FLAG =
68 (1ULL << (sizeof(size_t) * 8 - 1));
69
70 size_t size;
71 char data[CAPACITY];
72 };
73
74 struct iterator {
75 iterator(char *data, char *end);
76
77 iterator &operator++();
78
79 bool operator==(const iterator &rhs) const;
80 bool operator!=(const iterator &rhs) const;
81
82 pmem::obj::string_view operator*() const;
83
84 private:
85 first_block *seek_next(first_block *);
86
87 char *data;
88 char *end;
89 };
90
91 void clear_cachelines(first_block *block, size_t size);
92 void restore_offsets();
93
94 size_t consume_cachelines(size_t *offset);
95 void release_cachelines(size_t len);
96
97 inline pmem::detail::id_manager &get_id_manager();
98
99 /* ringbuf_t handle. Important: mpsc_queue operates on cachelines, hence
100 * the ringbuf_produce/release functions are called with the number of
101 * cachelines, not bytes. */
102 std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
103 char *buf;
104 pmem::obj::pool_base pop;
105 size_t buf_size;
106 pmem_log_type *pmem;
107
108 /* Stores offset and length of next message to be consumed. Only
109 * valid if ring_buffer->consume_in_progress. */
110 size_t consume_offset = 0;
111 size_t consume_len = 0;
112
113public:
118 class batch_type {
119 public:
120 batch_type(iterator begin, iterator end);
121
122 iterator begin() const;
123 iterator end() const;
124
125 private:
126 iterator begin_;
127 iterator end_;
128 };
129
141 class worker {
142 public:
143 worker(mpsc_queue *q);
144 ~worker();
145
146 worker(const worker &) = delete;
147 worker &operator=(const worker &) = delete;
148
149 worker(worker &&other);
150 worker &operator=(worker &&other);
151
152 template <typename Function = void (*)(pmem::obj::string_view)>
153 bool try_produce(
154 try_produce(pmem::obj::string_view data,
155 Function &&on_produce =
156 [](pmem::obj::string_view target) {});
157
158 private:
159 mpsc_queue *queue;
160 ringbuf::ringbuf_worker_t *w;
161 size_t id;
162
163 ptrdiff_t acquire_cachelines(size_t len);
164 void produce_cachelines();
165 void store_to_log(pmem::obj::string_view data, char *log_data);
166
167 friend class mpsc_queue;
168 };
169
180 class pmem_log_type {
181 public:
182 pmem_log_type(size_t size);
183
184 pmem::obj::string_view data();
185
186 private:
187 pmem::obj::vector<char> data_;
188 pmem::obj::p<size_t> written;
189
190 friend class mpsc_queue;
191 };
192};
193
201inline mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
202{
203 pop = pmem::obj::pool_by_vptr(&pmem);
204
205 auto buf_data = pmem.data();
206
207 buf = const_cast<char *>(buf_data.data());
208 buf_size = buf_data.size();
209
210 assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);
211
212 ring_buffer =
213 std::unique_ptr<ringbuf::ringbuf_t>(new ringbuf::ringbuf_t(
214 max_workers, buf_size / pmem::detail::CACHELINE_SIZE));
215
216 this->pmem = &pmem;
217
218 restore_offsets();
219}
220
221ptrdiff_t
222mpsc_queue::worker::acquire_cachelines(size_t len)
223{
224 assert(len % pmem::detail::CACHELINE_SIZE == 0);
225 auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
226 len / pmem::detail::CACHELINE_SIZE);
227
228 if (ret < 0)
229 return ret;
230
231 return ret * static_cast<ptrdiff_t>(pmem::detail::CACHELINE_SIZE);
232}
233
234void
235mpsc_queue::worker::produce_cachelines()
236{
237 ringbuf_produce(queue->ring_buffer.get(), w);
238}
239
240size_t
241mpsc_queue::consume_cachelines(size_t *offset)
242{
243 auto ret = ringbuf_consume(ring_buffer.get(), offset);
244 if (ret) {
245 *offset *= pmem::detail::CACHELINE_SIZE;
246 return ret * pmem::detail::CACHELINE_SIZE;
247 }
248
249 return 0;
250}
251
252void
253mpsc_queue::release_cachelines(size_t len)
254{
255 assert(len % pmem::detail::CACHELINE_SIZE == 0);
256 ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
257}
258
259void
260mpsc_queue::restore_offsets()
261{
262 /* Invariant */
263 assert(pmem->written < buf_size);
264
265 /* XXX: implement restore_offset function in ringbuf */
266
267 auto w = register_worker();
268
269 if (!pmem->written) {
270 /* If pmem->written == 0, the consumer should start reading
271 * from the beginning. There might be elements produced
272 * anywhere in the log. Since we want to prohibit any producers
273 * from overwriting the original content, mark the entire log
274 * as produced. */
275
276 auto acq = w.acquire_cachelines(buf_size -
277 pmem::detail::CACHELINE_SIZE);
278 assert(acq == 0);
279 (void)acq;
280
281 w.produce_cachelines();
282
283 return;
284 }
285
286 /* If pmem->written != 0, there still might be elements in the log.
287 * Moreover, to guarantee the proper order of elements on recovery, we
288 * must restore the consumer offset. (If we started consuming from the
289 * beginning of the log, we could consume newer elements first.) Offsets
290 * are restored by the following operations:
291 *
292 * produce(pmem->written);
293 * consume();
294 * produce(size - pmem->written);
295 * produce(pmem->written - CACHELINE_SIZE);
296 *
297 * This results in a producer offset equal to pmem->written -
298 * CACHELINE_SIZE and a consumer offset equal to pmem->written.
299 */
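 /* For illustration (values chosen for this example only): with buf_size of
  * 8 cachelines and pmem->written of 3 cachelines, the sequence above gives:
  *   produce(3)          -> cachelines [0, 3) marked as produced
  *   consume()/release() -> consumer offset moves to 3
  *   produce(5)          -> cachelines [3, 8) marked as produced
  *   produce(2)          -> cachelines [0, 2) marked as produced
  * so consumption resumes at pmem->written and every cacheline except the
  * one directly before it stays protected from being overwritten. */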
300
301 auto acq = w.acquire_cachelines(pmem->written);
302 assert(acq == 0);
303 w.produce_cachelines();
304
305 /* Restore consumer offset */
306 size_t offset;
307 auto len = consume_cachelines(&offset);
308 assert(len == pmem->written);
309 release_cachelines(len);
310
311 assert(offset == 0);
312 assert(len == pmem->written);
313
314 acq = w.acquire_cachelines(buf_size - pmem->written);
315 assert(acq >= 0);
316 assert(static_cast<size_t>(acq) == pmem->written);
317 w.produce_cachelines();
318
319 acq = w.acquire_cachelines(pmem->written -
320 pmem::detail::CACHELINE_SIZE);
321 assert(acq == 0);
322 (void)acq;
323 w.produce_cachelines();
324}
325
331inline mpsc_queue::pmem_log_type::pmem_log_type(size_t size)
332 : data_(size, 0), written(0)
333{
334}
335
342inline pmem::obj::string_view
343mpsc_queue::pmem_log_type::data()
344{
345 auto addr = reinterpret_cast<uintptr_t>(&data_[0]);
346 auto aligned_addr =
347 pmem::detail::align_up(addr, pmem::detail::CACHELINE_SIZE);
348
349 auto size = data_.size() - (aligned_addr - addr);
350 auto aligned_size =
351 pmem::detail::align_down(size, pmem::detail::CACHELINE_SIZE);
352
353 return pmem::obj::string_view(
354 reinterpret_cast<const char *>(aligned_addr), aligned_size);
355}
356
357inline pmem::detail::id_manager &
358mpsc_queue::get_id_manager()
359{
360 static pmem::detail::id_manager manager;
361 return manager;
362}
363
372inline mpsc_queue::worker
373mpsc_queue::register_worker()
374{
375 return worker(this);
376}
377
397template <typename Function>
398inline bool
399mpsc_queue::try_consume_batch(Function &&f)
400{
401 if (pmemobj_tx_stage() != TX_STAGE_NONE)
403 "Function called inside a transaction scope.");
404
405 bool consumed = false;
406
407 /* Need to call try_consume twice, as some data may be at the end
408 * of the buffer and some may be at the beginning. The ring buffer
409 * does not merge those two parts into one try_consume. If all data
410 * was consumed during the first try_consume, the second will do nothing. */
411 for (int i = 0; i < 2; i++) {
412 /* If there is no consume in progress, it's safe to call
413 * ringbuf_consume. */
414 if (!ring_buffer->consume_in_progress) {
415 size_t offset;
416 auto len = consume_cachelines(&offset);
417 if (!len)
418 return consumed;
419
420 consume_offset = offset;
421 consume_len = len;
422 } else {
423 assert(consume_len != 0);
424 }
425
426#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
427 ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
428#endif
429
430 auto data = buf + consume_offset;
431 auto begin = iterator(data, data + consume_len);
432 auto end = iterator(data + consume_len, data + consume_len);
433
434 pmem::obj::flat_transaction::run(pop, [&] {
435 if (begin != end) {
436 consumed = true;
437 f(batch_type(begin, end));
438 }
439
440 auto b = reinterpret_cast<first_block *>(data);
441 clear_cachelines(b, consume_len);
442
443 if (consume_offset + consume_len < buf_size)
444 pmem->written = consume_offset + consume_len;
445 else if (consume_offset + consume_len == buf_size)
446 pmem->written = 0;
447 else
448 assert(false);
449 });
450
451#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
452 ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
453#endif
454
455 release_cachelines(consume_len);
456
457 assert(!ring_buffer->consume_in_progress);
458
459 /* XXX: it would be better to call f once - hide
460 * wraparound behind iterators */
461 /* XXX: add param to ringbuf_consume and do not
462 * call store_explicit in consume */
463 }
464
465 return consumed;
466}
467
468inline mpsc_queue::worker::worker(mpsc_queue *q)
469{
470 queue = q;
471 auto &manager = queue->get_id_manager();
472
473#if LIBPMEMOBJ_CPP_VG_DRD_ENABLED
474 ANNOTATE_BENIGN_RACE_SIZED(
475 &manager, sizeof(std::mutex),
476 "https://bugs.kde.org/show_bug.cgi?id=416286");
477#endif
478
479 id = manager.get();
480
481 assert(id < q->ring_buffer->nworkers);
482
483 w = ringbuf_register(queue->ring_buffer.get(), id);
484}
485
486inline mpsc_queue::worker::worker(mpsc_queue::worker &&other)
487{
488 *this = std::move(other);
489}
490
491inline mpsc_queue::worker &
492mpsc_queue::worker::operator=(worker &&other)
493{
494 if (this != &other) {
495 queue = other.queue;
496 w = other.w;
497 id = other.id;
498
499 other.queue = nullptr;
500 other.w = nullptr;
501 }
502 return *this;
503}
504
505inline mpsc_queue::worker::~worker()
506{
507 if (w) {
508 ringbuf_unregister(queue->ring_buffer.get(), w);
509 auto &manager = queue->get_id_manager();
510 manager.release(id);
511 }
512}
513
526template <typename Function>
527bool
528mpsc_queue::worker::try_produce(pmem::obj::string_view data,
529 Function &&on_produce)
530{
531 auto req_size =
532 pmem::detail::align_up(data.size() + sizeof(first_block::size),
533 pmem::detail::CACHELINE_SIZE);
534 auto offset = acquire_cachelines(req_size);
535
536#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
537 ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
538#endif
539
540 if (offset == -1)
541 return false;
542
543 store_to_log(data, queue->buf + offset);
544
545#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
546 ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
547#endif
548
549 on_produce(pmem::obj::string_view(
550 queue->buf + offset + sizeof(first_block::size), data.size()));
551
552 produce_cachelines();
553
554 return true;
555}
556
557inline void
558mpsc_queue::worker::store_to_log(pmem::obj::string_view data, char *log_data)
559{
560 assert(reinterpret_cast<uintptr_t>(log_data) %
561 pmem::detail::CACHELINE_SIZE ==
562 0);
563
564/* Invariant: a producer can only produce data to cachelines whose
565 * first 8 bytes are zeroed.
566 */
567#ifndef NDEBUG
568 auto b = reinterpret_cast<first_block *>(log_data);
569 auto s = pmem::detail::align_up(data.size() + sizeof(first_block::size),
570 pmem::detail::CACHELINE_SIZE);
571 auto e = b + s / pmem::detail::CACHELINE_SIZE;
572 while (b < e) {
573 assert(b->size == 0);
574 b++;
575 }
576#endif
577
578 assert(reinterpret_cast<first_block *>(log_data)->size == 0);
579
580 first_block fblock;
581 fblock.size = data.size() | size_t(first_block::DIRTY_FLAG);
582
583 /*
584 * The first step is to copy up to 56 B of data and store
585 * data.size() with the DIRTY flag set. After that, we
586 * store the rest of the data in two steps:
587 * 1. The remainder of the data is aligned down to a
588 * cacheline boundary and copied.
589 * Now we are left with between 0 and 63 bytes. If
590 * nonzero:
591 * 2. Create a stack-allocated, cacheline-sized
592 * buffer, fill in the remainder of the data, and
593 * copy the entire cacheline. After all data is
594 * stored, we clear the dirty flag from size.
595 *
596 * This is done so that we avoid a cache miss on
597 * misaligned writes.
598 */
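 /* For illustration (example values): a 100 B message gives ncopy == 56,
  * remaining_size == 44, rcopy == 0 and lcopy == 44, so one cacheline is
  * written from fblock and one from last_cacheline; the first cacheline is
  * then rewritten with the DIRTY flag cleared. The entry occupies
  * align_up(108, 64) == 128 B, i.e. two cachelines. */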
599
600 size_t ncopy = (std::min)(data.size(), size_t(first_block::CAPACITY));
601 std::copy_n(data.data(), ncopy, fblock.data);
602
603 pmemobj_memcpy(queue->pop.handle(), log_data,
604 reinterpret_cast<char *>(&fblock),
605 pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
606
607 size_t remaining_size = ncopy > data.size() ? 0 : data.size() - ncopy;
608
609 const char *srcof = data.data() + ncopy;
610 size_t rcopy = pmem::detail::align_down(remaining_size,
611 pmem::detail::CACHELINE_SIZE);
612 size_t lcopy = remaining_size - rcopy;
613
614 char last_cacheline[pmem::detail::CACHELINE_SIZE];
615 if (lcopy != 0)
616 std::copy_n(srcof + rcopy, lcopy, last_cacheline);
617
618 if (rcopy != 0) {
619 char *dest = log_data + pmem::detail::CACHELINE_SIZE;
620
621 pmemobj_memcpy(queue->pop.handle(), dest, srcof, rcopy,
622 PMEMOBJ_F_MEM_NODRAIN |
623 PMEMOBJ_F_MEM_NONTEMPORAL);
624 }
625
626 if (lcopy != 0) {
627 void *dest = log_data + pmem::detail::CACHELINE_SIZE + rcopy;
628
629 pmemobj_memcpy(queue->pop.handle(), dest, last_cacheline,
630 pmem::detail::CACHELINE_SIZE,
631 PMEMOBJ_F_MEM_NODRAIN |
632 PMEMOBJ_F_MEM_NONTEMPORAL);
633 }
634
635 pmemobj_drain(queue->pop.handle());
636
637 fblock.size &= (~size_t(first_block::DIRTY_FLAG));
638
639 pmemobj_memcpy(queue->pop.handle(), log_data,
640 reinterpret_cast<char *>(&fblock),
641 pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
642}
643
644inline mpsc_queue::batch_type::batch_type(iterator begin_, iterator end_)
645 : begin_(begin_), end_(end_)
646{
647}
648
655inline mpsc_queue::iterator
656mpsc_queue::batch_type::begin() const
657{
658 return begin_;
659}
660
667inline mpsc_queue::iterator
668mpsc_queue::batch_type::end() const
669{
670 return end_;
671}
672
673mpsc_queue::iterator::iterator(char *data, char *end) : data(data), end(end)
674{
675 auto b = reinterpret_cast<first_block *>(data);
676 auto next = seek_next(b);
677 assert(next >= b);
678 this->data = reinterpret_cast<char *>(next);
679}
680
681void
682mpsc_queue::clear_cachelines(first_block *block, size_t size)
683{
684 assert(size % pmem::detail::CACHELINE_SIZE == 0);
685 assert(pmemobj_tx_stage() == TX_STAGE_WORK);
686
687 auto end = block +
688 static_cast<ptrdiff_t>(size / pmem::detail::CACHELINE_SIZE);
689
690 while (block < end) {
691 /* data in block might be uninitialized. */
692 detail::conditional_add_to_tx(&block->size, 1,
693 POBJ_XADD_ASSUME_INITIALIZED);
694 block->size = 0;
695 block++;
696 }
697
698 assert(end <= reinterpret_cast<first_block *>(buf + buf_size));
699}
700
701mpsc_queue::iterator &
702mpsc_queue::iterator::operator++()
703{
704 auto block = reinterpret_cast<first_block *>(data);
705 assert(block->size != 0);
706
707 auto element_size =
708 pmem::detail::align_up(block->size + sizeof(block->size),
709 pmem::detail::CACHELINE_SIZE);
710
711 block += element_size / pmem::detail::CACHELINE_SIZE;
712
713 auto next = seek_next(block);
714 assert(next >= block);
715 block = next;
716
717 data = reinterpret_cast<char *>(block);
718
719 return *this;
720}
721
722bool
723mpsc_queue::iterator::operator==(const mpsc_queue::iterator &rhs) const
724{
725 return data == rhs.data;
726}
727
728bool
729mpsc_queue::iterator::operator!=(const mpsc_queue::iterator &rhs) const
730{
731 return data != rhs.data;
732}
733
734pmem::obj::string_view mpsc_queue::iterator::operator*() const
735{
736 auto b = reinterpret_cast<first_block *>(data);
737 return pmem::obj::string_view(b->data, b->size);
738}
739
740mpsc_queue::first_block *
741mpsc_queue::iterator::seek_next(mpsc_queue::first_block *b)
742{
743 auto e = reinterpret_cast<first_block *>(end);
744
745 /* Advance to the first unconsumed element. Each cacheline can be in one
746 * of 3 states:
747 * 1. First 8 bytes (size) are equal to 0 - there is no data in this
748 * cacheline.
749 * 2. First 8 bytes (size) are non-zero and have the dirty flag set - the
750 * next size bytes are junk.
751 * 3. First 8 bytes (size) are non-zero and have the dirty flag unset -
752 * the next size bytes are ready to be consumed (consistent data).
753 */
754 while (b < e) {
755 if (b->size == 0) {
756 b++;
757 } else if (b->size & size_t(first_block::DIRTY_FLAG)) {
758 auto size =
759 b->size & (~size_t(first_block::DIRTY_FLAG));
760 auto aligned_size = pmem::detail::align_up(
761 size + sizeof(b->size),
762 pmem::detail::CACHELINE_SIZE);
763
764 b += aligned_size / pmem::detail::CACHELINE_SIZE;
765 } else {
766 break;
767 }
768 }
769
770 assert(b <= e);
771
772 return b;
773}
774
775} /* namespace experimental */
776} /* namespace obj */
777} /* namespace pmem */
778
779#endif /* LIBPMEMOBJ_MPSC_QUEUE_HPP */
Our partial std::string_view implementation.
Definition: string_view.hpp:46
constexpr size_type size() const noexcept
Returns count of characters stored in this pmem::obj::string_view data.
Definition: string_view.hpp:334
constexpr const CharT * data() const noexcept
Returns pointer to data stored in this pmem::obj::string_view.
Definition: string_view.hpp:296
Type representing the range of the mpsc_queue elements.
Definition: mpsc_queue.hpp:118
iterator begin() const
Returns an iterator to the beginning of the accessed range of the mpsc_queue.
Definition: mpsc_queue.hpp:656
iterator end() const
Returns an iterator to the end of the accessed range of the mpsc_queue.
Definition: mpsc_queue.hpp:668
Type representing persistent data, which may be managed by mpsc_queue.
Definition: mpsc_queue.hpp:180
pmem_log_type(size_t size)
Constructs pmem_log_type object.
Definition: mpsc_queue.hpp:331
pmem::obj::string_view data()
Returns pmem::obj::string_view which allows to read-only access to the underlying buffer.
Definition: mpsc_queue.hpp:343
mpsc_queue producer worker class.
Definition: mpsc_queue.hpp:141
bool try_produce(pmem::obj::string_view data, Function &&on_produce=[](pmem::obj::string_view target) {})
Copies data from pmem::obj::string_view into the mpsc_queue.
Definition: mpsc_queue.hpp:528
Persistent memory aware implementation of multi producer single consumer queue.
Definition: mpsc_queue.hpp:50
bool try_consume_batch(Function &&f)
Evaluates callback function f() for the data, which is ready to be consumed.
Definition: mpsc_queue.hpp:399
mpsc_queue(pmem_log_type &pmem, size_t max_workers=1)
mpsc_queue constructor.
Definition: mpsc_queue.hpp:201
worker register_worker()
Registers the producer worker.
Definition: mpsc_queue.hpp:373
static void run(obj::pool_base &pool, std::function< void()> tx, Locks &... locks)
Execute a closure-like transaction and lock locks.
Definition: transaction.hpp:823
The non-template pool base class.
Definition: pool.hpp:50
Custom transaction error class.
Definition: pexceptions.hpp:176
pool_base pool_by_vptr(const T *that)
Retrieve pool handle for the given pointer.
Definition: utils.hpp:32
Persistent memory namespace.
Definition: allocation_flag.hpp:15
This structure is used for assigning unique thread ids so that those ids will be reused in case of th...
Definition: enumerable_thread_specific.hpp:35
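Finally, the on-media entry layout written by store_to_log() and scanned by seek_next() can be summarized in a small standalone sketch. It assumes the 64-byte pmem::detail::CACHELINE_SIZE and the 64-bit size_t used on the platforms the library targets; the entry_cachelines helper is invented here purely for illustration.

#include <cstddef>

/* Mirrors mpsc_queue::first_block: an 8-byte size field (with the dirty flag
 * in the most significant bit) followed by up to 56 B of inline payload;
 * every entry is padded to a multiple of the cacheline size. */
constexpr std::size_t CACHELINE = 64;              /* assumed CACHELINE_SIZE */
constexpr std::size_t HEADER = sizeof(std::size_t);
constexpr std::size_t INLINE_CAPACITY = CACHELINE - HEADER;
constexpr std::size_t DIRTY_FLAG = 1ULL << 63;     /* assumes 64-bit size_t */

/* Number of cachelines a message of n bytes occupies in the log. */
constexpr std::size_t
entry_cachelines(std::size_t n)
{
	return (n + HEADER + CACHELINE - 1) / CACHELINE;
}

static_assert(INLINE_CAPACITY == 56, "size field leaves 56 B in the first cacheline");
static_assert(entry_cachelines(1) == 1, "header plus 1 B fits one cacheline");
static_assert(entry_cachelines(56) == 1, "56 B payload still fits inline");
static_assert(entry_cachelines(57) == 2, "57 B spills into a second cacheline");

The static_asserts mirror first_block::CAPACITY: up to 56 B of payload share a cacheline with the size field, and anything longer spills into additional, fully padded cachelines.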