1 module gfm.core.queue;
2 
3 import std.range;
4 
5 import core.sync.mutex,
6        core.sync.semaphore;
7 
// What a queue does when an item is pushed while it is already full.
// The policy is a compile-time parameter of QueueImpl and is dispatched in checkOverflow().
private enum OverflowPolicy
{
    GROW,  // enlarge the storage (QueueImpl.extend) before inserting
    CRASH, // halt with assert(false): overflowing is treated as unrecoverable
    DROP   // pop an item from the opposite end to make room for the new one
}
15 
/**
    Doubly-indexed queue backed by a circular buffer, can be used as a FIFO or stack.

    Items live in _data at positions _first, _first + 1, ..., _first + _count - 1,
    all taken modulo the allocated size.

    Params:
        T = type of the stored items.
        overflowPolicy = behaviour when an item is pushed while the queue is full
                         (grow the storage, crash, or drop the item at the opposite end).

    Bugs:
        Doesn't call struct destructors.
 */
final class QueueImpl(T, OverflowPolicy overflowPolicy)
{
    public
    {
        /// Create a QueueImpl with specified initial capacity.
        this(size_t initialCapacity) nothrow
        {
            _data.length = initialCapacity;
            clear();
        }

        /// Returns: true if the queue is full.
        @property bool isFull() pure const nothrow
        {
            return _count == capacity;
        }

        /// Returns: capacity of the queue.
        @property size_t capacity() pure const nothrow
        {
            return _data.length;
        }

        /// Adds an item on the back of the queue.
        /// When the queue is full the overflow policy is applied first
        /// (with the DROP policy the front item is discarded).
        void pushBack(T x) nothrow
        {
            checkOverflow!popFront();
            _data[(_first + _count) % _data.length] = x;
            ++_count;
        }

        /// Adds an item on the front of the queue.
        /// When the queue is full the overflow policy is applied first
        /// (with the DROP policy the back item is discarded).
        void pushFront(T x) nothrow
        {
            checkOverflow!popBack();
            ++_count;
            // adding _data.length keeps the unsigned arithmetic correct when _first is 0
            _first = (_first - 1 + _data.length) % _data.length;
            _data[_first] = x;
        }

        /// Removes an item from the front of the queue. Crashes if the queue is empty.
        /// Returns: the removed item.
        T popFront() nothrow
        {
            crashIfEmpty();
            T res = _data[_first];
            _first = (_first + 1) % _data.length;
            --_count;
            return res;
        }

        /// Removes an item from the back of the queue. Crashes if the queue is empty.
        /// Returns: the removed item.
        T popBack() nothrow
        {
            crashIfEmpty();
            --_count;
            return _data[(_first + _count) % _data.length];
        }

        /// Removes all items from the queue. The capacity is left unchanged.
        void clear() nothrow
        {
            _first = 0;
            _count = 0;
        }

        /// Returns: number of items in the queue.
        size_t length() pure const nothrow
        {
            return _count;
        }

        /// Returns: item at the front of the queue. Crashes if the queue is empty.
        T front() pure
        {
            crashIfEmpty();
            return _data[_first];
        }

        /// Returns: item on the back of the queue. Crashes if the queue is empty.
        T back() pure
        {
            crashIfEmpty();
            return _data[(_first + _count + _data.length - 1) % _data.length];
        }

        /// Returns: the item at logical position index, 0 being the front.
        /// Crashes if index is not in [0, length).
        T opIndex(size_t index)
        {
            // crash if index out-of-bounds (not recoverable);
            // index == _count is one past the last item and therefore invalid too
            if (index >= _count)
                assert(0);

            return _data[(_first + index) % _data.length];
        }

        /// Returns: random-access range over the whole queue.
        Range opSlice() nothrow
        {
            return Range(this);
        }

        /// Returns: random-access range over the items at logical positions [i, j).
        /// Crashes unless i <= j <= length.
        Range opSlice(size_t i, size_t j) nothrow
        {
            // verify that all elements are in bound
            if (i > j || j > _count)
                assert(false);

            return Range(this, i, j - i);
        }

        /// Random-access range over a queue or a part of it.
        /// The range reads the storage the queue had when the range was created.
        static struct Range
        {
        nothrow:
            public
            {
                /// Creates a range over every item of queue.
                this(QueueImpl queue) pure
                {
                    this(queue, 0, queue._count);
                }

                /// Creates a range over count items of queue, starting at logical position index.
                /// Crashes if the requested items are not all inside the queue.
                this(QueueImpl queue, size_t index, size_t count) pure
                {
                    if (index > queue._count || count > queue._count - index)
                        assert(false);

                    _data = queue._data;
                    _first = queue._first;
                    _index = index;
                    _count = count;
                }

                /// Returns: true if no item is left in the range.
                @property bool empty() pure const
                {
                    return _count == 0;
                }

                /// Discards the first item of the range. Crashes if the range is empty.
                void popFront()
                {
                    if (_count == 0)
                        assert(false);
                    ++_index;
                    --_count;
                }

                /// Returns: first item of the range. Crashes if the range is empty.
                @property T front() pure
                {
                    return at(0);
                }

                /// Discards the last item of the range. Crashes if the range is empty.
                void popBack()
                {
                    if (_count == 0)
                        assert(false);
                    --_count;
                }

                /// Returns: last item of the range. Crashes if the range is empty.
                @property T back() pure
                {
                    // on an empty range _count - 1 wraps around and is rejected by at()
                    return at(_count - 1);
                }

                /// Returns: an independent copy of the range (the storage is shared).
                @property Range save()
                {
                    return this;
                }

                /// Returns: the i-th remaining item. Crashes if i is not in [0, length).
                T opIndex(size_t i)
                {
                    return at(i);
                }

                /// Returns: number of items left in the range.
                @property size_t length() pure const
                {
                    return _count;
                }

                alias length opDollar;
            }

            private
            {
                // Fetches the i-th remaining item, crashing on out-of-bounds access
                // (not recoverable). A non-zero _count implies non-empty storage,
                // so the modulo never divides by zero.
                T at(size_t i) pure
                {
                    if (i >= _count)
                        assert(0);

                    return _data[(_first + _index + i) % _data.length];
                }

                size_t _index; // logical offset of the range's first item from the queue front
                T[] _data;     // storage of the queue at the time the range was created
                size_t _first; // position in _data of the queue front at that time
                size_t _count; // number of items left in the range
            }
        }
    }

    private
    {
        // Crashes when the queue holds no item.
        void crashIfEmpty() pure const nothrow
        {
            // popping if empty is not a recoverable error
            if (_count == 0)
                assert(false);
        }

        // element lie from _first to _first + _count - 1 index, modulo the allocated size
        T[] _data;
        size_t _first;
        size_t _count;

        // Applies the overflow policy when the queue is full.
        // popMethod is the pop operation used by the DROP policy to make room.
        void checkOverflow(alias popMethod)() nothrow
        {
            if (isFull())
            {
                static if (overflowPolicy == OverflowPolicy.GROW)
                    extend();

                static if (overflowPolicy == OverflowPolicy.CRASH)
                    assert(false); // not recoverable to overflow such a queue

                static if (overflowPolicy == OverflowPolicy.DROP)
                    popMethod();
            }
        }

        // Moves the items into a storage twice as large (at least 8 slots),
        // with the front item placed at position 0.
        void extend() nothrow
        {
            size_t newCapacity = capacity * 2;
            if (newCapacity < 8)
                newCapacity = 8;

            assert(newCapacity >= _count + 1);

            T[] newData = new T[newCapacity];

            // when the old storage is empty _count is 0 and the loop body
            // (hence the modulo by _data.length) is never executed
            foreach (i; 0 .. _count)
                newData[i] = _data[(_first + i) % _data.length];

            _data = newData;
            _first = 0;
        }
    }
}
272 
// Compile-time check that the range type exposed by the queue satisfies isRandomAccessRange.
static assert (isRandomAccessRange!(Queue!int.Range));
274 
unittest
{
    enum int itemCount = 7;

    // FIFO usage: items pushed on the back come out of the front in insertion order
    {
        auto queue = new Queue!int(itemCount);
        for (int value = 0; value < itemCount; ++value)
            queue.pushBack(value);

        assert(queue.front() == 0);
        assert(queue.back() == itemCount - 1);

        for (int expected = 0; expected < itemCount; ++expected)
            assert(queue.popFront() == expected);
    }

    // stack usage: items pushed on the back come out of the back in reverse order
    {
        auto stack = new Queue!int(itemCount);
        for (int value = 0; value < itemCount; ++value)
            stack.pushBack(value);

        for (int expected = itemCount - 1; expected >= 0; --expected)
            assert(stack.popBack() == expected);
    }
}
304 
305 
/**
    Queue whose storage is enlarged when an item is pushed while it is full.

    See_also: $(LINK2 #QueueImpl, QueueImpl)
 */
alias Queue(T) = QueueImpl!(T, OverflowPolicy.GROW);
318 
/**
    Queue of fixed size: pushing an item while it is full crashes the program.

    See_also: $(LINK2 #QueueImpl, QueueImpl)
 */
alias FixedSizeQueue(T) = QueueImpl!(T, OverflowPolicy.CRASH);
331 
/**
    Ring buffer: pushing an item while it is full drops the item at the opposite end.

    See_also: $(LINK2 #QueueImpl, QueueImpl)
 */
alias RingBuffer(T) = QueueImpl!(T, OverflowPolicy.DROP);
343 
/**
    Locked queue for inter-thread communication.
    Supports multiple writers and multiple readers.
    Blocks the calling thread when it pops from an empty queue or pushes to a full one.

    A mutex protects the underlying queue. Two semaphores count the items
    available to readers and the free slots available to writers.

    See_also: $(LINK2 #Queue, Queue)
 */
final class LockedQueue(T)
{
    public
    {
        /// Creates a locked queue able to hold capacity items.
        /// Crashes if capacity does not fit in the 32-bit count of a semaphore.
        this(size_t capacity)
        {
            // Semaphore takes a uint count: reject what cannot be represented
            // instead of silently truncating it (not recoverable)
            if (capacity > uint.max)
                assert(false);

            _queue = new Queue!T(capacity);
            _rwMutex = new Mutex();
            _readerSemaphore = new Semaphore(0);
            // explicit cast: size_t does not implicitly convert to uint on 64-bit targets
            _writerSemaphore = new Semaphore(cast(uint) capacity);
        }

        /// Returns: Capacity of the locked queue.
        size_t capacity() const
        {
            // no lock-required as capacity does not change
            return _queue.capacity;
        }

        /// Push an item to the back, block if queue is full.
        void pushBack(T x)
        {
            // take a free slot, waiting for a reader to release one if needed
            _writerSemaphore.wait();
            {
                _rwMutex.lock();
                _queue.pushBack(x);
                _rwMutex.unlock();
            }
            // one more item is available to readers
            _readerSemaphore.notify();
        }

        /// Push an item to the front, block if queue is full.
        void pushFront(T x)
        {
            // take a free slot, waiting for a reader to release one if needed
            _writerSemaphore.wait();
            {
                _rwMutex.lock();
                _queue.pushFront(x);
                _rwMutex.unlock();
            }
            // one more item is available to readers
            _readerSemaphore.notify();
        }

        /// Pop an item from the front, block if queue is empty.
        /// Returns: the removed item.
        T popFront()
        {
            // take an available item, waiting for a writer to provide one if needed
            _readerSemaphore.wait();
            _rwMutex.lock();
            T res = _queue.popFront();
            _rwMutex.unlock();
            // one more slot is free for writers
            _writerSemaphore.notify();
            return res;
        }

        /// Pop an item from the back, block if queue is empty.
        /// Returns: the removed item.
        T popBack()
        {
            // take an available item, waiting for a writer to provide one if needed
            _readerSemaphore.wait();
            _rwMutex.lock();
            T res = _queue.popBack();
            _rwMutex.unlock();
            // one more slot is free for writers
            _writerSemaphore.notify();
            return res;
        }

        /// Removes all locked queue items.
        /// Never blocks: it stops as soon as no item is available.
        void clear()
        {
            // each successful tryWait reserves one item, which is then discarded
            while (_readerSemaphore.tryWait())
            {
                _rwMutex.lock();
                _queue.popBack();
                _rwMutex.unlock();
                _writerSemaphore.notify();
            }
        }
    }

    private
    {
        Queue!T _queue;             // underlying storage, only accessed while _rwMutex is held
        Mutex _rwMutex;             // serialises every access to _queue
        Semaphore _readerSemaphore, // counts the items that can be popped
                  _writerSemaphore; // counts the slots that can still be filled
    }
}