LocklessMPMCQueue.h
// SPDX-License-Identifier: GPL-2.0-or-later
/**
* C++14 32-bit Lockless Bounded Circular MPMC Queue type.
* Author: Primrose Taylor
*/
#ifndef BOUNDED_CIRCULAR_MPMC_QUEUE_H
#define BOUNDED_CIRCULAR_MPMC_QUEUE_H
#include "stdio.h"
#include "stdlib.h"
#include <atomic>
#include <stdint.h>
#include <functional>
#include <thread>
#define CACHE_LINE_SIZE 64U
#if defined(_MSC_VER)
#define HARDWARE_PAUSE() _mm_pause();
#define _ENABLE_ATOMIC_ALIGNMENT_FIX 1 // MSVC atomic alignment fix.
#define ATOMIC_ALIGNMENT 4
#else
#define ATOMIC_ALIGNMENT 16
#if defined(__clang__) || defined(__GNUC__)
#define HARDWARE_PAUSE() __builtin_ia32_pause();
#endif
#endif
/**
* Lockless, Multi-Producer, Multi-Consumer, Bounded Circular Queue type.
* The type is intended to be lightweight & portable.
* The sub-types are all padded to fit within cache lines. Padding may be put
* in between member variables if the variables are accessed separately.
*/
template <typename T, uint_least32_t queue_size, bool should_yield_not_pause = false>
class bounded_circular_mpmc_queue final
{
/**
* Simple, efficient spin-lock implementation.
* do_work_through_lock() takes a void lambda and can be used to
* conveniently do something which will be protected by the lock.
* @cite Credit to Erik Rigtorp https://rigtorp.se/spinlock/
*/
class spin_lock
{
std::atomic<bool> lock_flag;
public:
spin_lock()
: lock_flag{false}
{
}
void do_work_through_lock(const std::function<void()>& functor)
{
lock();
functor();
unlock();
}
void lock()
{
while (true)
{
if (!lock_flag.exchange(true, std::memory_order_acquire))
{
break;
}
while (lock_flag.load(std::memory_order_relaxed))
{
should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
}
}
}
void unlock()
{
lock_flag.store(false, std::memory_order_release);
}
};
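/**
* Illustrative sketch (not part of the queue itself) of how the lock
* helper is meant to be used; the names below are hypothetical:
*
*   spin_lock lock_;
*   int shared_counter = 0;
*   lock_.do_work_through_lock([&]()
*   {
*       shared_counter += 1; // Runs while the lock is held.
*   });
*/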
/**
* Structure that holds the two cursors.
* The cursors are held together because we'll only ever be accessing
* them both at the same time.
* We don't directly align the struct because we need to use it as an
* atomic variable, so we must align the atomic variable instead.
*/
struct cursor_data
{
uint_fast32_t producer_cursor;
uint_fast32_t consumer_cursor;
uint8_t padding_bytes[CACHE_LINE_SIZE -
(sizeof(uint_fast32_t) +
sizeof(uint_fast32_t))
% CACHE_LINE_SIZE];
cursor_data(const uint_fast32_t in_producer_cursor = 0,
const uint_fast32_t in_consumer_cursor = 0)
: producer_cursor(in_producer_cursor),
consumer_cursor(in_consumer_cursor),
padding_bytes{0}
{
}
};
/**
* Structure that represents each node in the circular buffer.
* Access to the data is protected by a spin lock.
* Contention on the spin lock should be minimal, as it's only there
* to prevent the case where a producer/consumer may try to work with an element
* before someone else has finished working with it. The data and the spin lock
* are separated by padding to put them in different cache lines, since they are
* not accessed together in the case mentioned previously. The problem with this
* is that in low contention cases, they will be accessed together, and thus
* should be in the same cache line.
*/
struct buffer_node
{
T data;
uint8_t padding_bytes_0[CACHE_LINE_SIZE -
sizeof(T) % CACHE_LINE_SIZE];
// Mutable so the lock can be acquired inside the const get_data().
mutable spin_lock spin_lock_;
uint8_t padding_bytes_1[CACHE_LINE_SIZE -
sizeof(spin_lock)
% CACHE_LINE_SIZE];
buffer_node()
: padding_bytes_0{0},
spin_lock_(),
padding_bytes_1{0}
{
}
void get_data(T& out_data) const
{
spin_lock_.do_work_through_lock([&]()
{
out_data = data;
});
}
void set_data(const T& in_data)
{
spin_lock_.do_work_through_lock([&]()
{
data = in_data;
});
}
};
/**
* Structure that contains the index mask, and the circular buffer.
* Both are accessed at the same time, so they are not separated by padding.
*/
struct alignas(CACHE_LINE_SIZE) circular_buffer_data
{
const uint_fast32_t index_mask;
buffer_node* circular_buffer;
uint8_t padding_bytes[CACHE_LINE_SIZE -
(sizeof(uint_fast32_t) +
sizeof(buffer_node*))
% CACHE_LINE_SIZE];
// index_mask is the power-of-two capacity minus one, so that
// cursor & index_mask always yields a valid buffer index.
circular_buffer_data()
: index_mask(get_next_power_of_two() - 1),
padding_bytes{0}
{
static_assert(queue_size > 0, "Can't have a queue size <= 0!");
static_assert(queue_size <= 0xffffffffU,
"Can't have a queue length above 32 bits!");
/** Contiguously allocate the buffer.
* calloc is used instead of aligned_alloc or equivalent because it
* zero-fills the memory, and all-zero bytes are a valid initial state
* for buffer_node (for trivial T). Note that calloc only guarantees
* fundamental alignment (alignof(max_align_t)), not cache-line
* alignment, and runs no constructors; placement-new into aligned
* storage would be the strictly correct approach.
*/
circular_buffer = static_cast<buffer_node*>(std::calloc(
index_mask + 1, sizeof(buffer_node)));
}
~circular_buffer_data()
{
if(circular_buffer != nullptr)
{
std::free(circular_buffer);
}
}
private:
/**
* Round queue_size up to the next power of two.
* Static because it is called in the constructor's initializer list,
* before the object is fully constructed.
* @cite https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
*/
static uint_least32_t get_next_power_of_two()
{
uint_least32_t v = queue_size;
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v++;
return v;
}
};
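/**
* Worked example of the power-of-two indexing (illustrative values):
* with queue_size = 100, get_next_power_of_two() returns 128, so
* index_mask = 127 and the buffer holds 128 nodes. A free-running
* cursor of 300 then maps to slot 300 & 127 == 44, wrapping around
* the buffer without a modulo operation.
*/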
public:
bounded_circular_mpmc_queue()
: cursor_data_(cursor_data{}),
circular_buffer_data_()
{
}
/**
* Push an element into the queue.
*
* @param in_data Reference to the variable containing the data to be pushed.
* @returns Returns false only if the buffer is full.
*/
bool push(const T& in_data)
{
cursor_data current_cursor_data;
// An infinite while-loop is used instead of a do-while, to avoid
// the yield/pause happening before the CAS operation.
while(true)
{
current_cursor_data = cursor_data_.load(std::memory_order_acquire);
// Check if the buffer is full. The cursors are free-running, so the
// buffer is full when they are exactly one capacity apart.
if (current_cursor_data.producer_cursor - current_cursor_data.consumer_cursor ==
circular_buffer_data_.index_mask + 1)
{
return false;
}
// CAS operation used to make sure the cursors have not been incremented
// by another producer/consumer before we got to this point, and to then increment
// the cursor by 1 if it hasn't been changed.
if (cursor_data_.compare_exchange_weak(current_cursor_data,
{current_cursor_data.producer_cursor + 1,
current_cursor_data.consumer_cursor},
std::memory_order_release, std::memory_order_relaxed))
{
break;
}
should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
}
// Set the data
circular_buffer_data_.circular_buffer[
current_cursor_data.producer_cursor & circular_buffer_data_.index_mask
].set_data(in_data);
return true;
}
/**
* Pop an element from the queue.
*
* @param out_data Reference to the variable that will store the popped element.
* @returns Returns false only if the buffer is empty.
*/
bool pop(T& out_data)
{
cursor_data current_cursor_data;
while(true)
{
current_cursor_data = cursor_data_.load(std::memory_order_acquire);
// Check if the buffer is empty. The cursors only meet when no
// un-popped elements remain.
if (current_cursor_data.consumer_cursor == current_cursor_data.producer_cursor)
{
return false;
}
if (cursor_data_.compare_exchange_weak(current_cursor_data,
{current_cursor_data.producer_cursor,
current_cursor_data.consumer_cursor + 1},
std::memory_order_release, std::memory_order_relaxed))
{
break;
}
should_yield_not_pause ? std::this_thread::yield() : HARDWARE_PAUSE();
}
// Get the data.
circular_buffer_data_.circular_buffer[
current_cursor_data.consumer_cursor & circular_buffer_data_.index_mask
].get_data(out_data);
return true;
}
/**
* @note Calling this function will increase contention on the cursor data!
* @returns How many elements are currently in the buffer.
*/
uint_fast32_t size() const
{
const cursor_data cursors = cursor_data_.load(std::memory_order_acquire);
return cursors.producer_cursor - cursors.consumer_cursor;
}
/**
* @note Calling this function will increase contention on the cursor data!
* @returns Whether or not the buffer is empty.
*/
bool empty() const
{
return size() == 0;
}
/**
* @note Calling this function will increase contention on the cursor data!
* @returns Whether or not the buffer is full.
*/
bool full() const
{
return size() == circular_buffer_data_.index_mask + 1;
}
private:
alignas(CACHE_LINE_SIZE) std::atomic<cursor_data> cursor_data_;
circular_buffer_data circular_buffer_data_;
private:
bounded_circular_mpmc_queue(
const bounded_circular_mpmc_queue&) = delete;
bounded_circular_mpmc_queue& operator=(
const bounded_circular_mpmc_queue&) = delete;
};
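/**
* Minimal usage sketch (illustrative only; the element type, sizes, and
* thread bodies below are assumptions for the example):
*
*   bounded_circular_mpmc_queue<int, 1024> queue;
*
*   std::thread producer([&]()
*   {
*       for (int i = 0; i < 100; ++i)
*       {
*           while (!queue.push(i)) {} // Retry while the buffer is full.
*       }
*   });
*   std::thread consumer([&]()
*   {
*       int value = 0;
*       for (int i = 0; i < 100; ++i)
*       {
*           while (!queue.pop(value)) {} // Retry while the buffer is empty.
*       }
*   });
*   producer.join();
*   consumer.join();
*/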
#endif // BOUNDED_CIRCULAR_MPMC_QUEUE_H