Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
_concurrent_queue_impl.h
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB__concurrent_queue_impl_H
18#define __TBB__concurrent_queue_impl_H
19
20#ifndef __TBB_concurrent_queue_H
21#error Do not #include this internal file directly; use public TBB headers instead.
22#endif
23
24#include "../tbb_stddef.h"
25#include "../tbb_machine.h"
26#include "../atomic.h"
27#include "../spin_mutex.h"
28#include "../cache_aligned_allocator.h"
29#include "../tbb_exception.h"
30#include "../tbb_profiling.h"
31#include <new>
32#include __TBB_STD_SWAP_HEADER
33#include <iterator>
34
35namespace tbb {
36
37#if !__TBB_TEMPLATE_FRIENDS_BROKEN
38
39// forward declaration
40namespace strict_ppl {
41template<typename T, typename A> class concurrent_queue;
42}
43
44template<typename T, typename A> class concurrent_bounded_queue;
45
46#endif
47
49namespace strict_ppl {
50
52namespace internal {
53
54using namespace tbb::internal;
55
56typedef size_t ticket;
57
58template<typename T> class micro_queue ;
59template<typename T> class micro_queue_pop_finalizer ;
60template<typename T> class concurrent_queue_base_v3;
61template<typename T> struct concurrent_queue_rep;
62
63//! parts of concurrent_queue_rep that do not have references to micro_queue
64class concurrent_queue_rep_base {
68 template<typename T> friend class micro_queue;
69 template<typename T> friend class concurrent_queue_base_v3;
70
71protected:
72 //! Approximately n_queue/golden ratio
73 static const size_t phi = 3;
74
75public:
76 // must be power of 2
77 static const size_t n_queue = 8;
78
80 struct page {
81 page* next;
82 uintptr_t mask;
83 };
84
85 atomic<ticket> head_counter;
86 char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
87 atomic<ticket> tail_counter;
88 char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
89
91 //! Always a power of 2
92 size_t items_per_page;
94 size_t item_size;
95
97 atomic<size_t> n_invalid_entries;
98
99 char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
100} ;
101
102inline bool is_valid_page( const concurrent_queue_rep_base::page* p ) {
103 return uintptr_t(p)>1;
104}
105
106//! Abstract class to define interface for page allocation/deallocation
110class concurrent_queue_page_allocator
111{
112 template<typename T> friend class micro_queue ;
113 template<typename T> friend class micro_queue_pop_finalizer ;
114protected:
115 virtual ~concurrent_queue_page_allocator() {}
116private:
117 virtual concurrent_queue_rep_base::page* allocate_page() = 0;
118 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
119} ;
120
121#if _MSC_VER && !defined(__INTEL_COMPILER)
122// unary minus operator applied to unsigned type, result still unsigned
123#pragma warning( push )
124#pragma warning( disable: 4146 )
125#endif
126
127//! A queue using simple locking.
130template<typename T>
131class micro_queue : no_copy {
132public:
133 typedef void (*item_constructor_t)(T* location, const void* src);
134private:
135 typedef concurrent_queue_rep_base::page page;
136
137 //! Class used to ensure exception-safety of method "pop"
138 class destroyer: no_copy {
139 T& my_value;
140 public:
141 destroyer( T& value ) : my_value(value) {}
142 ~destroyer() {my_value.~T();}
143 };
144
145 void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
146 construct_item( &get_ref(dst, dindex), src );
147 }
148
149 void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
150 item_constructor_t construct_item )
151 {
152 T& src_item = get_ref( const_cast<page&>(src), sindex );
153 construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
154 }
155
156 void assign_and_destroy_item( void* dst, page& src, size_t index ) {
157 T& from = get_ref(src,index);
158 destroyer d(from);
159 *static_cast<T*>(dst) = tbb::internal::move( from );
160 }
161
162 void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
163
164public:
165 friend class micro_queue_pop_finalizer<T>;
166
167 struct padded_page: page {
168 //! Not defined anywhere - exists to quiet warnings.
169 padded_page();
170 //! Not defined anywhere - exists to quiet warnings.
171 void operator=( const padded_page& );
172 //! Must be last field.
173 T last;
174 };
175
176 static T& get_ref( page& p, size_t index ) {
177 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
178 }
179
180 atomic<page*> head_page;
181 atomic<ticket> head_counter;
182
183 atomic<page*> tail_page;
184 atomic<ticket> tail_counter;
185
187 spin_mutex page_mutex;
188 void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
189 item_constructor_t construct_item ) ;
190
191 bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
192
193 micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
194 item_constructor_t construct_item ) ;
195
196 page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
197 size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;
198
199 void invalidate_page_and_rethrow( ticket k ) ;
200};
201
202template<typename T>
203void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
204 for( atomic_backoff b(true);;b.pause() ) {
205 ticket c = counter;
206 if( c==k ) return;
207 else if( c&1 ) {
208 ++rb.n_invalid_entries;
209 throw_exception( eid_bad_last_alloc );
210 }
211 }
212}
213
214template<typename T>
215void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
216 item_constructor_t construct_item )
217{
218 k &= -concurrent_queue_rep_base::n_queue;
219 page* p = NULL;
220 size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
221 if( !index ) {
222 __TBB_TRY {
223 concurrent_queue_page_allocator& pa = base;
224 p = pa.allocate_page();
225 } __TBB_CATCH (...) {
226 ++base.my_rep->n_invalid_entries;
227 invalidate_page_and_rethrow( k );
228 }
229 p->mask = 0;
230 p->next = NULL;
231 }
232
233 if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
234 call_itt_notify(acquired, &tail_counter);
235
236 if( p ) {
237 spin_mutex::scoped_lock lock( page_mutex );
238 page* q = tail_page;
239 if( is_valid_page(q) )
240 q->next = p;
241 else
242 head_page = p;
243 tail_page = p;
244 } else {
245 p = tail_page;
246 }
247
248 __TBB_TRY {
249 copy_item( *p, index, item, construct_item );
250 // If no exception was thrown, mark item as present.
251 itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
252 call_itt_notify(releasing, &tail_counter);
253 tail_counter += concurrent_queue_rep_base::n_queue;
254 } __TBB_CATCH (...) {
255 ++base.my_rep->n_invalid_entries;
256 call_itt_notify(releasing, &tail_counter);
257 tail_counter += concurrent_queue_rep_base::n_queue;
258 __TBB_RETHROW();
259 }
260}
261
262template<typename T>
263bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
264 k &= -concurrent_queue_rep_base::n_queue;
265 if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
266 call_itt_notify(acquired, &head_counter);
267 if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
268 call_itt_notify(acquired, &tail_counter);
269 page *p = head_page;
270 __TBB_ASSERT( p, NULL );
271 size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
272 bool success = false;
273 {
274 micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
275 if( p->mask & uintptr_t(1)<<index ) {
276 success = true;
277 assign_and_destroy_item( dst, *p, index );
278 } else {
279 --base.my_rep->n_invalid_entries;
280 }
281 }
282 return success;
283}
284
285template<typename T>
286micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
287 item_constructor_t construct_item )
288{
289 head_counter = src.head_counter;
290 tail_counter = src.tail_counter;
291
292 const page* srcp = src.head_page;
293 if( is_valid_page(srcp) ) {
294 ticket g_index = head_counter;
295 __TBB_TRY {
296 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
297 size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
298 size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
299
300 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
301 page* cur_page = head_page;
302
303 if( srcp != src.tail_page ) {
304 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
305 cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
306 cur_page = cur_page->next;
307 }
308
309 __TBB_ASSERT( srcp==src.tail_page, NULL );
310 size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
311 if( last_index==0 ) last_index = base.my_rep->items_per_page;
312
313 cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
314 cur_page = cur_page->next;
315 }
316 tail_page = cur_page;
317 } __TBB_CATCH (...) {
318 invalidate_page_and_rethrow( g_index );
319 }
320 } else {
321 head_page = tail_page = NULL;
322 }
323 return *this;
324}
325
326template<typename T>
327void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
328 // Append an invalid page at address 1 so that no more pushes are allowed.
329 page* invalid_page = (page*)uintptr_t(1);
330 {
331 spin_mutex::scoped_lock lock( page_mutex );
332 itt_hide_store_word(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
333 page* q = tail_page;
334 if( is_valid_page(q) )
335 q->next = invalid_page;
336 else
337 head_page = invalid_page;
338 tail_page = invalid_page;
339 }
340 __TBB_RETHROW();
341}
342
343template<typename T>
344concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
345 const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
346 ticket& g_index, item_constructor_t construct_item )
347{
348 concurrent_queue_page_allocator& pa = base;
349 page* new_page = pa.allocate_page();
350 new_page->next = NULL;
351 new_page->mask = src_page->mask;
352 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
353 if( new_page->mask & uintptr_t(1)<<begin_in_page )
354 copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
355 return new_page;
356}
357
358template<typename T>
359class micro_queue_pop_finalizer: no_copy {
360 typedef concurrent_queue_rep_base::page page;
361 ticket my_ticket;
362 micro_queue<T>& my_queue;
363 page* my_page;
364 concurrent_queue_page_allocator& allocator;
365public:
366 micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
367 my_ticket(k), my_queue(queue), my_page(p), allocator(b)
368 {}
369 ~micro_queue_pop_finalizer() ;
370};
371
372template<typename T>
373micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
374 page* p = my_page;
375 if( is_valid_page(p) ) {
376 spin_mutex::scoped_lock lock( my_queue.page_mutex );
377 page* q = p->next;
378 my_queue.head_page = q;
379 if( !is_valid_page(q) ) {
380 my_queue.tail_page = NULL;
381 }
382 }
383 itt_store_word_with_release(my_queue.head_counter, my_ticket);
384 if( is_valid_page(p) ) {
385 allocator.deallocate_page( p );
386 }
387}
388
389#if _MSC_VER && !defined(__INTEL_COMPILER)
390#pragma warning( pop )
391#endif // warning 4146 is back
392
393template<typename T> class concurrent_queue_iterator_rep ;
394template<typename T> class concurrent_queue_iterator_base_v3;
395
396//! representation of concurrent_queue_base
400template<typename T>
401struct concurrent_queue_rep : concurrent_queue_rep_base {
402 micro_queue<T> array[n_queue];
403
404 //! Map ticket to an array index
405 static size_t index( ticket k ) {
406 return k*phi%n_queue;
407 }
408
409 micro_queue<T>& choose( ticket k ) {
410 // The formula here approximates LRU in a cache-oblivious way.
411 return array[index(k)];
412 }
413};
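// Illustration of the mapping above (not part of the original source): with phi==3 and
// n_queue==8, index(k)=k*3%8 sends tickets 0..7 to micro_queues 0,3,6,1,4,7,2,5, so
// consecutive tickets are spread across different sub-queues (and their cache lines)
// instead of all contending on a single micro_queue.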
414
416
420template<typename T>
421class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
422private:
423 //! Internal representation
424 concurrent_queue_rep<T>* my_rep;
425
426 friend struct concurrent_queue_rep<T>;
427 friend class micro_queue<T>;
428 friend class concurrent_queue_iterator_rep<T>;
429 friend class concurrent_queue_iterator_base_v3<T>;
430
431protected:
432 typedef typename concurrent_queue_rep<T>::page page;
433
434private:
435 typedef typename micro_queue<T>::padded_page padded_page;
436 typedef typename micro_queue<T>::item_constructor_t item_constructor_t;
437
438 virtual page *allocate_page() __TBB_override {
439 concurrent_queue_rep<T>& r = *my_rep;
440 size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
441 return reinterpret_cast<page*>(allocate_block ( n ));
442 }
443
444 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) __TBB_override {
445 concurrent_queue_rep<T>& r = *my_rep;
446 size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
447 deallocate_block( reinterpret_cast<void*>(p), n );
448 }
449
450 //! custom allocator
451 virtual void *allocate_block( size_t n ) = 0;
452
453 //! custom de-allocator
454 virtual void deallocate_block( void *p, size_t n ) = 0;
455
456protected:
457 concurrent_queue_base_v3();
458
459 virtual ~concurrent_queue_base_v3() {
460#if TBB_USE_ASSERT
461 size_t nq = my_rep->n_queue;
462 for( size_t i=0; i<nq; i++ )
463 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
464#endif /* TBB_USE_ASSERT */
465 cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
466 }
467
469 void internal_push( const void* src, item_constructor_t construct_item ) {
470 concurrent_queue_rep<T>& r = *my_rep;
471 ticket k = r.tail_counter++;
472 r.choose(k).push( src, k, *this, construct_item );
473 }
474
476
477 bool internal_try_pop( void* dst ) ;
478
480 size_t internal_size() const ;
481
483 bool internal_empty() const ;
484
486 /* note that the name may be misleading, but it remains so due to a historical accident. */
487 void internal_finish_clear() ;
488
490 void internal_throw_exception() const {
491 throw_exception( eid_bad_alloc );
492 }
493
495 void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;
496
497#if __TBB_CPP11_RVALUE_REF_PRESENT
498 //! swap internal representation
499 void internal_swap( concurrent_queue_base_v3& src ) {
500 std::swap( my_rep, src.my_rep );
501 }
502#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
503};
504
505template<typename T>
506concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
507 const size_t item_size = sizeof(T);
508 my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
509 __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
510 __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
511 __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
512 __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
513 memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
514 my_rep->item_size = item_size;
515 my_rep->items_per_page = item_size<= 8 ? 32 :
516 item_size<= 16 ? 16 :
517 item_size<= 32 ? 8 :
518 item_size<= 64 ? 4 :
519 item_size<=128 ? 2 :
520 1;
521}
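// Illustration (not part of the original source): a 24-byte item type falls in the
// item_size<=32 bucket, so items_per_page is 8; for every bucket up to 128-byte items a
// page carries roughly 256 bytes of item payload plus the page header (next pointer and
// validity mask).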
522
523template<typename T>
524bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
525 concurrent_queue_rep<T>& r = *my_rep;
526 ticket k;
527 do {
528 k = r.head_counter;
529 for(;;) {
530 if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
531 // Queue is empty
532 return false;
533 }
534 // Queue had item with ticket k when we looked. Attempt to get that item.
535 ticket tk=k;
536#if defined(_MSC_VER) && defined(_Wp64)
537 #pragma warning (push)
538 #pragma warning (disable: 4267)
539#endif
540 k = r.head_counter.compare_and_swap( tk+1, tk );
541#if defined(_MSC_VER) && defined(_Wp64)
542 #pragma warning (pop)
543#endif
544 if( k==tk )
545 break;
546 // Another thread snatched the item, retry.
547 }
548 } while( !r.choose( k ).pop( dst, k, *this ) );
549 return true;
550}
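// Note on the loop above: a consumer claims ticket k via compare_and_swap on head_counter and
// then pops from the micro_queue selected by choose(k). If that slot was marked invalid because
// its producer threw during push, pop() returns false and the do-while claims the next ticket
// instead of failing the whole try_pop.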
551
552template<typename T>
553size_t concurrent_queue_base_v3<T>::internal_size() const {
554 concurrent_queue_rep<T>& r = *my_rep;
555 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
556 ticket hc = r.head_counter;
557 size_t nie = r.n_invalid_entries;
558 ticket tc = r.tail_counter;
559 __TBB_ASSERT( hc!=tc || !nie, NULL );
560 ptrdiff_t sz = tc-hc-nie;
561 return sz<0 ? 0 : size_t(sz);
562}
563
564template<typename T>
565bool concurrent_queue_base_v3<T>::internal_empty() const {
566 concurrent_queue_rep<T>& r = *my_rep;
567 ticket tc = r.tail_counter;
568 ticket hc = r.head_counter;
569 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
570 return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
571}
572
573template<typename T>
574void concurrent_queue_base_v3<T>::internal_finish_clear() {
575 concurrent_queue_rep<T>& r = *my_rep;
576 size_t nq = r.n_queue;
577 for( size_t i=0; i<nq; ++i ) {
578 page* tp = r.array[i].tail_page;
579 if( is_valid_page(tp) ) {
580 __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
581 deallocate_page( tp );
582 r.array[i].tail_page = NULL;
583 } else
584 __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
585 }
586}
587
588template<typename T>
589void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
590 item_constructor_t construct_item )
591{
592 concurrent_queue_rep<T>& r = *my_rep;
593 r.items_per_page = src.my_rep->items_per_page;
594
595 // copy concurrent_queue_rep data
596 r.head_counter = src.my_rep->head_counter;
597 r.tail_counter = src.my_rep->tail_counter;
598 r.n_invalid_entries = src.my_rep->n_invalid_entries;
599
600 // copy or move micro_queues
601 for( size_t i = 0; i < r.n_queue; ++i )
602 r.array[i].assign( src.my_rep->array[i], *this, construct_item);
603
604 __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
605 "the source concurrent queue should not be concurrently modified." );
606}
607
608template<typename Container, typename Value> class concurrent_queue_iterator;
609
610template<typename T>
611class concurrent_queue_iterator_rep: no_assign {
613public:
614 ticket head_counter;
615 const concurrent_queue_base_v3<T>& my_queue;
616 typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
617 concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
618 head_counter(queue.my_rep->head_counter),
619 my_queue(queue)
620 {
621 for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
622 array[k] = queue.my_rep->array[k].head_page;
623 }
624
625 //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
626 bool get_item( T*& item, size_t k ) ;
627};
628
629template<typename T>
630bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
631 if( k==my_queue.my_rep->tail_counter ) {
632 item = NULL;
633 return true;
634 } else {
635 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
636 __TBB_ASSERT(p,NULL);
637 size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
638 item = &micro_queue<T>::get_ref(*p,i);
639 return (p->mask & uintptr_t(1)<<i)!=0;
640 }
641}
642
643//! Constness-independent portion of concurrent_queue_iterator.
645template<typename Value>
646class concurrent_queue_iterator_base_v3 : no_assign {
647 //! Represents concurrent_queue over which we are iterating.
648 /** NULL if one past last element in queue. */
649 concurrent_queue_iterator_rep<Value>* my_rep;
650
651 template<typename C, typename T, typename U>
652 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
653
654 template<typename C, typename T, typename U>
655 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
656protected:
658 Value* my_item;
659
660 //! Default constructor
661 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
662#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
663 __TBB_compiler_fence();
664#endif
665 }
666
667 //! Copy constructor
668 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
669 : my_rep(NULL), my_item(NULL) {
670 assign(i);
671 }
672
673 concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
674 assign(i);
675 return *this;
676 }
677
678 //! Construct iterator pointing to head of queue.
679 concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
680
681 //! Assignment
682 void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
683
684 //! Advance iterator one step towards tail of queue.
685 void advance() ;
686
687 //! Destructor
688 ~concurrent_queue_iterator_base_v3() {
689 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
690 my_rep = NULL;
691 }
692};
693
694template<typename Value>
695concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
696 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
697 new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
698 size_t k = my_rep->head_counter;
699 if( !my_rep->get_item(my_item, k) ) advance();
700}
701
702template<typename Value>
703void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
704 if( my_rep!=other.my_rep ) {
705 if( my_rep ) {
706 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
707 my_rep = NULL;
708 }
709 if( other.my_rep ) {
710 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
711 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
712 }
713 }
714 my_item = other.my_item;
715}
716
717template<typename Value>
718void concurrent_queue_iterator_base_v3<Value>::advance() {
719 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
720 size_t k = my_rep->head_counter;
721 const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
722#if TBB_USE_ASSERT
723 Value* tmp;
724 my_rep->get_item(tmp,k);
725 __TBB_ASSERT( my_item==tmp, NULL );
726#endif /* TBB_USE_ASSERT */
727 size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
728 if( i==queue.my_rep->items_per_page-1 ) {
729 typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
730 root = root->next;
731 }
732 // advance k
733 my_rep->head_counter = ++k;
734 if( !my_rep->get_item(my_item, k) ) advance();
735}
736
738
739template<typename T> struct tbb_remove_cv {typedef T type;};
740template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
741template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
742template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
743
745
747template<typename Container, typename Value>
748class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
749 public std::iterator<std::forward_iterator_tag,Value> {
750#if !__TBB_TEMPLATE_FRIENDS_BROKEN
751 template<typename T, class A>
752 friend class ::tbb::strict_ppl::concurrent_queue;
753#else
754public:
755#endif
757 concurrent_queue_iterator( const concurrent_queue_base_v3<typename tbb_remove_cv<Value>::type>& queue ) :
758 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
759 {
760 }
761
762public:
763 concurrent_queue_iterator() {}
764
767 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
768 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
769 {}
770
771 //! Iterator assignment
772 concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
773 this->assign(other);
774 return *this;
775 }
776
777 //! Reference to current item
778 Value& operator*() const {
779 return *static_cast<Value*>(this->my_item);
780 }
781
782 Value* operator->() const {return &operator*();}
783
784 //! Advance to next item in queue
785 concurrent_queue_iterator& operator++() {
786 this->advance();
787 return *this;
788 }
789
791 Value* operator++(int) {
792 Value* result = &operator*();
793 operator++();
794 return result;
795 }
796}; // concurrent_queue_iterator
797
798
799template<typename C, typename T, typename U>
800bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
801 return i.my_item==j.my_item;
802}
803
804template<typename C, typename T, typename U>
805bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
806 return i.my_item!=j.my_item;
807}
808
809} // namespace internal
810
812
813} // namespace strict_ppl
814
816namespace internal {
817
821template<typename Container, typename Value> class concurrent_queue_iterator;
822
824
826class concurrent_queue_base_v3: no_copy {
827private:
828 //! Internal representation
829 concurrent_queue_rep* my_rep;
830
832 friend struct micro_queue;
836protected:
838 struct page {
839 page* next;
840 uintptr_t mask;
841 };
842
843 //! Capacity of the queue
844 ptrdiff_t my_capacity;
845
846 //! Always a power of 2
847 size_t items_per_page;
848
849 //! Size of an item
850 size_t item_size;
851
853
854#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
855public:
856#endif
857 template<typename T>
858 struct padded_page: page {
859 //! Not defined anywhere - exists to quiet warnings.
860 padded_page();
861 //! Not defined anywhere - exists to quiet warnings.
862 void operator=( const padded_page& );
863 //! Must be last field.
864 T last;
865 };
866
867private:
868 virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
869 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
870protected:
871 __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
872 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
873
874 //! Enqueue item at tail of queue using copy operation
875 void __TBB_EXPORTED_METHOD internal_push( const void* src );
876
878 void __TBB_EXPORTED_METHOD internal_pop( void* dst );
879
880 //! Abort all pending queue operations
881 void __TBB_EXPORTED_METHOD internal_abort();
882
884 bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
885
886 //! Attempt to dequeue item from queue.
887 bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
888
890 //! Get size of queue
891 ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
892
893 //! Check if the queue is empty
894 bool __TBB_EXPORTED_METHOD internal_empty() const;
895
897 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
898
900 virtual page *allocate_page() = 0;
901
903 virtual void deallocate_page( page *p ) = 0;
904
906 /* note that the name may be misleading, but it remains so due to a historical accident. */
907 void __TBB_EXPORTED_METHOD internal_finish_clear() ;
908
909 //! throw an exception
910 void __TBB_EXPORTED_METHOD internal_throw_exception() const;
911
912 //! copy internal representation
913 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
914
915#if __TBB_CPP11_RVALUE_REF_PRESENT
916 //! swap queues
917 void internal_swap( concurrent_queue_base_v3& src ) {
918 std::swap( my_capacity, src.my_capacity );
919 std::swap( items_per_page, src.items_per_page );
920 std::swap( item_size, src.item_size );
921 std::swap( my_rep, src.my_rep );
922 }
923#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
924
926 void internal_insert_item( const void* src, copy_specifics op_type );
927
929 bool internal_insert_if_not_full( const void* src, copy_specifics op_type );
930
932 void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
933private:
934 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
935};
936
938
940class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
941protected:
942 concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}
943
944 //! move items
945 void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;
946
947 //! Attempt to enqueue item onto queue using move operation
948 bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );
949
950 //! Enqueue item at tail of queue using move operation
951 void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
952private:
953 friend struct micro_queue;
954 virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
955 virtual void move_item( page& dst, size_t index, const void* src ) = 0;
956};
957
958//! Type-independent portion of concurrent_queue_iterator
961class concurrent_queue_iterator_base_v3 {
962 //! concurrent_queue over which we are iterating.
963 concurrent_queue_iterator_rep* my_rep;
964
965 template<typename C, typename T, typename U>
966 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
967
968 template<typename C, typename T, typename U>
969 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
970
971 void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
972protected:
974 void* my_item;
975
976 //! Default constructor
977 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
978
979 //! Copy constructor
980 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
981 assign(i);
982 }
983
984 concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
985 assign(i);
986 return *this;
987 }
988
990
992
994 //! Assignment
995 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
996
997 //! Advance iterator one step towards tail of queue.
998 void __TBB_EXPORTED_METHOD advance();
999
1000 //! Destructor
1001 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
1004};
1005
1007
1009
1011template<typename Container, typename Value>
1012class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3,
1013 public std::iterator<std::forward_iterator_tag,Value> {
1014
1015#if !__TBB_TEMPLATE_FRIENDS_BROKEN
1016 template<typename T, class A>
1017 friend class ::tbb::concurrent_bounded_queue;
1018#else
1019public:
1020#endif
1021
1023 concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
1024 concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
1025 {
1026 }
1027
1028public:
1030 concurrent_queue_iterator() {}
1031
1034 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
1035 concurrent_queue_iterator_base_v3(other) {}
1036
1037 //! Iterator assignment
1038 concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
1039 assign(other);
1040 return *this;
1041 }
1042
1044 Value& operator*() const {
1045 return *static_cast<Value*>(my_item);
1046 }
1047
1048 Value* operator->() const {return &operator*();}
1049
1050 //! Advance to next item in queue
1051 concurrent_queue_iterator& operator++() {
1052 advance();
1053 return *this;
1054 }
1055
1057 Value* operator++(int) {
1058 Value* result = &operator*();
1059 operator++();
1060 return result;
1061 }
1062}; // concurrent_queue_iterator
1063
1064
1065template<typename C, typename T, typename U>
1066bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1067 return i.my_item==j.my_item;
1068}
1069
1070template<typename C, typename T, typename U>
1071bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
1072 return i.my_item!=j.my_item;
1073}
1074
1075} // namespace internal;
1076
1078
1079} // namespace tbb
1080
1081#endif /* __TBB__concurrent_queue_impl_H */
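
The classes in this header are internal plumbing reached through the public tbb/concurrent_queue.h header: tbb::concurrent_queue is backed by strict_ppl::internal::concurrent_queue_base_v3 above, and tbb::concurrent_bounded_queue by tbb::internal::concurrent_queue_base_v3/_v8. A minimal usage sketch of the public API (TBB 2020-era interface; not part of this file):

    #include "tbb/concurrent_queue.h"
    #include <cassert>

    int main() {
        // Unbounded queue: push never blocks, try_pop is non-blocking.
        tbb::concurrent_queue<int> q;
        q.push(1);
        q.push(2);
        int x = 0;
        bool popped = q.try_pop(x);
        assert(popped && x == 1);

        // Bounded queue: set_capacity limits the size; push and pop block when full or empty.
        tbb::concurrent_bounded_queue<int> bq;
        bq.set_capacity(4);
        bq.push(42);
        int y = 0;
        bq.pop(y);
        assert(y == 42);
        return 0;
    }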
#define __TBB_compiler_fence()
Definition: icc_generic.h:51
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:284
#define __TBB_TRY
Definition: tbb_stddef.h:283
#define __TBB_EXPORTED_METHOD
Definition: tbb_stddef.h:98
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Definition: tbb_stddef.h:266
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_RETHROW()
Definition: tbb_stddef.h:286
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:216
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
The graph class.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:319
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:65
void swap(atomic< T > &lhs, atomic< T > &rhs)
Definition: atomic.h:564
bool operator!=(const vector_iterator< Container, T > &i, const vector_iterator< Container, U > &j)
bool operator==(const vector_iterator< Container, T > &i, const vector_iterator< Container, U > &j)
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void itt_hide_store_word(T &dst, T src)
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
auto last(Container &c) -> decltype(begin(c))
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Definition: tbb_stddef.h:382
void call_itt_notify(notify_type, void *)
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
bool is_valid_page(const concurrent_queue_rep_base::page *p)
bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
static T & get_ref(page &p, size_t index)
concurrent_queue_rep_base::page page
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
void assign_and_destroy_item(void *dst, page &src, size_t index)
void spin_wait_until_my_turn(atomic< ticket > &counter, ticket k, concurrent_queue_rep_base &rb) const
void push(const void *item, ticket k, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
void(* item_constructor_t)(T *location, const void *src)
page * make_copy(concurrent_queue_base_v3< T > &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, item_constructor_t construct_item)
micro_queue & assign(const micro_queue &src, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
bool pop(void *dst, ticket k, concurrent_queue_base_v3< T > &base)
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
micro_queue< T >::item_constructor_t item_constructor_t
virtual void * allocate_block(size_t n)=0
custom allocator
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
bool internal_empty() const
check if the queue is empty; thread safe
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
concurrent_queue_rep< T > * my_rep
Internal representation.
virtual void deallocate_block(void *p, size_t n)=0
custom de-allocator
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
representation of concurrent_queue_base
static size_t index(ticket k)
Map ticket to an array index.
parts of concurrent_queue_rep that do not have references to micro_queue
static const size_t phi
Approximately n_queue/golden ratio.
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
char pad1[NFS_MaxLineSize-sizeof(atomic< ticket >)]
char pad2[NFS_MaxLineSize-sizeof(atomic< ticket >)]
char pad3[NFS_MaxLineSize-sizeof(size_t) -sizeof(size_t) -sizeof(atomic< size_t >)]
Abstract class to define interface for page allocation/deallocation.
virtual concurrent_queue_rep_base::page * allocate_page()=0
virtual void deallocate_page(concurrent_queue_rep_base::page *p)=0
Class used to ensure exception-safety of method "pop".
padded_page()
Not defined anywhere - exists to quiet warnings.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
concurrent_queue_base_v3< T >::page * array[concurrent_queue_rep< T >::n_queue]
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
bool get_item(T *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
Constness-independent portion of concurrent_queue_iterator.
void assign(const concurrent_queue_iterator_base_v3< Value > &other)
Assignment.
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
void advance()
Advance iterator one step towards tail of queue.
concurrent_queue_iterator_base_v3(const concurrent_queue_base_v3< Value > &queue)
Construct iterator pointing to head of queue.
Meets requirements of a forward iterator for STL.
concurrent_queue_iterator(const concurrent_queue_base_v3< typename tbb_remove_cv< Value >::type > &queue)
Construct iterator pointing to head of queue.
concurrent_queue_iterator & operator++()
Advance to next item in queue.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
Value & operator*() const
Reference to current item.
Meets requirements of a forward iterator for STL.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
concurrent_queue_iterator & operator++()
Advance to next item in queue.
Value & operator*() const
Reference to current item.
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
virtual void assign_and_destroy_item(void *dst, page &src, size_t index)=0
virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3()
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
concurrent_queue_rep * my_rep
Internal representation.
virtual void copy_item(page &dst, size_t index, const void *src)=0
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
void __TBB_EXPORTED_METHOD internal_throw_exception() const
throw an exception
void internal_assign(const concurrent_queue_base_v3 &src, copy_specifics op_type)
Assigns one queue to another using specified operation (copy or move)
void internal_insert_item(const void *src, copy_specifics op_type)
Enqueues item at tail of queue using specified operation (copy or move)
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
__TBB_EXPORTED_METHOD concurrent_queue_base_v3(size_t item_size)
ptrdiff_t my_capacity
Capacity of the queue.
virtual void deallocate_page(page *p)=0
custom de-allocator
virtual page * allocate_page()=0
custom allocator
bool internal_insert_if_not_full(const void *src, copy_specifics op_type)
Attempts to enqueue at tail of queue using specified operation (copy or move)
void internal_swap(concurrent_queue_base_v3 &src)
swap queues
void __TBB_EXPORTED_METHOD internal_finish_clear()
free any remaining pages
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
padded_page()
Not defined anywhere - exists to quiet warnings.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
virtual void move_item(page &dst, size_t index, const void *src)=0
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
Type-independent portion of concurrent_queue_iterator.
void initialize(const concurrent_queue_base_v3 &queue, size_t offset_of_data)
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_iterator_base_v3 &i)
Assignment.
void __TBB_EXPORTED_METHOD advance()
Advance iterator one step towards tail of queue.
__TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3()
Destructor.
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
A lock that occupies a single byte.
Definition: spin_mutex.h:39
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Class that implements exponential backoff.
Definition: tbb_machine.h:345
void pause()
Pause for a while.
Definition: tbb_machine.h:360
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
A queue using simple locking.
Internal representation of a ConcurrentQueue.

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.
