4
4
#include "../mem/pool.h"
5
5
#include "../sched/cpu.h"
6
6
7
- #ifdef USE_VALGRIND
8
- #include <valgrind/helgrind.h>
9
- #endif
10
-
11
7
typedef struct mpmcq_node_t mpmcq_node_t ;
12
8
13
9
struct mpmcq_node_t
@@ -22,19 +18,21 @@ void ponyint_mpmcq_init(mpmcq_t* q)
22
18
atomic_store_explicit (& node -> data , NULL , memory_order_relaxed );
23
19
atomic_store_explicit (& node -> next , NULL , memory_order_relaxed );
24
20
21
+ mpmcq_dwcas_t tail ;
22
+ tail .node = node ;
23
+
25
24
atomic_store_explicit (& q -> head , node , memory_order_relaxed );
26
- atomic_store_explicit (& q -> tail , node , memory_order_relaxed );
27
- atomic_store_explicit (& q -> ticket , 0 , memory_order_relaxed );
28
- atomic_store_explicit (& q -> waiting_for , 0 , memory_order_relaxed );
25
+ atomic_store_explicit (& q -> tail , tail , memory_order_relaxed );
29
26
}
30
27
31
28
void ponyint_mpmcq_destroy(mpmcq_t* q)
{
  // Tear down the queue and free the remaining stub node.
  // NOTE(review): only the tail node is freed, so this assumes the queue is
  // already empty (head and tail refer to the same stub) — confirm callers
  // drain the queue first. Not safe to call concurrently with push/pop.
  mpmcq_dwcas_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);

  POOL_FREE(mpmcq_node_t, tail.node);
  tail.node = NULL;

  // Fix: use an explicit relaxed atomic store for the head instead of a
  // plain assignment. Assigning directly to an _Atomic object is a
  // sequentially-consistent store, which is both stronger than needed during
  // single-threaded teardown and inconsistent with the relaxed tail store
  // below and with every other store in this file.
  atomic_store_explicit(&q->head, NULL, memory_order_relaxed);
  atomic_store_explicit(&q->tail, tail, memory_order_relaxed);
}
39
37
40
38
void ponyint_mpmcq_push (mpmcq_t * q , void * data )
@@ -45,9 +43,6 @@ void ponyint_mpmcq_push(mpmcq_t* q, void* data)
45
43
46
44
mpmcq_node_t * prev = atomic_exchange_explicit (& q -> head , node ,
47
45
memory_order_relaxed );
48
- #ifdef USE_VALGRIND
49
- ANNOTATE_HAPPENS_BEFORE (& prev -> next );
50
- #endif
51
46
atomic_store_explicit (& prev -> next , node , memory_order_release );
52
47
}
53
48
@@ -60,85 +55,46 @@ void ponyint_mpmcq_push_single(mpmcq_t* q, void* data)
60
55
// If we have a single producer, the swap of the head need not be atomic RMW.
61
56
mpmcq_node_t * prev = atomic_load_explicit (& q -> head , memory_order_relaxed );
62
57
atomic_store_explicit (& q -> head , node , memory_order_relaxed );
63
- #ifdef USE_VALGRIND
64
- ANNOTATE_HAPPENS_BEFORE (& prev -> next );
65
- #endif
66
58
atomic_store_explicit (& prev -> next , node , memory_order_release );
67
59
}
68
60
69
61
// Pop a single item from the queue. Returns the item's data pointer, or NULL
// if the queue is empty. Safe for multiple concurrent consumers: the tail is
// advanced with a double-width CAS over {aba counter, node pointer}, so a
// consumer whose tail snapshot has gone stale (freed and possibly
// reallocated) cannot win the CAS — the incremented aba counter defeats the
// classic ABA problem.
void* ponyint_mpmcq_pop(mpmcq_t* q)
{
  mpmcq_dwcas_t cmp, xchg;
  mpmcq_node_t* next;

  // Snapshot the current tail (node + aba counter).
  cmp = atomic_load_explicit(&q->tail, memory_order_acquire);

  do
  {
    // Get the next node rather than the tail. The tail is either a stub or has
    // already been consumed.
    next = atomic_load_explicit(&cmp.node->next, memory_order_acquire);

    // Bailout if we have no next node.
    if(next == NULL)
      return NULL;

    // Make the next node the tail, incrementing the aba counter. If this
    // fails, cmp becomes the new tail and we retry the loop.
    xchg.aba = cmp.aba + 1;
    xchg.node = next;
  } while(!atomic_compare_exchange_weak_explicit(&q->tail, &cmp, xchg,
    memory_order_acq_rel, memory_order_acquire));

  // We'll return the data pointer from the next node.
  void* data = atomic_load_explicit(&next->data, memory_order_acquire);

  // Since we will be freeing the old tail, we need to be sure no other
  // consumer is still reading the old tail. To do this, we set the data
  // pointer of our new tail to NULL, and we wait until the data pointer of
  // the old tail is NULL.
  atomic_store_explicit(&next->data, NULL, memory_order_release);

  // Spin until the consumer that popped the old tail has published NULL into
  // its data slot (the release store above, made by that consumer), proving
  // it is done touching the node we are about to free.
  while(atomic_load_explicit(&cmp.node->data, memory_order_acquire) != NULL)
    ponyint_cpu_relax();

  // Free the old tail. The new tail is the next node.
  POOL_FREE(mpmcq_node_t, cmp.node);
  return data;
}
132
-
133
- void * ponyint_mpmcq_pop_bailout_immediate (mpmcq_t * q )
134
- {
135
- mpmcq_node_t * head = atomic_load_explicit (& q -> head , memory_order_relaxed );
136
- mpmcq_node_t * tail = atomic_load_explicit (& q -> tail , memory_order_relaxed );
137
-
138
- // If we believe the queue is empty, bailout immediately without taking a
139
- // ticket to avoid unnecessary contention.
140
- if (head == tail )
141
- return NULL ;
142
-
143
- return ponyint_mpmcq_pop (q );
144
- }
0 commit comments