PMDK C++ bindings 1.13.0
This is the C++ bindings documentation for PMDK's libpmemobj.
concurrent_hash_map.hpp
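A minimal usage sketch (not part of the header itself); the pool path, layout name, the root struct and the key/value choices are illustrative assumptions, not fixed by the library:

#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp>

#include <iostream>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

struct root {
	pmem::obj::persistent_ptr<map_type> map;
};

int main()
{
	/* open a pool created beforehand with the layout name "hashmap_example" */
	auto pop = pmem::obj::pool<root>::open("/path/to/poolfile", "hashmap_example");
	auto r = pop.root();

	if (r->map == nullptr) {
		/* the map object itself must be allocated inside a transaction */
		pmem::obj::transaction::run(pop, [&] {
			r->map = pmem::obj::make_persistent<map_type>();
		});
	}

	/* required once after each pool open, before any other operation */
	r->map->runtime_initialize();

	/* thread-safe insert/update and lookup */
	r->map->insert_or_assign(1, 10);

	map_type::const_accessor acc;
	if (r->map->find(acc, 1))
		std::cout << acc->second.get_ro() << std::endl;
	acc.release(); /* drop the per-node lock before erasing the same key */

	r->map->erase(1);

	pop.close();
	return 0;
}

The full source listing of the header follows.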
1// SPDX-License-Identifier: BSD-3-Clause
2/* Copyright 2019-2021, Intel Corporation */
3
10#ifndef PMEMOBJ_CONCURRENT_HASH_MAP_HPP
11#define PMEMOBJ_CONCURRENT_HASH_MAP_HPP
12
15#include <libpmemobj++/detail/pair.hpp>
17
21#include <libpmemobj++/p.hpp>
24
25#include <libpmemobj++/detail/persistent_pool_ptr.hpp>
27
29
30#include <atomic>
31#include <cassert>
32#include <functional>
33#include <initializer_list>
34#include <iterator> // for std::distance
35#include <memory>
36#include <mutex>
37#include <thread>
38#include <type_traits>
39#include <utility>
40#include <vector>
41
42namespace std
43{
47template <typename T>
48struct hash<pmem::obj::p<T>> {
49 size_t
50 operator()(const pmem::obj::p<T> &x) const
51 {
52 return hash<T>()(x.get_ro());
53 }
54};
55} /* namespace std */
56
57namespace pmem
58{
59namespace obj
60{
61
62namespace concurrent_hash_map_internal
63{
64template <typename SharedMutexT>
65class shared_mutex_scoped_lock {
66 using rw_mutex_type = SharedMutexT;
67
68public:
69 shared_mutex_scoped_lock(const shared_mutex_scoped_lock &) = delete;
70 shared_mutex_scoped_lock &
71 operator=(const shared_mutex_scoped_lock &) = delete;
72
74 shared_mutex_scoped_lock() : mutex(nullptr), is_writer(false)
75 {
76 }
77
79 shared_mutex_scoped_lock(rw_mutex_type &m, bool write = true)
80 : mutex(nullptr)
81 {
82 acquire(m, write);
83 }
84
86 ~shared_mutex_scoped_lock()
87 {
88 if (mutex)
89 release();
90 }
91
93 void
94 acquire(rw_mutex_type &m, bool write = true)
95 {
96 is_writer = write;
97 mutex = &m;
98 if (write)
99 mutex->lock();
100 else
101 mutex->lock_shared();
102 }
103
107 void
108 release()
109 {
110 assert(mutex);
111 rw_mutex_type *m = mutex;
112 mutex = nullptr;
113 if (is_writer) {
114 m->unlock();
115 } else {
116 m->unlock_shared();
117 }
118 }
119
124 bool
125 try_acquire(rw_mutex_type &m, bool write = true)
126 {
127 assert(!mutex);
128 bool result;
129 is_writer = write;
130 result = write ? m.try_lock() : m.try_lock_shared();
131 if (result)
132 mutex = &m;
133 return result;
134 }
135
136protected:
141 rw_mutex_type *mutex;
146 bool is_writer;
147}; /* class shared_mutex_scoped_lock */
148
149template <typename ScopedLockType>
150using scoped_lock_upgrade_to_writer =
151 decltype(std::declval<ScopedLockType>().upgrade_to_writer());
152
153template <typename ScopedLockType>
154using scoped_lock_has_upgrade_to_writer =
155 detail::supports<ScopedLockType, scoped_lock_upgrade_to_writer>;
156
157template <typename ScopedLockType>
158using scoped_lock_downgrade_to_reader =
159 decltype(std::declval<ScopedLockType>().downgrade_to_reader());
160
161template <typename ScopedLockType>
162using scoped_lock_has_downgrade_to_reader =
163 detail::supports<ScopedLockType, scoped_lock_downgrade_to_reader>;
164
165template <typename ScopedLockType,
166 bool = scoped_lock_has_upgrade_to_writer<ScopedLockType>::value
167 &&scoped_lock_has_downgrade_to_reader<ScopedLockType>::value>
168class scoped_lock_traits {
169public:
170 using scope_lock_type = ScopedLockType;
171
172 static bool
173 initial_rw_state(bool write)
174 {
175 /* For upgradeable locks, initial state is always read */
176 return false;
177 }
178
179 static bool
180 upgrade_to_writer(scope_lock_type &lock)
181 {
182 return lock.upgrade_to_writer();
183 }
184
185 static bool
186 downgrade_to_reader(scope_lock_type &lock)
187 {
188 return lock.downgrade_to_reader();
189 }
190};
191
192template <typename ScopedLockType>
193class scoped_lock_traits<ScopedLockType, false> {
194public:
195 using scope_lock_type = ScopedLockType;
196
197 static bool
198 initial_rw_state(bool write)
199 {
200 /* For non-upgradeable locks, we take lock in required mode
201 * immediately */
202 return write;
203 }
204
205 static bool
206 upgrade_to_writer(scope_lock_type &lock)
207 {
208 /* This overload is for locks which do not support upgrade
209 * operation. For those locks, upgrade_to_writer should not be
210 * called when holding a read lock */
211 return true;
212 }
213
214 static bool
215 downgrade_to_reader(scope_lock_type &lock)
216 {
217 /* This overload is for locks which do not support downgrade
218 * operation. For those locks, downgrade_to_reader should never
219 * be called */
220 assert(false);
221
222 return false;
223 }
224};
225}
226
227template <typename Key, typename T, typename Hash = std::hash<Key>,
228 typename KeyEqual = std::equal_to<Key>,
229 typename MutexType = pmem::obj::shared_mutex,
230 typename ScopedLockType = concurrent_hash_map_internal::
231 shared_mutex_scoped_lock<MutexType>>
232class concurrent_hash_map;
233
235namespace concurrent_hash_map_internal
236{
237/* Helper method which throws an exception when called in a tx */
238static inline void
239check_outside_tx()
240{
241 if (pmemobj_tx_stage() != TX_STAGE_NONE)
242 throw pmem::transaction_scope_error(
243 "Function called inside transaction scope.");
244}
245
246template <typename Hash>
247using transparent_key_equal = typename Hash::transparent_key_equal;
248
249template <typename Hash>
250using has_transparent_key_equal = detail::supports<Hash, transparent_key_equal>;
251
252template <typename Hash, typename Pred,
253 bool = has_transparent_key_equal<Hash>::value>
254struct key_equal_type {
255 using type = typename Hash::transparent_key_equal;
256};
257
258template <typename Hash, typename Pred>
259struct key_equal_type<Hash, Pred, false> {
260 using type = Pred;
261};
262
263template <typename Mutex, typename ScopedLockType>
264void
265assert_not_locked(Mutex &mtx)
266{
267#ifndef NDEBUG
268 ScopedLockType scoped_lock;
269 assert(scoped_lock.try_acquire(mtx));
270 scoped_lock.release();
271#else
272 (void)mtx;
273#endif
274}
275
276template <typename Key, typename T, typename MutexType, typename ScopedLockType>
277struct hash_map_node {
279 using mutex_t = MutexType;
280
282 using scoped_t = ScopedLockType;
283
284 using value_type = detail::pair<const Key, T>;
285
287 using node_ptr_t = detail::persistent_pool_ptr<
288 hash_map_node<Key, T, mutex_t, scoped_t>>;
289
291 node_ptr_t next;
292
294 mutex_t mutex;
295
297 value_type item;
298
299 hash_map_node(const node_ptr_t &_next, const Key &key)
300 : next(_next),
301 item(std::piecewise_construct, std::forward_as_tuple(key),
302 std::forward_as_tuple())
303 {
304 }
305
306 hash_map_node(const node_ptr_t &_next, const Key &key, const T &t)
307 : next(_next), item(key, t)
308 {
309 }
310
311 hash_map_node(const node_ptr_t &_next, value_type &&i)
312 : next(_next), item(std::move(i))
313 {
314 }
315
316 template <typename... Args>
317 hash_map_node(const node_ptr_t &_next, Args &&... args)
318 : next(_next), item(std::forward<Args>(args)...)
319 {
320 }
321
322 hash_map_node(const node_ptr_t &_next, const value_type &i)
323 : next(_next), item(i)
324 {
325 }
326
328 hash_map_node(const hash_map_node &) = delete;
329
331 hash_map_node &operator=(const hash_map_node &) = delete;
332}; /* struct node */
333
338template <typename Bucket>
339class segment_traits {
340public:
342 using segment_index_t = size_t;
343 using size_type = size_t;
344 using bucket_type = Bucket;
345
346protected:
348 constexpr static size_type max_allocation_size = PMEMOBJ_MAX_ALLOC_SIZE;
349
351 constexpr static segment_index_t first_big_block = 27;
352 /* TODO: avoid hardcoded value; need constexpr similar to:
353 * Log2(max_allocation_size / sizeof(bucket_type)) */
354
356 constexpr static size_type big_block_size = size_type(1)
357 << first_big_block;
358
359 /* Block size in bytes cannot exceed max_allocation_size */
360 static_assert((big_block_size * sizeof(bucket_type)) <
361 max_allocation_size,
362 "Block size exceeds max_allocation_size");
363
365 constexpr static segment_index_t
366 first_block_in_segment(segment_index_t seg)
367 {
368 return seg < first_big_block
369 ? seg
370 : (first_big_block +
371 (segment_index_t(1) << (seg - first_big_block)) - 1);
372 }
373
375 constexpr static size_type
376 blocks_in_segment(segment_index_t seg)
377 {
378 return seg < first_big_block
379 ? segment_index_t(1)
380 : segment_index_t(1) << (seg - first_big_block);
381 }
382
384 constexpr static size_type
385 block_size(segment_index_t b)
386 {
387 return b < first_big_block ? segment_size(b ? b : 1)
388 : big_block_size;
389 }
390
391public:
393 constexpr static segment_index_t embedded_segments = 1;
394
396 constexpr static size_type embedded_buckets = 1 << embedded_segments;
397
399 constexpr static segment_index_t number_of_segments = 32;
400
402 static const size_type first_block = 8;
403
405 constexpr static segment_index_t
406 number_of_blocks()
407 {
408 return first_block_in_segment(number_of_segments);
409 }
410
412 static segment_index_t
413 segment_index_of(size_type index)
414 {
415 return segment_index_t(detail::Log2(index | 1));
416 }
417
419 constexpr static segment_index_t
420 segment_base(segment_index_t k)
421 {
422 return (segment_index_t(1) << k) & ~segment_index_t(1);
423 }
424
426 constexpr static size_type
427 segment_size(segment_index_t k)
428 {
429 return size_type(1) << k; // fake value for k == 0
430 }
431 static_assert(
432 embedded_segments < first_big_block,
433 "Number of embedded segments cannot exceed max_allocation_size");
434}; /* End of class segment_traits */
435
452template <typename BlockTable, typename SegmentTraits, bool is_const>
453class segment_facade_impl : public SegmentTraits {
454private:
455 using traits_type = SegmentTraits;
456 using traits_type::block_size;
457 using traits_type::blocks_in_segment;
458 using traits_type::embedded_buckets;
459 using traits_type::embedded_segments;
460 using traits_type::first_block;
461 using traits_type::first_block_in_segment;
462 using traits_type::segment_base;
463 using traits_type::segment_size;
464
465public:
466 using table_reference =
467 typename std::conditional<is_const, const BlockTable &,
468 BlockTable &>::type;
469
470 using table_pointer =
471 typename std::conditional<is_const, const BlockTable *,
472 BlockTable *>::type;
473
474 using bucket_type = typename traits_type::bucket_type;
475 using segment_index_t = typename traits_type::segment_index_t;
476 using size_type = typename traits_type::size_type;
477
479 segment_facade_impl(table_reference table, segment_index_t s)
480 : my_table(&table), my_seg(s)
481 {
482 assert(my_seg < traits_type::number_of_segments);
483 }
484
486 segment_facade_impl(const segment_facade_impl &src)
487 : my_table(src.my_table), my_seg(src.my_seg)
488 {
489 }
490
491 segment_facade_impl(segment_facade_impl &&src) = default;
492
494 segment_facade_impl &
495 operator=(const segment_facade_impl &src)
496 {
497 my_table = src.my_table;
498 my_seg = src.my_seg;
499 return *this;
500 }
501
503 segment_facade_impl &
504 operator=(segment_facade_impl &&src)
505 {
506 my_table = src.my_table;
507 my_seg = src.my_seg;
508 return *this;
509 }
510
517 bucket_type &operator[](size_type i) const
518 {
519 assert(i < size());
520
521 segment_index_t table_block = first_block_in_segment(my_seg);
522 size_type b_size = block_size(table_block);
523
524 table_block += i / b_size;
525 i = i % b_size;
526
527 return (*my_table)[table_block][static_cast<std::ptrdiff_t>(i)];
528 }
529
533 segment_facade_impl &
534 operator++()
535 {
536 ++my_seg;
537 return *this;
538 }
539
543 segment_facade_impl
544 operator++(int)
545 {
546 segment_facade_impl tmp = *this;
547 ++(*this);
548 return tmp;
549 }
550
554 segment_facade_impl &
555 operator--()
556 {
557 --my_seg;
558 return *this;
559 }
560
564 segment_facade_impl
565 operator--(int)
566 {
567 segment_facade_impl tmp = *this;
568 --(*this);
569 return tmp;
570 }
571
575 segment_facade_impl &
576 operator+=(segment_index_t off)
577 {
578 my_seg += off;
579 return *this;
580 }
581
585 segment_facade_impl &
586 operator-=(segment_index_t off)
587 {
588 my_seg -= off;
589 return *this;
590 }
591
595 segment_facade_impl
596 operator+(segment_index_t off) const
597 {
598 return segment_facade_impl(*(this->my_table),
599 this->my_seg + off);
600 }
601
605 segment_facade_impl
606 operator-(segment_index_t off) const
607 {
608 return segment_facade_impl(*(this->my_table),
609 this->my_seg - off);
610 }
611
615 void
616 enable(pool_base &pop)
617 {
618 assert(my_seg >= embedded_segments);
619
620 if (my_seg < first_block) {
621 enable_first_block(pop);
622 } else {
623 enable_big_segment(pop);
624 }
625 }
626
630 void
631 disable()
632 {
633 assert(my_seg >= embedded_segments);
634
635 if (my_seg < first_block) {
636 if (my_seg == embedded_segments) {
637 size_type sz = segment_size(first_block) -
638 embedded_buckets;
639 delete_persistent<bucket_type[]>(
640 (*my_table)[my_seg], sz);
641 }
642 (*my_table)[my_seg] = nullptr;
643 } else {
644 block_range blocks = segment_blocks(my_seg);
645
646 for (segment_index_t b = blocks.first;
647 b < blocks.second; ++b) {
648 if ((*my_table)[b] != nullptr) {
649 delete_persistent<bucket_type[]>(
650 (*my_table)[b], block_size(b));
651 (*my_table)[b] = nullptr;
652 }
653 }
654 }
655 }
656
660 constexpr size_type
661 size() const
662 {
663 return segment_size(my_seg ? my_seg : 1);
664 }
665
671 bool
672 is_valid() const
673 {
674 block_range blocks = segment_blocks(my_seg);
675
676 for (segment_index_t b = blocks.first; b < blocks.second; ++b) {
677 if ((*my_table)[b] == nullptr)
678 return false;
679 }
680
681 return true;
682 }
683
684private:
685 using block_range = std::pair<segment_index_t, segment_index_t>;
686
690 static block_range
691 segment_blocks(segment_index_t seg)
692 {
693 segment_index_t begin = first_block_in_segment(seg);
694
695 return block_range(begin, begin + blocks_in_segment(seg));
696 }
697
698 void
699 enable_first_block(pool_base &pop)
700 {
701 assert(my_seg == embedded_segments);
702 {
703 flat_transaction::manual tx(pop);
704
705 size_type sz =
706 segment_size(first_block) - embedded_buckets;
707 (*my_table)[my_seg] =
708 make_persistent<bucket_type[]>(sz);
709
710 persistent_ptr<bucket_type> base =
711 (*my_table)[embedded_segments].raw();
712
713 for (segment_index_t s = my_seg + 1; s < first_block;
714 ++s) {
715 std::ptrdiff_t off =
716 static_cast<std::ptrdiff_t>(
717 segment_base(s) -
718 segment_base(my_seg));
719
720 (*my_table)[s] = (base + off).raw();
721 }
722
723 flat_transaction::commit();
724 }
725 }
726
727 void
728 enable_big_segment(pool_base &pop)
729 {
730 block_range blocks = segment_blocks(my_seg);
731 {
732 flat_transaction::manual tx(pop);
733
734 for (segment_index_t b = blocks.first;
735 b < blocks.second; ++b) {
736 assert((*my_table)[b] == nullptr);
737 (*my_table)[b] = make_persistent<bucket_type[]>(
738 block_size(b));
739 }
740
741 flat_transaction::commit();
742 }
743 }
744
746 table_pointer my_table;
747
749 segment_index_t my_seg;
750}; /* End of class segment_facade_impl */
751
758template <typename Key, typename T, typename MutexType, typename ScopedLockType>
759class hash_map_base {
760public:
761 using mutex_t = MutexType;
762 using scoped_t = ScopedLockType;
763
765 using size_type = size_t;
766
768 using hashcode_type = size_t;
769
771 using node = hash_map_node<Key, T, mutex_t, scoped_t>;
772
774 using node_ptr_t = detail::persistent_pool_ptr<node>;
775
777 struct bucket {
778 using mutex_t = MutexType;
779 using scoped_t = ScopedLockType;
780
782 mutex_t mutex;
783
785 p<std::atomic<uint64_t>> rehashed;
786
788 node_ptr_t node_list;
789
791 bucket() : node_list(nullptr)
792 {
793#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
794 VALGRIND_HG_DISABLE_CHECKING(&rehashed,
795 sizeof(rehashed));
796#endif
797 rehashed.get_rw() = false;
798 }
799
805 bool
806 is_rehashed(std::memory_order order)
807 {
808 return rehashed.get_ro().load(order);
809 }
810
811 void
812 set_rehashed(std::memory_order order)
813 {
814 rehashed.get_rw().store(true, order);
815 }
816
818 bucket(const bucket &) = delete;
819
821 bucket &operator=(const bucket &) = delete;
822 }; /* End of struct bucket */
823
825 using segment_traits_t = segment_traits<bucket>;
826
828 using segment_index_t = typename segment_traits_t::segment_index_t;
829
831 static const size_type embedded_buckets =
832 segment_traits_t::embedded_buckets;
833
835 static const size_type first_block = segment_traits_t::first_block;
836
838 constexpr static size_type block_table_size =
839 segment_traits_t::number_of_blocks();
840
842 using segment_ptr_t = persistent_ptr<bucket[]>;
843
845 using bucket_ptr_t = persistent_ptr<bucket>;
846
848 using blocks_table_t = segment_ptr_t[block_table_size];
849
851 using segment_enable_mutex_t = pmem::obj::mutex;
852
854 struct tls_data_t {
855 p<int64_t> size_diff = 0;
856 std::aligned_storage<56, 8> padding;
857 };
858
859 using tls_t = detail::enumerable_thread_specific<tls_data_t>;
860
861 enum feature_flags : uint32_t { FEATURE_CONSISTENT_SIZE = 1 };
862
864 struct features {
865 p<uint32_t> compat;
866 p<uint32_t> incompat;
867 };
868
869 /* --------------------------------------------------------- */
870
872 p<uint64_t> my_pool_uuid;
873
876 features layout_features;
877
880 std::aligned_storage<sizeof(size_t), sizeof(size_t)>::type
881 my_mask_reserved;
882
884 /* my_mask always restored on restart. */
885 std::atomic<hashcode_type> my_mask;
886
887 /* Size of value (key and value pair) stored in a pool */
888 std::size_t value_size;
889
891 std::aligned_storage<24, 8>::type padding1;
892
897 blocks_table_t my_table;
898
899 /* It must be in a separate cache line from my_mask due to performance
900 * effects */
902 std::atomic<size_type> my_size;
903
905 std::aligned_storage<24, 8>::type padding2;
906
908 persistent_ptr<tls_t> tls_ptr;
909
915 p<size_t> on_init_size;
916
918 std::aligned_storage<40, 8>::type reserved;
919
921 segment_enable_mutex_t my_segment_enable_mutex;
922
924 bucket my_embedded_segment[embedded_buckets];
925
926 /* --------------------------------------------------------- */
927
929 static constexpr features
930 header_features()
931 {
932 return {FEATURE_CONSISTENT_SIZE, 0};
933 }
934
935 const std::atomic<hashcode_type> &
936 mask() const noexcept
937 {
938 return my_mask;
939 }
940
941 std::atomic<hashcode_type> &
942 mask() noexcept
943 {
944 return my_mask;
945 }
946
947 size_t
948 size() const
949 {
950 return my_size.load(std::memory_order_relaxed);
951 }
952
953 p<int64_t> &
954 thread_size_diff()
955 {
956 assert(this->tls_ptr != nullptr);
957 return this->tls_ptr->local().size_diff;
958 }
959
961 void
962 tls_restore()
963 {
964 assert(this->tls_ptr != nullptr);
965
966 pool_base pop = pool_base{pmemobj_pool_by_ptr(this)};
967
968 int64_t last_run_size = 0;
969 for (auto &data : *tls_ptr)
970 last_run_size += data.size_diff;
971
972 /* Make sure that on_init_size + last_run_size >= 0 */
973 assert(last_run_size >= 0 ||
974 static_cast<int64_t>(static_cast<size_t>(last_run_size) +
975 on_init_size) >= 0);
976
977 flat_transaction::run(pop, [&] {
978 on_init_size += static_cast<size_t>(last_run_size);
979 tls_ptr->clear();
980 });
981
982 this->my_size = on_init_size;
983 }
984
986 using const_segment_facade_t =
987 segment_facade_impl<blocks_table_t, segment_traits_t, true>;
988
990 using segment_facade_t =
991 segment_facade_impl<blocks_table_t, segment_traits_t, false>;
992
994 hash_map_base()
995 {
996 static_assert(
997 sizeof(size_type) == sizeof(std::atomic<size_type>),
998 "std::atomic should have the same layout as underlying integral type");
999
1000#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1001 VALGRIND_HG_DISABLE_CHECKING(&my_mask, sizeof(my_mask));
1002#endif
1003 layout_features = {0, 0};
1004
1005 PMEMoid oid = pmemobj_oid(this);
1006
1007 assert(!OID_IS_NULL(oid));
1008
1009 my_pool_uuid = oid.pool_uuid_lo;
1010
1011 pool_base pop = get_pool_base();
1012 /* enable embedded segments */
1013 for (size_type i = 0; i < segment_traits_t::embedded_segments;
1014 ++i) {
1015 my_table[i] =
1016 pmemobj_oid(my_embedded_segment +
1017 segment_traits_t::segment_base(i));
1018 segment_facade_t seg(my_table, i);
1019 mark_rehashed<false>(pop, seg);
1020 }
1021
1022 on_init_size = 0;
1023
1024 value_size = 0;
1025
1026 this->tls_ptr = nullptr;
1027 }
1028
1029 /*
1030 * Should be called before concurrent_hash_map destructor is called.
1031 * Otherwise, program can terminate if an exception occurs while freeing
1032 * memory inside dtor.
1033 */
1034 void
1035 free_tls()
1036 {
1037 auto pop = get_pool_base();
1038
1039 if ((layout_features.compat & FEATURE_CONSISTENT_SIZE) &&
1040 tls_ptr) {
1041 flat_transaction::run(pop, [&] {
1042 delete_persistent<tls_t>(tls_ptr);
1043 tls_ptr = nullptr;
1044 });
1045 }
1046 }
1047
1051 void
1052 calculate_mask()
1053 {
1054#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1055 VALGRIND_HG_DISABLE_CHECKING(&my_size, sizeof(my_size));
1056 VALGRIND_HG_DISABLE_CHECKING(&my_mask, sizeof(my_mask));
1057#endif
1058#if LIBPMEMOBJ_CPP_VG_PMEMCHECK_ENABLED
1059 VALGRIND_PMC_REMOVE_PMEM_MAPPING(&my_size, sizeof(my_size));
1060 VALGRIND_PMC_REMOVE_PMEM_MAPPING(&my_mask, sizeof(my_mask));
1061#endif
1062
1063 hashcode_type m = embedded_buckets - 1;
1064
1065 const_segment_facade_t segment(
1066 my_table, segment_traits_t::embedded_segments);
1067
1068 while (segment.is_valid()) {
1069 m += segment.size();
1070 ++segment;
1071 }
1072
1073 mask().store(m, std::memory_order_relaxed);
1074 }
1075
1079 template <bool Flush = true>
1080 void
1081 mark_rehashed(pool_base &pop, segment_facade_t &segment)
1082 {
1083 for (size_type i = 0; i < segment.size(); ++i) {
1084 bucket *b = &(segment[i]);
1085
1086 assert_not_locked<mutex_t, scoped_t>(b->mutex);
1087
1088 b->set_rehashed(std::memory_order_relaxed);
1089 }
1090
1091 if (Flush) {
1092 /* Flush in separate loop to avoid read-after-flush */
1093 for (size_type i = 0; i < segment.size(); ++i) {
1094 bucket *b = &(segment[i]);
1095 pop.flush(b->rehashed);
1096 }
1097
1098 pop.drain();
1099 }
1100 }
1101
1105 void
1106 enable_segment(segment_index_t k, bool is_initial = false)
1107 {
1108 assert(k);
1109
1110 pool_base pop = get_pool_base();
1111 size_type sz;
1112
1113 if (k >= first_block) {
1114 segment_facade_t new_segment(my_table, k);
1115
1116 sz = new_segment.size();
1117 if (!new_segment.is_valid())
1118 new_segment.enable(pop);
1119
1120 if (is_initial) {
1121 mark_rehashed(pop, new_segment);
1122 }
1123
1124 /* double it to get entire capacity of the container */
1125 sz <<= 1;
1126 } else {
1127 /* the first block */
1128 assert(k == segment_traits_t::embedded_segments);
1129
1130 for (segment_index_t i = k; i < first_block; ++i) {
1131 segment_facade_t new_segment(my_table, i);
1132
1133 if (!new_segment.is_valid())
1134 new_segment.enable(pop);
1135
1136 if (is_initial) {
1137 mark_rehashed(pop, new_segment);
1138 }
1139 }
1140
1141 sz = segment_traits_t::segment_size(first_block);
1142 }
1143#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1144 ANNOTATE_HAPPENS_BEFORE(&my_mask);
1145#endif
1146 mask().store(sz - 1, std::memory_order_release);
1147 }
1148
1153 bucket *
1154 get_bucket(hashcode_type h) const
1155 {
1156 segment_index_t s = segment_traits_t::segment_index_of(h);
1157
1158 h -= segment_traits_t::segment_base(s);
1159
1160 const_segment_facade_t segment(my_table, s);
1161
1162 assert(segment.is_valid());
1163
1164 return &(segment[h]);
1165 }
1166
1170 inline bool
1171 check_mask_race(hashcode_type h, hashcode_type &m) const
1172 {
1173 hashcode_type m_now, m_old = m;
1174
1175 m_now = mask().load(std::memory_order_acquire);
1176#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1177 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
1178#endif
1179
1180 if (m_old != m_now)
1181 return check_rehashing_collision(h, m_old, m = m_now);
1182
1183 return false;
1184 }
1185
1189 bool
1190 check_rehashing_collision(hashcode_type h, hashcode_type m_old,
1191 hashcode_type m) const
1192 {
1193 assert(m_old != m);
1194
1195 if ((h & m_old) != (h & m)) {
1196 /* mask changed for this hashcode, rare event condition
1197 * above proves that 'h' has some other bits set beside
1198 * 'm_old', find next applicable mask after m_old */
1199
1200 for (++m_old; !(h & m_old); m_old <<= 1)
1201 ;
1202
1203 m_old = (m_old << 1) - 1; /* get full mask from a bit */
1204
1205 assert((m_old & (m_old + 1)) == 0 && m_old <= m);
1206
1207 /* check whether it is rehashing/ed */
1208 bucket *b = get_bucket(h & m_old);
1209 return b->is_rehashed(std::memory_order_acquire);
1210 }
1211
1212 return false;
1213 }
1214
1219 template <typename Node, typename... Args>
1220 void
1221 insert_new_node_internal(bucket *b,
1222 detail::persistent_pool_ptr<Node> &new_node,
1223 Args &&... args)
1224 {
1225 assert(pmemobj_tx_stage() == TX_STAGE_WORK);
1226
1227 new_node = pmem::obj::make_persistent<Node>(
1228 b->node_list, std::forward<Args>(args)...);
1229 b->node_list = new_node; /* bucket is locked */
1230 }
1231
1236 template <typename Node, typename... Args>
1237 size_type
1238 insert_new_node(bucket *b, detail::persistent_pool_ptr<Node> &new_node,
1239 Args &&... args)
1240 {
1241 pool_base pop = get_pool_base();
1242
1243 /*
1244 * This is only true when called from single-threaded methods
1245 * like swap() or operator=. In that case it's safe to directly
1246 * modify on_init_size.
1247 */
1248 if (pmemobj_tx_stage() == TX_STAGE_WORK) {
1249 insert_new_node_internal(b, new_node,
1250 std::forward<Args>(args)...);
1251 this->on_init_size++;
1252 } else {
1253 auto &size_diff = thread_size_diff();
1254
1255 flat_transaction::run(pop, [&] {
1256 insert_new_node_internal(
1257 b, new_node,
1258 std::forward<Args>(args)...);
1259 ++size_diff;
1260 });
1261 }
1262
1263 /* Increment volatile size */
1264 return ++(this->my_size);
1265 }
1266
1271 bool
1272 check_growth(hashcode_type m, size_type sz)
1273 {
1274 if (sz >= m) {
1275 segment_index_t new_seg =
1276 static_cast<segment_index_t>(detail::Log2(
1277 m +
1278 1)); /* optimized segment_index_of */
1279
1280 assert(segment_facade_t(my_table, new_seg - 1)
1281 .is_valid());
1282
1283 std::unique_lock<segment_enable_mutex_t> lock(
1284 my_segment_enable_mutex, std::try_to_lock);
1285
1286 if (lock) {
1287 if (mask().load(std::memory_order_relaxed) ==
1288 m) {
1289 /* Otherwise, another thread enabled this
1290 * segment */
1291 enable_segment(new_seg);
1292
1293 return true;
1294 }
1295 }
1296 }
1297
1298 return false;
1299 }
1300
1304 void
1305 reserve(size_type buckets)
1306 {
1307 if (buckets == 0)
1308 return;
1309
1310 --buckets;
1311
1312 bool is_initial = this->size() == 0;
1313
1314 for (size_type m = mask(); buckets > m; m = mask())
1315 enable_segment(
1316 segment_traits_t::segment_index_of(m + 1),
1317 is_initial);
1318 }
1319
1324 void
1325 internal_swap(hash_map_base<Key, T, mutex_t, scoped_t> &table)
1326 {
1327 pool_base p = get_pool_base();
1328 {
1329 flat_transaction::manual tx(p);
1330
1331 this->my_pool_uuid.swap(table.my_pool_uuid);
1332
1333 /*
1334 * As internal_swap can only be called
1335 * from one thread, and there can be an outer
1336 * transaction we must make sure that mask and size
1337 * changes are transactional
1338 */
1339 flat_transaction::snapshot((size_t *)&this->my_mask);
1340 flat_transaction::snapshot((size_t *)&this->my_size);
1341
1342 this->mask() = table.mask().exchange(
1343 this->mask(), std::memory_order_relaxed);
1344
1345 this->my_size = table.my_size.exchange(
1346 this->my_size, std::memory_order_relaxed);
1347
1348 /* Swap consistent size */
1349 std::swap(this->tls_ptr, table.tls_ptr);
1350
1351 for (size_type i = 0; i < embedded_buckets; ++i)
1352 this->my_embedded_segment[i].node_list.swap(
1353 table.my_embedded_segment[i].node_list);
1354
1355 for (size_type i = segment_traits_t::embedded_segments;
1356 i < block_table_size; ++i)
1357 this->my_table[i].swap(table.my_table[i]);
1358
1358
1359 flat_transaction::commit();
1360 }
1361 }
1362
1367 pool_base
1368 get_pool_base()
1369 {
1370 PMEMobjpool *pop =
1371 pmemobj_pool_by_oid(PMEMoid{my_pool_uuid, 0});
1372
1373 return pool_base(pop);
1374 }
1375}; /* End of class hash_map_base */
1376
1382template <typename Container, bool is_const>
1383class hash_map_iterator {
1384public:
1385 using iterator_category = std::forward_iterator_tag;
1386 using difference_type = ptrdiff_t;
1387 using map_type = Container;
1388 using value_type = typename map_type::value_type;
1389 using node = typename map_type::node;
1390 using bucket = typename map_type::bucket;
1391 using map_ptr = typename std::conditional<is_const, const map_type *,
1392 map_type *>::type;
1393 using reference =
1394 typename std::conditional<is_const,
1395 typename map_type::const_reference,
1396 typename map_type::reference>::type;
1397 using pointer =
1398 typename std::conditional<is_const,
1399 typename map_type::const_pointer,
1400 typename map_type::pointer>::type;
1401
1402 template <typename C, bool M, bool U>
1403 friend bool operator==(const hash_map_iterator<C, M> &i,
1404 const hash_map_iterator<C, U> &j);
1405
1406 template <typename C, bool M, bool U>
1407 friend bool operator!=(const hash_map_iterator<C, M> &i,
1408 const hash_map_iterator<C, U> &j);
1409
1410 friend class hash_map_iterator<map_type, true>;
1411
1412#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
1413private:
1414 template <typename Key, typename T, typename Hash, typename KeyEqual,
1415 typename MutexType, typename ScopedLockType>
1416 friend class ::pmem::obj::concurrent_hash_map;
1417#else
1418public: /* workaround */
1419#endif
1420 hash_map_iterator(map_ptr map, size_t index)
1421 : my_map(map), my_index(index), my_bucket(nullptr), my_node(nullptr)
1422 {
1423 if (my_index <= my_map->mask()) {
1424 bucket_accessor acc(my_map, my_index);
1425 my_bucket = acc.get();
1426 my_node = static_cast<node *>(
1427 my_bucket->node_list.get(my_map->my_pool_uuid));
1428
1429 if (!my_node) {
1430 advance_to_next_bucket();
1431 }
1432 }
1433 }
1434
1435public:
1437 hash_map_iterator() = default;
1438
1440 hash_map_iterator(const hash_map_iterator &other)
1441 : my_map(other.my_map),
1442 my_index(other.my_index),
1443 my_bucket(other.my_bucket),
1444 my_node(other.my_node)
1445 {
1446 }
1447
1449 template <typename U = void,
1450 typename = typename std::enable_if<is_const, U>::type>
1451 hash_map_iterator(const hash_map_iterator<map_type, false> &other)
1452 : my_map(other.my_map),
1453 my_index(other.my_index),
1454 my_bucket(other.my_bucket),
1455 my_node(other.my_node)
1456 {
1457 }
1458
1459 hash_map_iterator &operator=(const hash_map_iterator &it) = default;
1460
1462 reference operator*() const
1463 {
1464 assert(my_node);
1465 return my_node->item;
1466 }
1467
1469 pointer operator->() const
1470 {
1471 return &operator*();
1472 }
1473
1475 hash_map_iterator &
1476 operator++()
1477 {
1478 my_node = static_cast<node *>(
1479 my_node->next.get((my_map->my_pool_uuid)));
1480
1481 if (!my_node)
1482 advance_to_next_bucket();
1483
1484 return *this;
1485 }
1486
1488 hash_map_iterator
1489 operator++(int)
1490 {
1491 hash_map_iterator old(*this);
1492 operator++();
1493 return old;
1494 }
1495
1496private:
1498 map_ptr my_map = nullptr;
1499
1501 size_t my_index = 0;
1502
1504 bucket *my_bucket = nullptr;
1505
1507 node *my_node = nullptr;
1508
1509 class bucket_accessor {
1510 public:
1511 bucket_accessor(map_ptr m, size_t index)
1512 {
1513 my_bucket = m->get_bucket(index);
1514 }
1515
1516 bucket *
1517 get() const
1518 {
1519 return my_bucket;
1520 }
1521
1522 private:
1523 bucket *my_bucket;
1524 };
1525
1526 void
1527 advance_to_next_bucket()
1528 {
1529 size_t k = my_index + 1;
1530
1531 assert(my_bucket);
1532
1533 while (k <= my_map->mask()) {
1534 bucket_accessor acc(my_map, k);
1535 my_bucket = acc.get();
1536
1537 if (my_bucket->node_list) {
1538 my_node = static_cast<node *>(
1539 my_bucket->node_list.get(
1540 my_map->my_pool_uuid));
1541
1542 my_index = k;
1543
1544 return;
1545 }
1546
1547 ++k;
1548 }
1549
1550 my_bucket = 0;
1551 my_node = 0;
1552 my_index = k;
1553 }
1554};
1555
1556template <typename Container, bool M, bool U>
1557bool
1558operator==(const hash_map_iterator<Container, M> &i,
1559 const hash_map_iterator<Container, U> &j)
1560{
1561 return i.my_node == j.my_node && i.my_map == j.my_map;
1562}
1563
1564template <typename Container, bool M, bool U>
1565bool
1566operator!=(const hash_map_iterator<Container, M> &i,
1567 const hash_map_iterator<Container, U> &j)
1568{
1569 return i.my_node != j.my_node || i.my_map != j.my_map;
1570}
1571} /* namespace concurrent_hash_map_internal */
1624template <typename Key, typename T, typename Hash, typename KeyEqual,
1625 typename MutexType, typename ScopedLockType>
1626class concurrent_hash_map
1627 : protected concurrent_hash_map_internal::hash_map_base<Key, T, MutexType,
1628 ScopedLockType> {
1629 template <typename Container, bool is_const>
1630 friend class concurrent_hash_map_internal::hash_map_iterator;
1631
1632public:
1633 using size_type = typename concurrent_hash_map_internal::hash_map_base<
1634 Key, T, MutexType, ScopedLockType>::size_type;
1635 using hashcode_type =
1636 typename concurrent_hash_map_internal::hash_map_base<
1637 Key, T, MutexType, ScopedLockType>::hashcode_type;
1638 using key_type = Key;
1639 using mapped_type = T;
1640 using value_type = typename concurrent_hash_map_internal::hash_map_base<
1641 Key, T, MutexType, ScopedLockType>::node::value_type;
1642 using difference_type = ptrdiff_t;
1643 using pointer = value_type *;
1644 using const_pointer = const value_type *;
1645 using reference = value_type &;
1646 using const_reference = const value_type &;
1647 using iterator = concurrent_hash_map_internal::hash_map_iterator<
1648 concurrent_hash_map, false>;
1649 using const_iterator = concurrent_hash_map_internal::hash_map_iterator<
1650 concurrent_hash_map, true>;
1651 using hasher = Hash;
1652 using key_equal = typename concurrent_hash_map_internal::key_equal_type<
1653 Hash, KeyEqual>::type;
1654
1655protected:
1656 using mutex_t = MutexType;
1657 using scoped_t = ScopedLockType;
1658 /*
1659 * Explicitly use methods and types from template base class
1660 */
1661 using hash_map_base =
1662 concurrent_hash_map_internal::hash_map_base<Key, T, mutex_t,
1663 scoped_t>;
1664 using hash_map_base::calculate_mask;
1665 using hash_map_base::check_growth;
1666 using hash_map_base::check_mask_race;
1667 using hash_map_base::embedded_buckets;
1668 using hash_map_base::FEATURE_CONSISTENT_SIZE;
1669 using hash_map_base::get_bucket;
1670 using hash_map_base::get_pool_base;
1671 using hash_map_base::header_features;
1672 using hash_map_base::insert_new_node;
1673 using hash_map_base::internal_swap;
1674 using hash_map_base::layout_features;
1675 using hash_map_base::mask;
1676 using hash_map_base::reserve;
1677 using tls_t = typename hash_map_base::tls_t;
1678 using node = typename hash_map_base::node;
1679 using node_mutex_t = typename node::mutex_t;
1680 using node_ptr_t = typename hash_map_base::node_ptr_t;
1681 using bucket = typename hash_map_base::bucket;
1682 using bucket_lock_type = typename bucket::scoped_t;
1683 using segment_index_t = typename hash_map_base::segment_index_t;
1684 using segment_traits_t = typename hash_map_base::segment_traits_t;
1685 using segment_facade_t = typename hash_map_base::segment_facade_t;
1686 using scoped_lock_traits_type =
1687 concurrent_hash_map_internal::scoped_lock_traits<scoped_t>;
1688
1689 friend class const_accessor;
1690 using persistent_node_ptr_t = detail::persistent_pool_ptr<node>;
1691
1692 void
1693 delete_node(const node_ptr_t &n)
1694 {
1695 delete_persistent<node>(
1696 detail::static_persistent_pool_pointer_cast<node>(n)
1697 .get_persistent_ptr(this->my_pool_uuid));
1698 }
1699
1700 template <typename K>
1701 persistent_node_ptr_t
1702 search_bucket(const K &key, bucket *b) const
1703 {
1704 assert(b->is_rehashed(std::memory_order_relaxed));
1705
1706 persistent_node_ptr_t n =
1707 detail::static_persistent_pool_pointer_cast<node>(
1708 b->node_list);
1709
1710 while (n &&
1711 !key_equal{}(key,
1712 n.get(this->my_pool_uuid)->item.first)) {
1713 n = detail::static_persistent_pool_pointer_cast<node>(
1714 n.get(this->my_pool_uuid)->next);
1715 }
1716
1717 return n;
1718 }
1719
1724 class bucket_accessor : public bucket_lock_type {
1725 bucket *my_b;
1726
1727 public:
1728 bucket_accessor(bucket_accessor &&b) noexcept : my_b(b.my_b)
1729 {
1730 bucket_lock_type::mutex = b.bucket_lock_type::mutex;
1731 bucket_lock_type::is_writer =
1732 b.bucket_lock_type::is_writer;
1733 b.my_b = nullptr;
1734 b.bucket_lock_type::mutex = nullptr;
1735 b.bucket_lock_type::is_writer = false;
1736 }
1737
1738 bucket_accessor(concurrent_hash_map *base,
1739 const hashcode_type h, bool writer = false)
1740 {
1741 acquire(base, h, writer);
1742 }
1743
1744 bucket_accessor(const bucket_accessor &other) = delete;
1745
1746 bucket_accessor &
1747 operator=(const bucket_accessor &other) = delete;
1748
1753 inline void
1754 acquire(concurrent_hash_map *base, const hashcode_type h,
1755 bool writer = false)
1756 {
1757 my_b = base->get_bucket(h);
1758
1759 if (my_b->is_rehashed(std::memory_order_acquire) ==
1760 false &&
1761 bucket_lock_type::try_acquire(this->my_b->mutex,
1762 /*write=*/true)) {
1763 if (my_b->is_rehashed(
1764 std::memory_order_relaxed) ==
1765 false) {
1766 /* recursive rehashing */
1767 base->rehash_bucket<false>(my_b, h);
1768 }
1769 } else {
1770 bucket_lock_type::acquire(my_b->mutex, writer);
1771 }
1772
1773 assert(my_b->is_rehashed(std::memory_order_relaxed));
1774 }
1775
1779 bool
1780 is_writer() const
1781 {
1782 return bucket_lock_type::is_writer;
1783 }
1784
1789 bucket *
1790 get() const
1791 {
1792 return my_b;
1793 }
1794
1799 bucket *operator->() const
1800 {
1801 return this->get();
1802 }
1803 };
1804
1808 class serial_bucket_accessor {
1809 bucket *my_b;
1810
1811 public:
1812 serial_bucket_accessor(concurrent_hash_map *base,
1813 const hashcode_type h,
1814 bool writer = false)
1815 {
1816 acquire(base, h, writer);
1817 }
1818
1819 /*
1820 * Find a bucket by masked hashcode, optionally rehash
1821 */
1822 inline void
1823 acquire(concurrent_hash_map *base, const hashcode_type h,
1824 bool writer = false)
1825 {
1826 my_b = base->get_bucket(h);
1827
1828 if (my_b->is_rehashed(std::memory_order_relaxed) ==
1829 false) {
1830 /* recursive rehashing */
1831 base->rehash_bucket<true>(my_b, h);
1832 }
1833
1834 assert(my_b->is_rehashed(std::memory_order_relaxed));
1835 }
1836
1843 bool
1844 is_writer() const
1845 {
1846 return true;
1847 }
1848
1853 bucket *
1854 get() const
1855 {
1856 return my_b;
1857 }
1858
1863 bucket *operator->() const
1864 {
1865 return this->get();
1866 }
1867 };
1868
1869 hashcode_type
1870 get_hash_code(node_ptr_t &n)
1871 {
1872 return hasher{}(
1873 detail::static_persistent_pool_pointer_cast<node>(n)(
1874 this->my_pool_uuid)
1875 ->item.first);
1876 }
1877
1878 template <bool serial>
1879 void
1880 rehash_bucket(bucket *b_new, const hashcode_type h)
1881 {
1882 using accessor_type = typename std::conditional<
1883 serial, serial_bucket_accessor, bucket_accessor>::type;
1884
1885 using scoped_lock_traits_type =
1886 concurrent_hash_map_internal::scoped_lock_traits<
1887 accessor_type>;
1888
1889 /* First two buckets should always be rehashed */
1890 assert(h > 1);
1891
1892 pool_base pop = get_pool_base();
1893 node_ptr_t *p_new = &(b_new->node_list);
1894
1895 /* This condition is only true when there was a failure just
1896 * before setting rehashed flag */
1897 if (*p_new != nullptr) {
1898 assert(!b_new->is_rehashed(std::memory_order_relaxed));
1899
1900 b_new->set_rehashed(std::memory_order_relaxed);
1901 pop.persist(b_new->rehashed);
1902
1903 return;
1904 }
1905
1906 /* get parent mask from the topmost bit */
1907 hashcode_type mask = (1u << detail::Log2(h)) - 1;
1908 assert((h & mask) < h);
1909 accessor_type b_old(
1910 this, h & mask,
1911 scoped_lock_traits_type::initial_rw_state(true));
1912
1913 flat_transaction::run(pop, [&] {
1914 /* get full mask for new bucket */
1915 mask = (mask << 1) | 1;
1916 assert((mask & (mask + 1)) == 0 && (h & mask) == h);
1917
1918 restart:
1919 for (node_ptr_t *p_old = &(b_old->node_list),
1920 n = *p_old;
1921 n; n = *p_old) {
1922 hashcode_type c = get_hash_code(n);
1923#ifndef NDEBUG
1924 hashcode_type bmask = h & (mask >> 1);
1925
1926 bmask = bmask == 0
1927 ? 1 /* minimal mask of parent bucket */
1928 : (1u << (detail::Log2(bmask) + 1)) - 1;
1929
1930 assert((c & bmask) == (h & bmask));
1931#endif
1932
1933 if ((c & mask) == h) {
1934 if (!b_old.is_writer() &&
1935 !scoped_lock_traits_type::
1936 upgrade_to_writer(b_old)) {
1937 goto restart;
1938 /* node ptr can be invalid due
1939 * to concurrent erase */
1940 }
1941
1942 /* Add to new b_new */
1943 *p_new = n;
1944
1945 /* exclude from b_old */
1946 *p_old = n(this->my_pool_uuid)->next;
1947
1948 p_new = &(n(this->my_pool_uuid)->next);
1949 } else {
1950 /* iterate to next item */
1951 p_old = &(n(this->my_pool_uuid)->next);
1952 }
1953 }
1954
1955 *p_new = nullptr;
1956 });
1957
1958 /* mark rehashed */
1959 b_new->set_rehashed(std::memory_order_release);
1960 pop.persist(b_new->rehashed);
1961 }
1962
1963 void
1964 check_incompat_features()
1965 {
1966 if (layout_features.incompat != header_features().incompat)
1967 throw pmem::layout_error(
1968 "Incompat flags mismatch, for more details go to: https://pmem.io/libpmemobj-cpp\n");
1969
1970 if ((layout_features.compat & FEATURE_CONSISTENT_SIZE) &&
1971 this->value_size != sizeof(value_type))
1972 throw pmem::layout_error(
1973 "Size of value_type is different than the one stored in the pool\n");
1974 }
1975
1976public:
1977 class accessor;
1981 class const_accessor
1982 : protected node::scoped_t /*which derived from no_copy*/ {
1983 friend class concurrent_hash_map<Key, T, Hash, KeyEqual,
1984 mutex_t, scoped_t>;
1985 friend class accessor;
1987 using node::scoped_t::try_acquire;
1988
1989 public:
1993 using value_type =
1994 const typename concurrent_hash_map::value_type;
1995
2000 bool
2001 empty() const
2002 {
2003 return !my_node;
2004 }
2005
2012 void
2013 release()
2014 {
2015 concurrent_hash_map_internal::check_outside_tx();
2016
2017 if (my_node) {
2018 node::scoped_t::release();
2019 my_node = 0;
2020 }
2021 }
2022
2026 const_reference operator*() const
2027 {
2028 assert(my_node);
2029
2030 return my_node->item;
2031 }
2032
2036 const_pointer operator->() const
2037 {
2038 return &operator*();
2039 }
2040
2046 const_accessor() : my_node(OID_NULL), my_hash()
2047 {
2048 concurrent_hash_map_internal::check_outside_tx();
2049 }
2050
2054 ~const_accessor()
2055 {
2056 my_node = OID_NULL; // scoped lock's release() is called
2057 // in its destructor
2058 }
2059
2060 protected:
2061 node_ptr_t my_node;
2062
2063 hashcode_type my_hash;
2064 };
2065
2070 class accessor : public const_accessor {
2071 public:
2073 using value_type = typename concurrent_hash_map::value_type;
2074
2076 reference operator*() const
2077 {
2078 assert(this->my_node);
2079
2080 return this->my_node->item;
2081 }
2082
2084 pointer operator->() const
2085 {
2086 return &operator*();
2087 }
2088 };
2089
2093 concurrent_hash_map() : hash_map_base()
2094 {
2095 runtime_initialize();
2096 }
2097
2102 concurrent_hash_map(size_type n) : hash_map_base()
2103 {
2104 runtime_initialize();
2105
2106 reserve(n);
2107 }
2108
2112 concurrent_hash_map(const concurrent_hash_map &table) : hash_map_base()
2113 {
2114 runtime_initialize();
2115
2116 reserve(table.size());
2117
2118 internal_copy(table);
2119 }
2120
2124 concurrent_hash_map(concurrent_hash_map &&table) : hash_map_base()
2125 {
2126 runtime_initialize();
2127
2128 swap(table);
2129 }
2130
2134 template <typename I>
2135 concurrent_hash_map(I first, I last)
2136 {
2137 runtime_initialize();
2138
2139 reserve(static_cast<size_type>(std::distance(first, last)));
2140
2141 internal_copy(first, last);
2142 }
2143
2147 concurrent_hash_map(std::initializer_list<value_type> il)
2148 {
2149 runtime_initialize();
2150
2151 reserve(il.size());
2152
2153 internal_copy(il.begin(), il.end());
2154 }
2155
2164 void
2165 runtime_initialize()
2166 {
2167 check_incompat_features();
2168
2169 calculate_mask();
2170
2171 /*
2172 * Handle case where hash_map was created without
2173 * FEATURE_CONSISTENT_SIZE.
2174 */
2175 if (!(layout_features.compat & FEATURE_CONSISTENT_SIZE)) {
2176 auto actual_size =
2177 std::distance(this->begin(), this->end());
2178 assert(actual_size >= 0);
2179
2180 this->my_size = static_cast<size_t>(actual_size);
2181
2182 auto pop = get_pool_base();
2183 flat_transaction::run(pop, [&] {
2184 this->tls_ptr = make_persistent<tls_t>();
2185 this->on_init_size =
2186 static_cast<size_t>(actual_size);
2187 this->value_size = sizeof(value_type);
2188
2189 layout_features.compat |=
2190 FEATURE_CONSISTENT_SIZE;
2191 });
2192 } else {
2193 assert(this->tls_ptr != nullptr);
2194 this->tls_restore();
2195 }
2196
2197 assert(this->size() ==
2198 size_type(std::distance(this->begin(), this->end())));
2199 }
2200
2201 [[deprecated(
2202 "runtime_initialize(bool) is now deprecated, use runtime_initialize(void)")]] void
2203 runtime_initialize(bool graceful_shutdown)
2204 {
2205 check_incompat_features();
2206
2207 calculate_mask();
2208
2209 if (!graceful_shutdown) {
2210 auto actual_size =
2211 std::distance(this->begin(), this->end());
2212 assert(actual_size >= 0);
2213 this->my_size = static_cast<size_type>(actual_size);
2214 } else {
2215 assert(this->size() ==
2216 size_type(std::distance(this->begin(),
2217 this->end())));
2218 }
2219 }
2220
2232 concurrent_hash_map &
2233 operator=(const concurrent_hash_map &table)
2234 {
2235 if (this != &table) {
2236 clear();
2237 internal_copy(table);
2238 }
2239
2240 return *this;
2241 }
2242
2254 concurrent_hash_map &
2255 operator=(std::initializer_list<value_type> il)
2256 {
2257 clear();
2258
2259 reserve(il.size());
2260
2261 internal_copy(il.begin(), il.end());
2262
2263 return *this;
2264 }
2265
2274 void rehash(size_type n = 0);
2275
2282 void clear();
2283
2301 void
2302 free_data()
2303 {
2304 if (!this->tls_ptr)
2305 return;
2306
2307 auto pop = get_pool_base();
2308
2309 flat_transaction::run(pop, [&] {
2310 clear();
2311 this->free_tls();
2312 });
2313 }
2314
2323 ~concurrent_hash_map()
2324 {
2325 try {
2326 free_data();
2327 } catch (...) {
2328 std::terminate();
2329 }
2330 }
2331
2332 //------------------------------------------------------------------------
2333 // STL support - not thread-safe methods
2334 //------------------------------------------------------------------------
2335
2342 iterator
2343 begin()
2344 {
2345 return iterator(this, 0);
2346 }
2347
2352 iterator
2353 end()
2354 {
2355 return iterator(this, mask() + 1);
2356 }
2357
2362 const_iterator
2363 begin() const
2364 {
2365 return const_iterator(this, 0);
2366 }
2367
2372 const_iterator
2373 end() const
2374 {
2375 return const_iterator(this, mask() + 1);
2376 }
2377
2381 size_type
2382 size() const
2383 {
2384 return hash_map_base::size();
2385 }
2386
2390 bool
2391 empty() const
2392 {
2393 return this->size() == 0;
2394 }
2395
2399 size_type
2400 max_size() const
2401 {
2402 return (~size_type(0)) / sizeof(node);
2403 }
2404
2408 size_type
2409 bucket_count() const
2410 {
2411 return mask() + 1;
2412 }
2413
2417 void swap(concurrent_hash_map &table);
2418
2419 //------------------------------------------------------------------------
2420 // concurrent map operations
2421 //------------------------------------------------------------------------
2422
2428 size_type
2429 count(const Key &key) const
2430 {
2431 concurrent_hash_map_internal::check_outside_tx();
2432
2433 return const_cast<concurrent_hash_map *>(this)->internal_find(
2434 key, nullptr, false);
2435 }
2436
2448 template <typename K,
2449 typename = typename std::enable_if<
2450 concurrent_hash_map_internal::
2451 has_transparent_key_equal<hasher>::value,
2452 K>::type>
2453 size_type
2454 count(const K &key) const
2455 {
2456 concurrent_hash_map_internal::check_outside_tx();
2457
2458 return const_cast<concurrent_hash_map *>(this)->internal_find(
2459 key, nullptr, false);
2460 }
2461
2468 bool
2469 find(const_accessor &result, const Key &key) const
2470 {
2471 concurrent_hash_map_internal::check_outside_tx();
2472
2473 result.release();
2474
2475 return const_cast<concurrent_hash_map *>(this)->internal_find(
2476 key, &result, false);
2477 }
2478
2492 template <typename K,
2493 typename = typename std::enable_if<
2494 concurrent_hash_map_internal::
2495 has_transparent_key_equal<hasher>::value,
2496 K>::type>
2497 bool
2498 find(const_accessor &result, const K &key) const
2499 {
2500 concurrent_hash_map_internal::check_outside_tx();
2501
2502 result.release();
2503
2504 return const_cast<concurrent_hash_map *>(this)->internal_find(
2505 key, &result, false);
2506 }
2507
2514 bool
2515 find(accessor &result, const Key &key)
2516 {
2517 concurrent_hash_map_internal::check_outside_tx();
2518
2519 result.release();
2520
2521 return internal_find(key, &result, true);
2522 }
2523
2537 template <typename K,
2538 typename = typename std::enable_if<
2539 concurrent_hash_map_internal::
2540 has_transparent_key_equal<hasher>::value,
2541 K>::type>
2542 bool
2543 find(accessor &result, const K &key)
2544 {
2545 concurrent_hash_map_internal::check_outside_tx();
2546
2547 result.release();
2548
2549 return internal_find(key, &result, true);
2550 }
2558 bool
2559 insert(const_accessor &result, const Key &key)
2560 {
2561 concurrent_hash_map_internal::check_outside_tx();
2562
2563 result.release();
2564
2565 return internal_insert(key, &result, false, key);
2566 }
2567
2575 bool
2576 insert(accessor &result, const Key &key)
2577 {
2578 concurrent_hash_map_internal::check_outside_tx();
2579
2580 result.release();
2581
2582 return internal_insert(key, &result, true, key);
2583 }
2584
2592 bool
2593 insert(const_accessor &result, const value_type &value)
2594 {
2595 concurrent_hash_map_internal::check_outside_tx();
2596
2597 result.release();
2598
2599 return internal_insert(value.first, &result, false, value);
2600 }
2601
2609 bool
2610 insert(accessor &result, const value_type &value)
2611 {
2612 concurrent_hash_map_internal::check_outside_tx();
2613
2614 result.release();
2615
2616 return internal_insert(value.first, &result, true, value);
2617 }
2618
2625 bool
2626 insert(const value_type &value)
2627 {
2628 concurrent_hash_map_internal::check_outside_tx();
2629
2630 return internal_insert(value.first, nullptr, false, value);
2631 }
2632
2640 bool
2641 insert(const_accessor &result, value_type &&value)
2642 {
2643 concurrent_hash_map_internal::check_outside_tx();
2644
2645 result.release();
2646
2647 return internal_insert(value.first, &result, false,
2648 std::move(value));
2649 }
2650
2658 bool
2659 insert(accessor &result, value_type &&value)
2660 {
2661 concurrent_hash_map_internal::check_outside_tx();
2662
2663 result.release();
2664
2665 return internal_insert(value.first, &result, true,
2666 std::move(value));
2667 }
2668
2675 bool
2676 insert(value_type &&value)
2677 {
2678 concurrent_hash_map_internal::check_outside_tx();
2679
2680 return internal_insert(value.first, nullptr, false,
2681 std::move(value));
2682 }
2683
2689 template <typename I>
2690 void
2691 insert(I first, I last)
2692 {
2693 concurrent_hash_map_internal::check_outside_tx();
2694
2695 for (; first != last; ++first)
2696 insert(*first);
2697 }
2698
2704 void
2705 insert(std::initializer_list<value_type> il)
2706 {
2707 concurrent_hash_map_internal::check_outside_tx();
2708
2709 insert(il.begin(), il.end());
2710 }
2711
2720 template <typename M>
2721 bool
2722 insert_or_assign(const key_type &key, M &&obj)
2723 {
2724 concurrent_hash_map_internal::check_outside_tx();
2725
2726 accessor acc;
2727 auto result = internal_insert(key, &acc, true, key,
2728 std::forward<M>(obj));
2729
2730 if (!result) {
2731 pool_base pop = get_pool_base();
2732 flat_transaction::run(pop, [&] {
2733 acc->second = std::forward<M>(obj);
2734 });
2735 }
2736
2737 return result;
2738 }
2739
2748 template <typename M>
2749 bool
2750 insert_or_assign(key_type &&key, M &&obj)
2751 {
2752 concurrent_hash_map_internal::check_outside_tx();
2753
2754 accessor acc;
2755 auto result = internal_insert(key, &acc, true, std::move(key),
2756 std::forward<M>(obj));
2757
2758 if (!result) {
2759 pool_base pop = get_pool_base();
2760 flat_transaction::run(pop, [&] {
2761 acc->second = std::forward<M>(obj);
2762 });
2763 }
2764
2765 return result;
2766 }
2767
2776 template <
2777 typename K, typename M,
2778 typename = typename std::enable_if<
2779 concurrent_hash_map_internal::has_transparent_key_equal<
2780 hasher>::value &&
2781 std::is_constructible<key_type, K>::value,
2782 K>::type>
2783 bool
2784 insert_or_assign(K &&key, M &&obj)
2785 {
2786 concurrent_hash_map_internal::check_outside_tx();
2787
2788 accessor acc;
2789 auto result =
2790 internal_insert(key, &acc, true, std::forward<K>(key),
2791 std::forward<M>(obj));
2792
2793 if (!result) {
2794 pool_base pop = get_pool_base();
2795 flat_transaction::run(pop, [&] {
2796 acc->second = std::forward<M>(obj);
2797 });
2798 }
2799
2800 return result;
2801 }
2802
2811 bool
2812 erase(const Key &key)
2813 {
2814 concurrent_hash_map_internal::check_outside_tx();
2815
2816 return internal_erase(key);
2817 }
2818
2837 pobj_defrag_result
2838 defragment(double start_percent = 0, double amount_percent = 100)
2839 {
2840 double end_percent = start_percent + amount_percent;
2841 if (start_percent < 0 || start_percent >= 100 ||
2842 end_percent < 0 || end_percent > 100 ||
2843 start_percent >= end_percent) {
2844 throw std::range_error("incorrect range");
2845 }
2846
2847 size_t max_index = mask().load(std::memory_order_acquire);
2848 size_t start_index =
2849 static_cast<size_t>((start_percent * max_index) / 100);
2850 size_t end_index =
2851 static_cast<size_t>((end_percent * max_index) / 100);
2852
2853 /* Make sure we do not use too big index, even in case of
2854 * rounding errors. */
2855 end_index = (std::min)(end_index, max_index);
2856
2857#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
2858 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
2859#endif
2860
2861 /* Create defrag object for elements in the current pool */
2862 pmem::obj::defrag my_defrag(this->get_pool_base());
2863 mutex_vector mv;
2864
2865 /*
2866 * Locks are taken in the backward order to avoid deadlocks
2867 * with the rehashing of buckets.
2868 *
2869 * We do '+ 1' and '- 1' to handle the 'i == 0' case.
2870 */
2871 for (size_t i = end_index + 1; i >= start_index + 1; i--) {
2872 /*
2873 * All locks will be unlocked automatically
2874 * in the destructor of 'mv'.
2875 */
2876 bucket *b = mv.push_and_try_lock(this, i - 1);
2877 if (b == nullptr)
2878 continue;
2879
2880 defrag_save_nodes(b, my_defrag);
2881 }
2882
2883 return my_defrag.run();
2884 }
2885
2900 template <typename K,
2901 typename = typename std::enable_if<
2902 concurrent_hash_map_internal::
2903 has_transparent_key_equal<hasher>::value,
2904 K>::type>
2905 bool
2906 erase(const K &key)
2907 {
2908 concurrent_hash_map_internal::check_outside_tx();
2909
2910 return internal_erase(key);
2911 }
2912
2913protected:
2914 /*
2915 * Try to acquire the mutex for read or write.
2916 *
2917 * If acquiring succeeds returns true, otherwise retries for few times.
2918 * If acquiring fails after all attempts returns false.
2919 */
2920 bool try_acquire_item(const_accessor *result, node_mutex_t &mutex,
2921 bool write);
2922
2927 class mutex_vector {
2928 public:
2929 using mutex_t = MutexType;
2930
2932 bucket *
2933 push_and_try_lock(concurrent_hash_map *base, hashcode_type h)
2934 {
2935 vec.emplace_back(base, h, true /*writer*/);
2936 bucket *b = vec.back().get();
2937
2938 auto node_ptr = static_cast<node *>(
2939 b->node_list.get(base->my_pool_uuid));
2940
2941 while (node_ptr) {
2942 const_accessor ca;
2943 if (!base->try_acquire_item(&ca,
2944 node_ptr->mutex,
2945 /*write=*/true)) {
2946 vec.pop_back();
2947 return nullptr;
2948 }
2949
2950 node_ptr =
2951 static_cast<node *>(node_ptr->next.get(
2952 (base->my_pool_uuid)));
2953 }
2954
2955 return b;
2956 }
2957
2958 private:
2959 std::vector<bucket_accessor> vec;
2960 };
2961
2962 template <typename K>
2963 bool internal_find(const K &key, const_accessor *result, bool write);
2964
2965 template <typename K, typename... Args>
2966 bool internal_insert(const K &key, const_accessor *result, bool write,
2967 Args &&... args);
2968
2969 /* Obtain pointer to node and lock bucket */
2970 template <bool Bucket_rw_lock, typename K>
2971 persistent_node_ptr_t
2972 get_node(const K &key, bucket_accessor &b)
2973 {
2974 /* find a node */
2975 auto n = search_bucket(key, b.get());
2976
2977 if (!n) {
2978 if (Bucket_rw_lock && !b.is_writer() &&
2979 !scoped_lock_traits_type::upgrade_to_writer(b)) {
2980 /* Rerun search_list, in case another
2981 * thread inserted the item during the
2982 * upgrade. */
2983 n = search_bucket(key, b.get());
2984 if (n) {
2985 /* unfortunately, it did */
2986 scoped_lock_traits_type::
2987 downgrade_to_reader(b);
2988 return n;
2989 }
2990 }
2991 }
2992
2993 return n;
2994 }
2995
2996 template <typename K>
2997 bool internal_erase(const K &key);
2998
2999 void clear_segment(segment_index_t s);
3000
3004 void internal_copy(const concurrent_hash_map &source);
3005
3006 template <typename I>
3007 void internal_copy(I first, I last);
3008
3013 void
3014 defrag_save_nodes(bucket *b, pmem::obj::defrag &defrag)
3015 {
3016 auto node_ptr = static_cast<node *>(
3017 b->node_list.get(this->my_pool_uuid));
3018
3019 while (node_ptr) {
3020 /*
3021 * We do not perform the defragmentation
3022 * on node pointers, because nodes
3023 * always have the same size.
3024 */
3025 defrag.add(node_ptr->item.first);
3026 defrag.add(node_ptr->item.second);
3027
3028 node_ptr = static_cast<node *>(
3029 node_ptr->next.get((this->my_pool_uuid)));
3030 }
3031 }
3032}; // class concurrent_hash_map
3033
3034template <typename Key, typename T, typename Hash, typename KeyEqual,
3035 typename MutexType, typename ScopedLockType>
3036bool
3037concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3038 ScopedLockType>::try_acquire_item(const_accessor *result,
3039 node_mutex_t &mutex,
3040 bool write)
3041{
3042 /* acquire the item */
3043 if (!result->try_acquire(mutex, write)) {
3044 for (detail::atomic_backoff backoff(true);;) {
3045 if (result->try_acquire(mutex, write))
3046 break;
3047
3048 if (!backoff.bounded_pause())
3049 return false;
3050 }
3051 }
3052
3053 return true;
3054}
3055
3056template <typename Key, typename T, typename Hash, typename KeyEqual,
3057 typename MutexType, typename ScopedLockType>
3058template <typename K>
3059bool
3060concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3061 ScopedLockType>::internal_find(const K &key,
3062 const_accessor *result,
3063 bool write)
3064{
3065 assert(!result || !result->my_node);
3066
3067 hashcode_type m = mask().load(std::memory_order_acquire);
3068#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3069 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3070#endif
3071
3072 assert((m & (m + 1)) == 0);
3073
3074 hashcode_type const h = hasher{}(key);
3075
3076 persistent_node_ptr_t node;
3077
3078 while (true) {
3079 /* get bucket and acquire the lock */
3080 bucket_accessor b(
3081 this, h & m,
3082 scoped_lock_traits_type::initial_rw_state(false));
3083 node = get_node<false>(key, b);
3084
3085 if (!node) {
3086 /* Element was possibly relocated, try again */
3087 if (check_mask_race(h, m)) {
3088 b.release();
3089 continue;
3090 } else {
3091 return false;
3092 }
3093 }
3094
3095 /* No need to acquire the item or item acquired */
3096 if (!result ||
3097 try_acquire_item(
3098 result, node.get(this->my_pool_uuid)->mutex, write))
3099 break;
3100
3101 /* the wait is taking too long, restart the
3102 * operation */
3103 b.release();
3104
3105 std::this_thread::yield();
3106
3107 m = mask().load(std::memory_order_acquire);
3108#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3109 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3110#endif
3111 }
3112
3113 if (result) {
3114 result->my_node = node.get_persistent_ptr(this->my_pool_uuid);
3115 result->my_hash = h;
3116 }
3117
3118 return true;
3119}
3120
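internal_find backs both public find() overloads: the const_accessor variant asks for a shared item lock, the accessor variant for an exclusive one, and the bucket lock is dropped as soon as the item lock is held. A usage sketch under the same illustrative map_type assumption as above; the pool handle is only needed to flush an in-place update.

#include <iostream>
#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/pool.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Read the mapped value under a shared (read) item lock. */
void
print_value(map_type &map, int key)
{
	map_type::const_accessor acc;
	if (map.find(acc, key))
		std::cout << acc->first.get_ro() << " -> "
			  << acc->second.get_ro() << "\n";
	/* the item lock is released when 'acc' goes out of scope */
}

/* Update the mapped value in place under an exclusive (write) item lock. */
void
bump_value(pmem::obj::pool_base &pop, map_type &map, int key)
{
	map_type::accessor acc;
	if (map.find(acc, key)) {
		acc->second.get_rw() += 1;
		pop.persist(acc->second); /* not in a transaction, so flush */
	}
}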
3121template <typename Key, typename T, typename Hash, typename KeyEqual,
3122 typename MutexType, typename ScopedLockType>
3123template <typename K, typename... Args>
3124bool
3125concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3126 ScopedLockType>::internal_insert(const K &key,
3127 const_accessor *result,
3128 bool write,
3129 Args &&... args)
3130{
3131 assert(!result || !result->my_node);
3132
3133 hashcode_type m = mask().load(std::memory_order_acquire);
3134#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3135 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3136#endif
3137
3138 assert((m & (m + 1)) == 0);
3139
3140 hashcode_type const h = hasher{}(key);
3141
3142 persistent_node_ptr_t node;
3143 size_t new_size = 0;
3144 bool inserted = false;
3145
3146 while (true) {
3147 /* get bucket and acquire the lock */
3148 bucket_accessor b(
3149 this, h & m,
3150 scoped_lock_traits_type::initial_rw_state(true));
3151 node = get_node<true>(key, b);
3152
3153 if (!node) {
3154 /* Element was possibly relocated, try again */
3155 if (check_mask_race(h, m)) {
3156 b.release();
3157 continue;
3158 }
3159
3160 /* insert and set flag to grow the container */
3161 new_size = insert_new_node(b.get(), node,
3162 std::forward<Args>(args)...);
3163 inserted = true;
3164 }
3165
3166 /* No need to acquire the item or item acquired */
3167 if (!result ||
3168 try_acquire_item(
3169 result, node.get(this->my_pool_uuid)->mutex, write))
3170 break;
3171
3172 /* the wait is taking too long, restart the
3173 * operation */
3174 b.release();
3175
3176 std::this_thread::yield();
3177
3178 m = mask().load(std::memory_order_acquire);
3179#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3180 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3181#endif
3182 }
3183
3184 if (result) {
3185 result->my_node = node.get_persistent_ptr(this->my_pool_uuid);
3186 result->my_hash = h;
3187 }
3188
3189 check_growth(m, new_size);
3190
3191 return inserted;
3192}
3193
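internal_insert is the common path behind every public insert() and insert_or_assign() overload: it locks the bucket, adds a node only when the key is absent, optionally hands the caller an item lock, and finally checks whether the table should grow. A caller-side sketch, using the same illustrative map_type as above:

#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/p.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Insert a pair if the key is absent; insert_or_assign() additionally
 * overwrites the mapped value when the key already exists. */
void
populate(map_type &map)
{
	map.insert(map_type::value_type(1, 10)); /* no-op if key 1 exists */
	map.insert_or_assign(pmem::obj::p<int>(2), 20);
}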
3194template <typename Key, typename T, typename Hash, typename KeyEqual,
3195 typename MutexType, typename ScopedLockType>
3196template <typename K>
3197bool
3198concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3199 ScopedLockType>::internal_erase(const K &key)
3200{
3201 node_ptr_t n;
3202 hashcode_type const h = hasher{}(key);
3203 hashcode_type m = mask().load(std::memory_order_acquire);
3204#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3205 ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3206#endif
3207
3208 pool_base pop = get_pool_base();
3209
3210restart : {
3211 /* lock scope */
3212 /* get bucket */
3213 bucket_accessor b(this, h & m,
3214 scoped_lock_traits_type::initial_rw_state(true));
3215
3216search:
3217 node_ptr_t *p = &b->node_list;
3218 n = *p;
3219
3220 while (n &&
3221 !key_equal{}(key,
3222 detail::static_persistent_pool_pointer_cast<node>(
3223 n)(this->my_pool_uuid)
3224 ->item.first)) {
3225 p = &n(this->my_pool_uuid)->next;
3226 n = *p;
3227 }
3228
3229 if (!n) {
3230 /* not found, but mask could be changed */
3231 if (check_mask_race(h, m))
3232 goto restart;
3233
3234 return false;
3235 } else if (!b.is_writer() &&
3236 !scoped_lock_traits_type::upgrade_to_writer(b)) {
3237 if (check_mask_race(h, m)) /* contended upgrade, check mask */
3238 goto restart;
3239
3240 goto search;
3241 }
3242
3243 persistent_ptr<node> del = n(this->my_pool_uuid);
3244
3245 {
3246 /* We cannot remove this element immediately because
3247 * other threads might work with this element via
3248 * accessors. The item_locker is required to wait while
3249 * other threads use the node. */
3250 const_accessor acc;
3251 if (!try_acquire_item(&acc, del->mutex, true)) {
3252 /* the wait is taking too long, restart the operation */
3253 b.release();
3254
3255 std::this_thread::yield();
3256
3257 m = mask().load(std::memory_order_acquire);
3258
3259 goto restart;
3260 }
3261 }
3262
3263 assert(pmemobj_tx_stage() == TX_STAGE_NONE);
3264
3265 auto &size_diff = this->thread_size_diff();
3266
3267 /* Only one thread can delete it due to write lock on the bucket
3268 */
3269 flat_transaction::run(pop, [&] {
3270 *p = del->next;
3271 delete_node(del);
3272
3273 --size_diff;
3274 });
3275
3276 --(this->my_size);
3277}
3278
3279 return true;
3280}
3281
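internal_erase upgrades the bucket lock to exclusive, waits until no accessor holds the node, and then unlinks and frees it inside a flat transaction, so erase() can safely race with finds, inserts and other erases from different threads. A sketch of concurrent removal; thread count, key layout and names are illustrative.

#include <thread>
#include <vector>
#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/p.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Erase keys [first, last) from several threads at once. */
void
erase_range_concurrently(map_type &map, int first, int last)
{
	const int nthreads = 4;
	std::vector<std::thread> workers;

	for (int t = 0; t < nthreads; ++t)
		workers.emplace_back([&, t] {
			for (int k = first + t; k < last; k += nthreads)
				map.erase(k);
		});

	for (auto &w : workers)
		w.join();
}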
3282template <typename Key, typename T, typename Hash, typename KeyEqual,
3283 typename MutexType, typename ScopedLockType>
3284 void
3285 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3286 		    ScopedLockType>::swap(concurrent_hash_map &table)
3287 {
3288 internal_swap(table);
3289}
3290
3291template <typename Key, typename T, typename Hash, typename KeyEqual,
3292 typename MutexType, typename ScopedLockType>
3293 void
3294 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType>::rehash(
3295 size_type sz)
3296{
3297 concurrent_hash_map_internal::check_outside_tx();
3298
3299 reserve(sz);
3300 hashcode_type m = mask();
3301
3302 /* only the last segment should be scanned for rehashing;
3303 * b is the first bucket index of the last segment */
3304 hashcode_type b = (m + 1) >> 1;
3305
3306 /* zero or power of 2 */
3307 assert((b & (b - 1)) == 0);
3308
3309 for (; b <= m; ++b) {
3310 bucket *bp = get_bucket(b);
3311
3312 concurrent_hash_map_internal::assert_not_locked<mutex_t,
3313 scoped_t>(
3314 bp->mutex);
3315 /* XXX Need to investigate if this statement is needed */
3316 if (bp->is_rehashed(std::memory_order_relaxed) == false)
3317 rehash_bucket<true>(bp, b);
3318 }
3319}
3320
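rehash() first reserves capacity for sz elements and then walks the buckets of the last allocated segment, rehashing any that are still marked as not rehashed. It asserts that no bucket is locked, so call it while no other thread operates on the map, for example as single-threaded preparation before the concurrent phase. A sketch under the same illustrative map_type assumption:

#include <cstddef>
#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/p.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Pre-allocate buckets for the expected element count before the
 * concurrent phase, so growth does not happen on the hot path. */
void
preallocate(map_type &map, std::size_t expected_elements)
{
	map.rehash(expected_elements);
}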
3321template <typename Key, typename T, typename Hash, typename KeyEqual,
3322 typename MutexType, typename ScopedLockType>
3323 void
3324 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType>::clear()
3325 {
3326 hashcode_type m = mask();
3327
3328 assert((m & (m + 1)) == 0);
3329
3330#ifndef NDEBUG
3331 /* check consistency */
3332 for (segment_index_t b = 0; b <= m; ++b) {
3333 bucket *bp = get_bucket(b);
3334 concurrent_hash_map_internal::assert_not_locked<mutex_t,
3335 scoped_t>(
3336 bp->mutex);
3337 }
3338#endif
3339
3340 pool_base pop = get_pool_base();
3341 { /* transaction scope */
3342
3343 flat_transaction::manual tx(pop);
3344
3345 assert(this->tls_ptr != nullptr);
3346 this->tls_ptr->clear();
3347
3348 this->on_init_size = 0;
3349
3350 segment_index_t s = segment_traits_t::segment_index_of(m);
3351
3352 assert(s + 1 == this->block_table_size ||
3353 !segment_facade_t(this->my_table, s + 1).is_valid());
3354
3355 do {
3356 clear_segment(s);
3357 } while (s-- > 0);
3358
3359 /*
3360 * As clear can only be called
3361 * from one thread, and there can be an outer
3362 * transaction, we must make sure that mask and size
3363 * changes are transactional.
3364 */
3365 flat_transaction::snapshot((size_t *)&this->my_mask);
3366 flat_transaction::snapshot((size_t *)&this->my_size);
3367
3368 mask().store(embedded_buckets - 1, std::memory_order_relaxed);
3369 this->my_size = 0;
3370
3371 flat_transaction::commit();
3372 }
3373}
3374
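clear() is documented as not thread safe: it drops every node segment by segment inside one transaction and resets the mask and size. free_data() must still be called before the object is destroyed or deleted. A teardown sketch follows; the pool handle, root layout and names are illustrative assumptions.

#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Remove all elements and destroy the container; the caller guarantees
 * that no other thread is using it. */
void
destroy_map(pmem::obj::pool_base &pop,
	    pmem::obj::persistent_ptr<map_type> &map)
{
	map->clear();     /* not thread safe: no other thread may use the map */
	map->free_data(); /* must precede destruction of the map object */

	pmem::obj::transaction::run(pop, [&] {
		pmem::obj::delete_persistent<map_type>(map);
		map = nullptr;
	});
}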
3375template <typename Key, typename T, typename Hash, typename KeyEqual,
3376 typename MutexType, typename ScopedLockType>
3377void
3378concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3379 ScopedLockType>::clear_segment(segment_index_t s)
3380{
3381 segment_facade_t segment(this->my_table, s);
3382
3383 assert(segment.is_valid());
3384
3385 size_type sz = segment.size();
3386 for (segment_index_t i = 0; i < sz; ++i) {
3387 for (node_ptr_t n = segment[i].node_list; n;
3388 n = segment[i].node_list) {
3389 segment[i].node_list = n(this->my_pool_uuid)->next;
3390 delete_node(n);
3391 }
3392 }
3393
3394 if (s >= segment_traits_t::embedded_segments)
3395 segment.disable();
3396}
3397
3398template <typename Key, typename T, typename Hash, typename KeyEqual,
3399 typename MutexType, typename ScopedLockType>
3400 void
3401 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType>::
3402 internal_copy(const concurrent_hash_map &source)
3403 {
3404 auto pop = get_pool_base();
3405
3406 reserve(source.size());
3407 internal_copy(source.begin(), source.end());
3408}
3409
3410template <typename Key, typename T, typename Hash, typename KeyEqual,
3411 typename MutexType, typename ScopedLockType>
3412template <typename I>
3413void
3414concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3415 ScopedLockType>::internal_copy(I first, I last)
3416{
3417 hashcode_type m = mask();
3418
3419 for (; first != last; ++first) {
3420 hashcode_type h = hasher{}(first->first);
3421 bucket *b = get_bucket(h & m);
3422
3423 assert(b->is_rehashed(std::memory_order_relaxed));
3424
3425 detail::persistent_pool_ptr<node> p;
3426 insert_new_node(b, p, *first);
3427 }
3428}
3429
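Both internal_copy() overloads assume an empty destination and simply re-insert every element; they back the copy constructor, the iterator-range constructor and the initializer-list constructor. A sketch of creating a persistent copy of an existing map; the pool handle and names are illustrative, and the copy is not synchronized with writers of the source.

#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Allocate a new map in the pool and copy-construct it from 'source'. */
pmem::obj::persistent_ptr<map_type>
clone_map(pmem::obj::pool_base &pop, const map_type &source)
{
	pmem::obj::persistent_ptr<map_type> copy;

	pmem::obj::transaction::run(pop, [&] {
		copy = pmem::obj::make_persistent<map_type>(source);
	});

	return copy;
}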
3430template <typename Key, typename T, typename Hash, typename KeyEqual,
3431 typename MutexType, typename ScopedLockType>
3432inline bool
3433operator==(const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3434 ScopedLockType> &a,
3435 const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3436 ScopedLockType> &b)
3437{
3438 if (a.size() != b.size())
3439 return false;
3440
3441 typename concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3442 ScopedLockType>::const_iterator
3443 i(a.begin()),
3444 i_end(a.end());
3445
3446 typename concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3447 ScopedLockType>::const_iterator j,
3448 j_end(b.end());
3449
3450 for (; i != i_end; ++i) {
3451 j = b.equal_range(i->first).first;
3452
3453 if (j == j_end || !(i->second == j->second))
3454 return false;
3455 }
3456
3457 return true;
3458}
3459
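operator== compares sizes first and then looks up every element of a in b, so two maps holding the same key/value pairs compare equal regardless of bucket layout or insertion order. The comparison iterates without locking, so it is meant for quiescent maps. A tiny sketch with illustrative names:

#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/p.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* True when both maps hold exactly the same key/value pairs; call only
 * while no other thread modifies either map. */
bool
same_content(const map_type &a, const map_type &b)
{
	return a == b; /* operator!= is the negation */
}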
3460template <typename Key, typename T, typename Hash, typename KeyEqual,
3461 typename MutexType, typename ScopedLockType>
3462inline bool
3463operator!=(const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3464 ScopedLockType> &a,
3465 const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3466 ScopedLockType> &b)
3467{
3468 return !(a == b);
3469}
3470
3471template <typename Key, typename T, typename Hash, typename KeyEqual,
3472 typename MutexType, typename ScopedLockType>
3473inline void
3474swap(concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType> &a,
3475 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType> &b)
3476{
3477 a.swap(b);
3478}
3479
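The free swap() simply forwards to the member function, so std-style generic code that calls swap(a, b) unqualified picks this overload via ADL. Like the member, it is not synchronized with concurrent users of either map. Sketch, with illustrative names:

#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/p.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

/* Exchange the contents of two maps that live in the same pool, while no
 * other thread is using either of them. */
void
exchange(map_type &a, map_type &b)
{
	using std::swap;
	swap(a, b); /* ADL selects pmem::obj::swap */
}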
3480} /* namespace obj */
3481} /* namespace pmem */
3482
3483#endif /* PMEMOBJ_CONCURRENT_HASH_MAP_HPP */
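For orientation, a minimal end-to-end sketch of how the pieces documented above fit together: create or open a pool, re-initialize the map after a (re)start, then insert, find and erase. The path, layout string, pool size and root layout are illustrative assumptions only.

#include <iostream>
#include <libpmemobj++/container/concurrent_hash_map.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/transaction.hpp>

using map_type =
	pmem::obj::concurrent_hash_map<pmem::obj::p<int>, pmem::obj::p<int>>;

struct root {
	pmem::obj::persistent_ptr<map_type> map;
};

int
main()
{
	const char *path = "/pmem/chm_example_pool"; /* illustrative path */
	pmem::obj::pool<root> pop;

	try {
		pop = pmem::obj::pool<root>::create(path, "chm_example",
						    8 * PMEMOBJ_MIN_POOL);
	} catch (const pmem::pool_error &) {
		pop = pmem::obj::pool<root>::open(path, "chm_example");
	}

	auto r = pop.root();
	if (r->map == nullptr)
		pmem::obj::transaction::run(pop, [&] {
			r->map = pmem::obj::make_persistent<map_type>();
		});

	auto &map = *r->map;
	map.runtime_initialize(); /* required after every pool open */

	map.insert(map_type::value_type(1, 100));

	map_type::const_accessor acc;
	if (map.find(acc, 1))
		std::cout << acc->first.get_ro() << " -> "
			  << acc->second.get_ro() << "\n";
	acc.release(); /* drop the read lock before erasing the same key */

	map.erase(1);

	pop.close();
	return 0;
}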