Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
concurrent_queue.cpp
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/tbb_stddef.h"
18#include "tbb/tbb_machine.h"
19#include "tbb/tbb_exception.h"
20// Define required to satisfy test in internal file.
21#define __TBB_concurrent_queue_H
23#include "concurrent_monitor.h"
24#include "itt_notify.h"
25#include <new>
26#include <cstring> // for memset()
27
28#if defined(_MSC_VER) && defined(_Wp64)
29 // Workaround for overzealous compiler warnings in /Wp64 mode
30 #pragma warning (disable: 4267)
31#endif
32
33#define RECORD_EVENTS 0
34
35
36namespace tbb {
37
38namespace internal {
39
41
// Unsigned integral type used for queue tickets; it is the value type of the
// atomic head/tail counters and of the `k` argument of push()/pop() below.
42typedef size_t ticket;
43
45
49
51
52 atomic<page*> head_page;
53 atomic<ticket> head_counter;
54
55 atomic<page*> tail_page;
56 atomic<ticket> tail_counter;
57
59
60 void push( const void* item, ticket k, concurrent_queue_base& base,
62
64
65 bool pop( void* dst, ticket k, concurrent_queue_base& base );
66
69
70 page* make_copy ( concurrent_queue_base& base, const page* src_page, size_t begin_in_page,
71 size_t end_in_page, ticket& g_index, concurrent_queue_base::copy_specifics op_type ) ;
72
73 void make_invalid( ticket k );
74};
75
76// we need to yank it out of micro_queue because of concurrent_queue_base::deallocate_page being virtual.
83public:
85 my_ticket(k), my_queue(queue), my_page(p), base(b)
86 {}
88 page* p = my_page;
89 if( p ) {
91 page* q = p->next;
93 if( !q ) {
94 my_queue.tail_page = NULL;
95 }
96 }
98 if( p )
100 }
101};
102
105 predicate_leq( ticket t_ ) : t(t_) {}
106 bool operator() ( uintptr_t p ) const {return (ticket)p<=t;}
107};
108
110
113public:
114private:
115 friend struct micro_queue;
116
118 static const size_t phi = 3;
119
120public:
122 static const size_t n_queue = 8;
123
// Map ticket k to an array index: multiply by phi and reduce modulo n_queue.
125 static size_t index( ticket k ) {
126 return k*phi%n_queue;
127 }
128
129 atomic<ticket> head_counter;
131 atomic<size_t> n_invalid_entries;
132 char pad1[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor)+sizeof(atomic<size_t>))&(NFS_MaxLineSize-1))];
133
134 atomic<ticket> tail_counter;
136 char pad2[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))];
138
140 // The formula here approximates LRU in a cache-oblivious way.
141 return array[index(k)];
142 }
143
144 atomic<unsigned> abort_counter;
145
147 static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
148};
149
150#if _MSC_VER && !defined(__INTEL_COMPILER)
151 // unary minus operator applied to unsigned type, result still unsigned
152 #pragma warning( push )
153 #pragma warning( disable: 4146 )
154#endif
155
157
158//------------------------------------------------------------------------
159// micro_queue
160//------------------------------------------------------------------------
// Store the item for ticket k into this micro_queue.
//   item - source object to copy or move in; NULL means no item is stored
//          (the abort_push path, see the final else branch).
//   k    - ticket of this push; the function waits until the tail counter reaches it.
//   base - owning queue; supplies allocate_page() and copy_item(), and is cast to
//          concurrent_queue_base_v8 for move_item().
// NOTE(review): this listing omits some original lines (the embedded line numbers skip),
// including the rest of the signature and the declarations of `index` and `tail`.
// The comments below describe only what is visible here.
161void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base,
164 page* p = NULL;
165 // find index on page where we would put the data
167 if( !index ) { // make a new page
168 __TBB_TRY {
169 p = base.allocate_page();
170 } __TBB_CATCH(...) {
// Allocation failed: mark this micro_queue invalid for ticket k.
172 make_invalid( k );
174 }
// A fresh page starts with an empty presence mask and no successor.
175 p->mask = 0;
176 p->next = NULL;
177 }
178
179 // wait for my turn
180 if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
181 for( atomic_backoff b(true);;b.pause() ) {
183 if( tail==k ) break;
184 else if( tail&0x1 ) {
185 // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
188 }
189 }
190
191 if( p ) { // page is newly allocated; insert in micro_queue
// Append after the current tail page, or install as head when there is no tail page.
193 if( page* q = tail_page )
194 q->next = p;
195 else
196 head_page = p;
197 tail_page = p;
198 }
199
200 if (item) {
// The item always goes into the current tail page, whether or not it was just allocated.
201 p = tail_page;
202 ITT_NOTIFY( sync_acquired, p );
203 __TBB_TRY {
// Dispatch on op_type: copy via copy_item(), otherwise move via move_item() of the v8 base.
204 if( concurrent_queue_base::copy == op_type ) {
205 base.copy_item( *p, index, item );
206 } else {
207 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
208 static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
209 }
210 } __TBB_CATCH(...) {
214 }
216 // If no exception was thrown, mark item as present.
217 p->mask |= uintptr_t(1)<<index;
218 }
219 else // no item; this was called from abort_push
221
223}
224
225
227 push(NULL, k, base, concurrent_queue_base::copy);
228}
229
// Try to take the item for ticket k from the head page of this micro_queue.
//   dst  - destination handed to base.assign_and_destroy_item().
//   k    - ticket of this pop.
//   base - owning queue; supplies items_per_page and assign_and_destroy_item().
// Returns true if the slot's bit in the page mask was set and the item was passed to
// assign_and_destroy_item(); returns false if the bit was clear.
// NOTE(review): some original lines are absent from this listing (the embedded line
// numbers skip), including the declaration of `index`.
230bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
234 page *p = head_page;
235 __TBB_ASSERT( p, NULL );
237 bool success = false;
// The inner scope ends the finalizer's lifetime before the return statement.
238 {
// The finalizer receives the page only when `index` is the last slot of the page
// (items_per_page-1); otherwise it receives NULL.
239 micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
// A set mask bit means the slot holds an item.
240 if( p->mask & uintptr_t(1)<<index ) {
241 success = true;
242 ITT_NOTIFY( sync_acquired, dst );
243 ITT_NOTIFY( sync_acquired, head_page );
244 base.assign_and_destroy_item( dst, *p, index );
246 } else {
248 }
249 }
250 return success;
251}
252
255{
258
259 const page* srcp = src.head_page;
260 if( srcp ) {
261 ticket g_index = head_counter;
262 __TBB_TRY {
265 size_t end_in_first_page = (index+n_items<base.items_per_page)?(index+n_items):base.items_per_page;
266
267 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, op_type );
268 page* cur_page = head_page;
269
270 if( srcp != src.tail_page ) {
271 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
272 cur_page->next = make_copy( base, srcp, 0, base.items_per_page, g_index, op_type );
273 cur_page = cur_page->next;
274 }
275
276 __TBB_ASSERT( srcp==src.tail_page, NULL );
277
279 if( last_index==0 ) last_index = base.items_per_page;
280
281 cur_page->next = make_copy( base, srcp, 0, last_index, g_index, op_type );
282 cur_page = cur_page->next;
283 }
284 tail_page = cur_page;
285 } __TBB_CATCH(...) {
286 make_invalid( g_index );
288 }
289 } else {
290 head_page = tail_page = NULL;
291 }
292 return *this;
293}
294
296 const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page,
298{
299 page* new_page = base.allocate_page();
300 new_page->next = NULL;
301 new_page->mask = src_page->mask;
302 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
303 if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
304 if( concurrent_queue_base::copy == op_type ) {
305 base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
306 } else {
307 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
308 static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
309 }
310 }
311 return new_page;
312}
313
315{
316 static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
317 // mark it so that no more pushes are allowed.
318 static_invalid_page = &dummy;
319 {
322 if( page* q = tail_page )
323 q->next = static_cast<page*>(static_invalid_page);
324 else
325 head_page = static_cast<page*>(static_invalid_page);
326 tail_page = static_cast<page*>(static_invalid_page);
327 }
328}
329
330#if _MSC_VER && !defined(__INTEL_COMPILER)
331 #pragma warning( pop )
332#endif // warning 4146 is back
333
334//------------------------------------------------------------------------
335// concurrent_queue_base
336//------------------------------------------------------------------------
338 items_per_page = item_sz<= 8 ? 32 :
339 item_sz<= 16 ? 16 :
340 item_sz<= 32 ? 8 :
341 item_sz<= 64 ? 4 :
342 item_sz<=128 ? 2 :
343 1;
344 my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
346 __TBB_ASSERT( is_aligned(my_rep, NFS_GetLineSize()), "alignment error" );
347 __TBB_ASSERT( is_aligned(&my_rep->head_counter, NFS_GetLineSize()), "alignment error" );
348 __TBB_ASSERT( is_aligned(&my_rep->tail_counter, NFS_GetLineSize()), "alignment error" );
349 __TBB_ASSERT( is_aligned(&my_rep->array, NFS_GetLineSize()), "alignment error" );
350 std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
353 this->item_size = item_sz;
354}
355
357 size_t nq = my_rep->n_queue;
358 for( size_t i=0; i<nq; i++ )
359 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
361}
362
365}
366
369}
370
373 unsigned old_abort_counter = r.abort_counter;
374 ticket k = r.tail_counter++;
375 ptrdiff_t e = my_capacity;
376#if DO_ITT_NOTIFY
377 bool sync_prepare_done = false;
378#endif
379 if( (ptrdiff_t)(k-r.head_counter)>=e ) { // queue is full
380#if DO_ITT_NOTIFY
381 if( !sync_prepare_done ) {
382 ITT_NOTIFY( sync_prepare, &sync_prepare_done );
383 sync_prepare_done = true;
384 }
385#endif
386 bool slept = false;
388 r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
389 while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
390 __TBB_TRY {
391 if( r.abort_counter!=old_abort_counter ) {
392 r.slots_avail.cancel_wait( thr_ctx );
394 }
395 slept = r.slots_avail.commit_wait( thr_ctx );
397 r.choose(k).abort_push(k, *this);
399 } __TBB_CATCH(...) {
401 }
402 if (slept == true) break;
403 r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
404 }
405 if( !slept )
406 r.slots_avail.cancel_wait( thr_ctx );
407 }
408 ITT_NOTIFY( sync_acquired, &sync_prepare_done );
409 __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
410 r.choose( k ).push( src, k, *this, op_type );
412}
413
416 ticket k;
417#if DO_ITT_NOTIFY
418 bool sync_prepare_done = false;
419#endif
420 unsigned old_abort_counter = r.abort_counter;
421 // This loop is a single pop operation; abort_counter should not be re-read inside
422 do {
423 k=r.head_counter++;
424 if ( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // queue is empty
425#if DO_ITT_NOTIFY
426 if( !sync_prepare_done ) {
427 ITT_NOTIFY( sync_prepare, dst );
428 sync_prepare_done = true;
429 }
430#endif
431 bool slept = false;
433 r.items_avail.prepare_wait( thr_ctx, k );
434 while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
435 __TBB_TRY {
436 if( r.abort_counter!=old_abort_counter ) {
437 r.items_avail.cancel_wait( thr_ctx );
439 }
440 slept = r.items_avail.commit_wait( thr_ctx );
442 r.head_counter--;
444 } __TBB_CATCH(...) {
446 }
447 if (slept == true) break;
448 r.items_avail.prepare_wait( thr_ctx, k );
449 }
450 if( !slept )
451 r.items_avail.cancel_wait( thr_ctx );
452 }
453 __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
454 } while( !r.choose(k).pop(dst,k,*this) );
455
456 // wake up a producer..
458}
459
462 ++r.abort_counter;
465}
466
469 ticket k;
470 do {
471 k = r.head_counter;
472 for(;;) {
473 if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
474 // Queue is empty
475 return false;
476 }
477 // Queue had item with ticket k when we looked. Attempt to get that item.
478 ticket tk=k;
479 k = r.head_counter.compare_and_swap( tk+1, tk );
480 if( k==tk )
481 break;
482 // Another thread snatched the item, retry.
483 }
484 } while( !r.choose( k ).pop( dst, k, *this ) );
485
487
488 return true;
489}
490
492 return internal_insert_if_not_full( src, copy );
493}
494
496 return internal_insert_if_not_full( src, move );
497}
498
501 ticket k = r.tail_counter;
502 for(;;) {
503 if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
504 // Queue is full
505 return false;
506 }
507 // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
508 ticket tk=k;
509 k = r.tail_counter.compare_and_swap( tk+1, tk );
510 if( k==tk )
511 break;
512 // Another thread claimed the slot, so retry.
513 }
514 r.choose(k).push(src, k, *this, op_type);
516 return true;
517}
518
520 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
522}
523
527 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
528 return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
529}
530
// Set the queue capacity. The second parameter is unnamed, so it cannot be used in the body.
// NOTE(review): the body statement (original line 532) is missing from this listing.
531void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
533}
534
536 size_t nq = my_rep->n_queue;
537 for( size_t i=0; i<nq; ++i ) {
538 page* tp = my_rep->array[i].tail_page;
539 __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
540 if( tp!=NULL) {
541 if( tp!=static_invalid_page ) deallocate_page( tp );
542 my_rep->array[i].tail_page = NULL;
543 }
544 }
545}
546
549}
550
554
555 // copy concurrent_queue_rep.
560
561 // copy micro_queues
562 for( size_t i = 0; i<my_rep->n_queue; ++i )
563 my_rep->array[i].assign( src.my_rep->array[i], *this, op_type );
564
566 "the source concurrent queue should not be concurrently modified." );
567}
568
570 internal_assign( src, copy );
571}
572
574 internal_assign( src, move );
575}
576
577//------------------------------------------------------------------------
578// concurrent_queue_iterator_rep
579//------------------------------------------------------------------------
581public:
584 const size_t offset_of_last;
// Build the iterator state for `queue`: record its current head_counter, keep a
// reference to the queue, store offset_of_last_, and capture the head page of each
// of the n_queue micro_queues.
586 concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
587 head_counter(queue.my_rep->head_counter),
588 my_queue(queue),
589 offset_of_last(offset_of_last_)
590 {
591 const concurrent_queue_rep& rep = *queue.my_rep;
// One saved page pointer per micro_queue.
592 for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
593 array[k] = rep.array[k].head_page;
594 }
// Set `item` to point to the k-th element.
// Returns true if k equals the queue's tail_counter (item is set to NULL) or if the
// element's bit in the page mask is set; returns false otherwise.
// NOTE(review): the declarations of `p` and `i` (original lines 601 and 603) are
// missing from this listing.
596 bool get_item( void*& item, size_t k ) {
597 if( k==my_queue.my_rep->tail_counter ) {
598 item = NULL;
599 return true;
600 } else {
602 __TBB_ASSERT(p,NULL);
// Item address = page address + offset_of_last + item_size * i, computed in bytes.
604 item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
605 return (p->mask & uintptr_t(1)<<i)!=0;
606 }
607 }
608};
609
610//------------------------------------------------------------------------
611// concurrent_queue_iterator_base
612//------------------------------------------------------------------------
613
// Initialize this iterator over `queue`.
// Constructs a concurrent_queue_iterator_rep in place at my_rep (placement new), then
// points my_item at the element for the rep's head_counter. If get_item() returns
// false for that position, advance() is called.
// NOTE(review): original line 615 is missing from this listing; presumably it obtains
// the storage that my_rep points to. Confirm against the full source.
614void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
616 new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
617 size_t k = my_rep->head_counter;
618 if( !my_rep->get_item(my_item, k) ) advance();
619}
620
622 initialize(queue,0);
623}
624
626 initialize(queue,offset_of_last);
627}
628
630 if( my_rep!=other.my_rep ) {
631 if( my_rep ) {
633 my_rep = NULL;
634 }
635 if( other.my_rep ) {
638 }
639 }
640 my_item = other.my_item;
641}
642
644 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
645 size_t k = my_rep->head_counter;
646 const concurrent_queue_base& queue = my_rep->my_queue;
647#if TBB_USE_ASSERT
648 void* tmp;
649 my_rep->get_item(tmp,k);
650 __TBB_ASSERT( my_item==tmp, NULL );
651#endif /* TBB_USE_ASSERT */
653 if( i==queue.items_per_page-1 ) {
655 root = root->next;
656 }
657 // advance k
658 my_rep->head_counter = ++k;
659 if( !my_rep->get_item(my_item, k) ) advance();
660}
661
663 //delete my_rep;
665 my_rep = NULL;
666}
667
668} // namespace internal
669
670} // namespace tbb
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:284
#define __TBB_TRY
Definition: tbb_stddef.h:283
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_RETHROW()
Definition: tbb_stddef.h:286
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:112
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
void const char const char int ITT_FORMAT __itt_group_sync p
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id tail
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:216
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
The graph class.
static void * static_invalid_page
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:399
bool is_aligned(T *pointer, uintptr_t alignment)
A function to check if passed in pointer is aligned on a specific border.
Definition: tbb_stddef.h:370
concurrent_queue_base_v3 concurrent_queue_base
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Definition: tbb_stddef.h:382
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:391
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
void deallocate(pointer p, size_type)
Free block of memory that starts on a cache line.
pointer allocate(size_type n, const void *hint=0)
Allocate space for n objects, starting on a cache/sector line.
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
virtual void assign_and_destroy_item(void *dst, page &src, size_t index)=0
virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3()
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
concurrent_queue_rep * my_rep
Internal representation.
virtual void copy_item(page &dst, size_t index, const void *src)=0
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
void __TBB_EXPORTED_METHOD internal_throw_exception() const
throw an exception
void internal_assign(const concurrent_queue_base_v3 &src, copy_specifics op_type)
Assigns one queue to another using specified operation (copy or move)
void internal_insert_item(const void *src, copy_specifics op_type)
Enqueues item at tail of queue using specified operation (copy or move)
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
__TBB_EXPORTED_METHOD concurrent_queue_base_v3(size_t item_size)
ptrdiff_t my_capacity
Capacity of the queue.
virtual void deallocate_page(page *p)=0
custom de-allocator
virtual page * allocate_page()=0
custom allocator
bool internal_insert_if_not_full(const void *src, copy_specifics op_type)
Attempts to enqueue at tail of queue using specified operation (copy or move)
void __TBB_EXPORTED_METHOD internal_finish_clear()
free any remaining pages
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
Type-independent portion of concurrent_queue_iterator.
void initialize(const concurrent_queue_base_v3 &queue, size_t offset_of_data)
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_iterator_base_v3 &i)
Assignment.
void __TBB_EXPORTED_METHOD advance()
Advance iterator one step towards tail of queue.
__TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3()
Destructor.
A lock that occupies a single byte.
Definition: spin_mutex.h:39
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Exception for user-initiated abort.
Definition: tbb_exception.h:46
Class that implements exponential backoff.
Definition: tbb_machine.h:345
void pause()
Pause for a while.
Definition: tbb_machine.h:360
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
void prepare_wait(thread_context &thr, uintptr_t ctx=0)
prepare wait by inserting 'thr' into the wait queue
void cancel_wait(thread_context &thr)
Cancel the wait. Removes the thread from the wait queue if not removed yet.
void abort_all()
Abort any sleeping threads at the time of the call.
bool commit_wait(thread_context &thr)
Commit wait if event count has not changed; otherwise, cancel wait.
A queue using simple locking.
micro_queue & assign(const micro_queue &src, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
void push(const void *item, ticket k, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
void abort_push(ticket k, concurrent_queue_base &base)
page * make_copy(concurrent_queue_base &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, concurrent_queue_base::copy_specifics op_type)
bool pop(void *dst, ticket k, concurrent_queue_base &base)
concurrent_queue_base::page page
micro_queue_pop_finalizer(micro_queue &queue, concurrent_queue_base &b, ticket k, page *p)
bool operator()(uintptr_t p) const
Internal representation of a ConcurrentQueue.
static size_t index(ticket k)
Map ticket to an array index.
char pad1[NFS_MaxLineSize-((sizeof(atomic< ticket >)+sizeof(concurrent_monitor)+sizeof(atomic< size_t >))&(NFS_MaxLineSize-1))]
char pad2[NFS_MaxLineSize-((sizeof(atomic< ticket >)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))]
static const size_t phi
Approximately n_queue/golden ratio.
static const ptrdiff_t infinite_capacity
Value for effective_capacity that denotes unbounded queue.
static const size_t n_queue
Must be power of 2.
bool get_item(void *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
concurrent_queue_iterator_rep(const concurrent_queue_base &queue, size_t offset_of_last_)
concurrent_queue_base::page * array[concurrent_queue_rep::n_queue]

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.