1 module gfm.core.queue;
2 
3 import std.range;
4 
5 import core.sync.mutex,
6        core.sync.semaphore;
7 
8 // what to do when capacity is exceeded?
9 private enum OverflowPolicy
10 {
11     GROW,
12     CRASH,
13     DROP
14 }
15 
16 /**
17 
18     Doubly-indexed queue, can be used as a FIFO or stack.
19 
20     Bugs:
21         Doesn't call struct destructors, don't scan memory.
22         You should probably only put POD types in them.
23  */
24 final class QueueImpl(T, OverflowPolicy overflowPolicy)
25 {
26     public
27     {
28         /// Create a QueueImpl with specified initial capacity.
29         this(size_t initialCapacity) nothrow
30         {
31             _data.length = initialCapacity;
32             clear();
33         }
34 
35         /// Returns: true if the queue is full.
36         @property bool isFull() pure const nothrow
37         {
38             return _count == capacity;
39         }
40 
41         /// Returns: capacity of the queue.
42         @property size_t capacity() pure const nothrow
43         {
44             return _data.length;
45         }
46 
47         /// Adds an item on the back of the queue.
48         void pushBack(T x) nothrow
49         {
50             checkOverflow!popFront();
51            _data[(_first + _count) % _data.length] = x;
52             ++_count;
53         }
54 
55         /// Adds an item on the front of the queue.
56         void pushFront(T x) nothrow
57         {
58             checkOverflow!popBack();
59             ++_count;
60             _first = (_first - 1 + _data.length) % _data.length;
61             _data[_first] = x;
62         }
63 
64         /// Removes an item from the front of the queue.
65         /// Returns: the removed item.
66         T popFront() nothrow
67         {
68             crashIfEmpty();
69             T res = _data[_first];
70             _first = (_first + 1) % _data.length;
71             --_count;
72             return res;
73         }
74 
75         /// Removes an item from the back of the queue.
76         /// Returns: the removed item.
77         T popBack() nothrow
78         {
79             crashIfEmpty();
80             --_count;
81             return _data[(_first + _count) % _data.length];
82         }
83 
84         /// Removes all items from the queue.
85         void clear() nothrow
86         {
87             _first = 0;
88             _count = 0;
89         }
90 
91         /// Returns: number of items in the queue.
92         size_t length() pure const nothrow
93         {
94             return _count;
95         }
96 
97         /// Returns: item at the front of the queue.
98         T front() pure
99         {
100             crashIfEmpty();
101             return _data[_first];
102         }
103 
104         /// Returns: item on the back of the queue.
105         T back() pure
106         {
107             crashIfEmpty();
108             return _data[(_first + _count + _data.length - 1) % _data.length];
109         }
110 
111         /// Returns: item index from the queue.
112         T opIndex(size_t index)
113         {
114             // crash if index out-of-bounds (not recoverable)
115             if (index > _count)
116                 assert(0);
117 
118             return _data[(_first + index) % _data.length];
119         }
120 
121         /// Returns: random-access range over the whole queue.
122         Range opSlice() nothrow
123         {
124             return Range(this);
125         }
126 
127         /// Returns: random-access range over a queue sub-range.
128         Range opSlice(size_t i, size_t j) nothrow
129         {
130             // verify that all elements are in bound
131             if (i != j && i >= _count)
132                 assert(false);
133 
134             if (j > _count)
135                 assert(false);
136 
137             if (j < i)
138                 assert(false);
139 
140             return Range(this);
141         }
142 
143         // range type, random access
144         static struct Range
145         {
146         nothrow:
147             public
148             {
149                 this(QueueImpl queue) pure
150                 {
151                     this(queue, 0, queue._count);
152                     _first = queue._first;
153                     _count = queue._count;
154                 }
155 
156                 this(QueueImpl queue, size_t index, size_t count) pure
157                 {
158                     _index = index;
159                     _data = queue._data;
160                     _first = (queue._first + index) % _data.length;
161                     _count = _count;
162                 }
163 
164                 @property bool empty() pure const
165                 {
166                     return _index >= _count;
167                 }
168 
169                 void popFront()
170                 {
171                     _index++;
172                 }
173 
174                 @property T front() pure
175                 {
176                     return _data[(_first + _index) % _data.length];
177                 }
178 
179                 void popBack()
180                 {
181                     _count--;
182                 }
183 
184                 @property T back() pure
185                 {
186                     return _data[(_first + _count - 1) % _data.length];
187                 }
188 
189                 @property Range save()
190                 {
191                     return this;
192                 }
193 
194                 T opIndex(size_t i)
195                 {
196                     // crash if index out-of-bounds of the range (not recoverable)
197                     if (i > _count)
198                         assert(0);
199 
200                     return _data[(_first + _index + i) % _data.length];
201                 }
202 
203                 @property size_t length() pure
204                 {
205                     return _count;
206                 }
207 
208                 alias length opDollar;
209             }
210 
211             private
212             {
213                 size_t _index;
214                 T[] _data;
215                 size_t _first;
216                 size_t _count;
217             }
218         }
219     }
220 
221     private
222     {
223         void crashIfEmpty()
224         {
225             // popping if empty is not a recoverable error
226             if (_count == 0)
227                 assert(false);
228         }
229 
230         // element lie from _first to _first + _count - 1 index, modulo the allocated size
231         T[] _data;
232         size_t _first;
233         size_t _count;
234 
235         void checkOverflow(alias popMethod)() nothrow
236         {
237             if (isFull())
238             {
239                 static if (overflowPolicy == OverflowPolicy.GROW)
240                     extend();
241 
242                 static if (overflowPolicy == OverflowPolicy.CRASH)
243                     assert(false); // not recoverable to overflow such a queue
244 
245                 static if (overflowPolicy == OverflowPolicy.DROP)
246                     popMethod();
247             }
248         }
249 
250         void extend() nothrow
251         {
252             size_t newCapacity = capacity * 2;
253             if (newCapacity < 8)
254                 newCapacity = 8;
255 
256             assert(newCapacity >= _count + 1);
257 
258             T[] newData = new T[newCapacity];
259 
260             auto r = this[];
261             size_t i = 0;
262             while (!r.empty())
263             {
264                 newData[i] = r.front();
265                 r.popFront();
266                 ++i;
267             }
268             _data = newData;
269             _first = 0;
270         }
271     }
272 }
273 
274 static assert (isRandomAccessRange!(Queue!int.Range));
275 
276 unittest
277 {
278     // fifo
279     {
280         int N = 7;
281         auto fifo = new Queue!int(N);
282         foreach(n; 0..N)
283             fifo.pushBack(n);
284 
285         assert(fifo.back() == N - 1);
286         assert(fifo.front() == 0);
287 
288         foreach(n; 0..N)
289         {
290             assert(fifo.popFront() == n);
291         }
292     }
293 
294     // stack
295     {
296         int N = 7;
297         auto fifo = new Queue!int(N);
298         foreach(n; 0..N)
299             fifo.pushBack(n);
300 
301         foreach(n; 0..N)
302             assert(fifo.popBack() == N - 1 - n);
303     }
304 }
305 
306 
307 /**
308 
309 A queue that can only grow.
310 
311 
312 See_also: $(LINK2 #QueueImpl, QueueImpl)
313 
314 */
315 template Queue(T)
316 {
317     alias QueueImpl!(T, OverflowPolicy.GROW) Queue;
318 }
319 
320 /**
321 
322 A fixed-sized queue that will crash on overflow.
323 
324 See_also: $(LINK2 #QueueImpl, QueueImpl)
325 
326 
327 */
328 template FixedSizeQueue(T)
329 {
330     alias QueueImpl!(T, OverflowPolicy.CRASH) FixedSizeQueue;
331 }
332 
333 /**
334 
335 Ring buffer, it drops if its capacity is exceeded.
336 
337 See_also: $(LINK2 #QueueImpl, QueueImpl)
338 
339 */
340 template RingBuffer(T)
341 {
342     alias QueueImpl!(T, OverflowPolicy.DROP) RingBuffer;
343 }
344 
345 /**
346     Locked queue for inter-thread communication.
347     Support multiple writers, multiple readers.
348     Blocks threads either when empty or full.
349 
350     See_also: $(LINK2 #Queue, Queue)
351  */
352 final class LockedQueue(T)
353 {
354     public
355     {
356         /// Creates a locked queue with an initial capacity.
357         this(size_t capacity)
358         {
359             _queue = new FixedSizeQueue!T(capacity);
360             _rwMutex = new Mutex();
361             _readerSemaphore = new Semaphore(0);
362             _writerSemaphore = new Semaphore(cast(uint)capacity);
363         }
364 
365         /// Returns: Capacity of the locked queue.
366         size_t capacity() const
367         {
368             // no lock-required as capacity does not change
369             return _queue.capacity;
370         }
371 
372         /// Push an item to the back, block if queue is full.
373         void pushBack(T x)
374         {
375             _writerSemaphore.wait();
376             {
377                 _rwMutex.lock();
378                 _queue.pushBack(x);
379                 _rwMutex.unlock();
380             }
381             _readerSemaphore.notify();
382         }
383 
384         /// Push an item to the front, block if queue is full.
385         void pushFront(T x)
386         {
387             _writerSemaphore.wait();
388             {
389                 _rwMutex.lock();
390                 _queue.pushFront(x);
391                 _rwMutex.unlock();
392             }
393             _readerSemaphore.notify();
394         }
395 
396         /// Pop an item from the front, block if queue is empty.
397         T popFront()
398         {
399             _readerSemaphore.wait();
400             _rwMutex.lock();
401             T res = _queue.popFront();
402             _rwMutex.unlock();
403             _writerSemaphore.notify();
404             return res;
405         }
406 
407         /// Pop an item from the back, block if queue is empty.
408         T popBack()
409         {
410             _readerSemaphore.wait();
411             _rwMutex.lock();
412             T res = _queue.popBack();
413             _rwMutex.unlock();
414             _writerSemaphore.notify();
415             return res;
416         }
417 
418         /// Tries to pop an item from the front immediately.
419         /// Returns: true if an item was returned, false if the queue is empty.
420         bool tryPopFront(out T result)
421         {
422             if (_readerSemaphore.tryWait())
423             {
424                 _rwMutex.lock();
425                 result = _queue.popFront();
426                 _rwMutex.unlock();
427                 _writerSemaphore.notify();
428                 return true;
429             }
430             else
431                 return false;
432         }
433 
434         /// Tries to pop an item from the back immediately.
435         /// Returns: true if an item was returned, false if the queue is empty.
436         bool tryPopBack(out T result)
437         {
438             if (_readerSemaphore.tryWait())
439             {
440                 _rwMutex.lock();
441                 result = _queue.popBack();
442                 _rwMutex.unlock();
443                 _writerSemaphore.notify();
444                 return true;
445             }
446             else
447                 return false;
448         }
449 
450         /// Removes all locked queue items.
451         void clear()
452         {
453             while (_readerSemaphore.tryWait())
454             {
455                 _rwMutex.lock();
456                 _queue.popBack();
457                 _rwMutex.unlock();
458                 _writerSemaphore.notify();
459             }
460         }
461     }
462 
463     private
464     {
465         FixedSizeQueue!T _queue;
466         Mutex _rwMutex;
467         Semaphore _readerSemaphore, _writerSemaphore;
468     }
469 }
470 
471 
472 unittest
473 {
474     import std.stdio;
475     auto lq = new LockedQueue!int(3);
476     lq.clear();
477     lq.pushFront(2);
478     lq.pushBack(3);
479     lq.pushFront(1);
480 
481     // should contain [1 2 3] here
482     assert(lq.popBack() == 3);
483     assert(lq.popFront() == 1);
484     int res;
485     if (lq.tryPopFront(res))
486     {
487         assert(res == 2);
488     }
489 }