1 module gfm.core.queue; 2 3 import std.range; 4 5 import core.sync.mutex, 6 core.sync.semaphore; 7 8 // what to do when capacity is exceeded? 9 private enum OverflowPolicy 10 { 11 GROW, 12 CRASH, 13 DROP 14 } 15 16 /** 17 18 Doubly-indexed queue, can be used as a FIFO or stack. 19 20 Bugs: 21 Doesn't call struct destructors. 22 */ 23 final class QueueImpl(T, OverflowPolicy overflowPolicy) 24 { 25 public 26 { 27 /// Create a QueueImpl with specified initial capacity. 28 this(size_t initialCapacity) nothrow 29 { 30 _data.length = initialCapacity; 31 clear(); 32 } 33 34 /// Returns: true if the queue is full. 35 @property bool isFull() pure const nothrow 36 { 37 return _count == capacity; 38 } 39 40 /// Returns: capacity of the queue. 41 @property size_t capacity() pure const nothrow 42 { 43 return _data.length; 44 } 45 46 /// Adds an item on the back of the queue. 47 void pushBack(T x) nothrow 48 { 49 checkOverflow!popFront(); 50 _data[(_first + _count) % _data.length] = x; 51 ++_count; 52 } 53 54 /// Adds an item on the front of the queue. 55 void pushFront(T x) nothrow 56 { 57 checkOverflow!popBack(); 58 ++_count; 59 _first = (_first - 1 + _data.length) % _data.length; 60 _data[_first] = x; 61 } 62 63 /// Removes an item from the front of the queue. 64 /// Returns: the removed item. 65 T popFront() nothrow 66 { 67 crashIfEmpty(); 68 T res = _data[_first]; 69 _first = (_first + 1) % _data.length; 70 --_count; 71 return res; 72 } 73 74 /// Removes an item from the back of the queue. 75 /// Returns: the removed item. 76 T popBack() nothrow 77 { 78 crashIfEmpty(); 79 --_count; 80 return _data[(_first + _count) % _data.length]; 81 } 82 83 /// Removes all items from the queue. 84 void clear() nothrow 85 { 86 _first = 0; 87 _count = 0; 88 } 89 90 /// Returns: number of items in the queue. 91 size_t length() pure const nothrow 92 { 93 return _count; 94 } 95 96 /// Returns: item at the front of the queue. 97 T front() pure 98 { 99 crashIfEmpty(); 100 return _data[_first]; 101 } 102 103 /// Returns: item on the back of the queue. 104 T back() pure 105 { 106 crashIfEmpty(); 107 return _data[(_first + _count + _data.length - 1) % _data.length]; 108 } 109 110 /// Returns: item index from the queue. 111 T opIndex(size_t index) 112 { 113 // crash if index out-of-bounds (not recoverable) 114 if (index > _count) 115 assert(0); 116 117 return _data[(_first + index) % _data.length]; 118 } 119 120 /// Returns: random-access range over the whole queue. 121 Range opSlice() nothrow 122 { 123 return Range(this); 124 } 125 126 /// Returns: random-access range over a queue sub-range. 127 Range opSlice(size_t i, size_t j) nothrow 128 { 129 // verify that all elements are in bound 130 if (i != j && i >= _count) 131 assert(false); 132 133 if (j > _count) 134 assert(false); 135 136 if (j < i) 137 assert(false); 138 139 return Range(this); 140 } 141 142 // range type, random access 143 static struct Range 144 { 145 nothrow: 146 public 147 { 148 this(QueueImpl queue) pure 149 { 150 this(queue, 0, queue._count); 151 _first = queue._first; 152 _count = queue._count; 153 } 154 155 this(QueueImpl queue, size_t index, size_t count) pure 156 { 157 _index = index; 158 _data = queue._data; 159 _first = (queue._first + index) % _data.length; 160 _count = _count; 161 } 162 163 @property bool empty() pure const 164 { 165 return _index >= _count; 166 } 167 168 void popFront() 169 { 170 _index++; 171 } 172 173 @property T front() pure 174 { 175 return _data[(_first + _index) % _data.length]; 176 } 177 178 void popBack() 179 { 180 _count--; 181 } 182 183 @property T back() pure 184 { 185 return _data[(_first + _count - 1) % _data.length]; 186 } 187 188 @property Range save() 189 { 190 return this; 191 } 192 193 T opIndex(size_t i) 194 { 195 // crash if index out-of-bounds of the range (not recoverable) 196 if (i > _count) 197 assert(0); 198 199 return _data[(_first + _index + i) % _data.length]; 200 } 201 202 @property size_t length() pure 203 { 204 return _count; 205 } 206 207 alias length opDollar; 208 } 209 210 private 211 { 212 size_t _index; 213 T[] _data; 214 size_t _first; 215 size_t _count; 216 } 217 } 218 } 219 220 private 221 { 222 void crashIfEmpty() 223 { 224 // popping if empty is not a recoverable error 225 if (_count == 0) 226 assert(false); 227 } 228 229 // element lie from _first to _first + _count - 1 index, modulo the allocated size 230 T[] _data; 231 size_t _first; 232 size_t _count; 233 234 void checkOverflow(alias popMethod)() nothrow 235 { 236 if (isFull()) 237 { 238 static if (overflowPolicy == OverflowPolicy.GROW) 239 extend(); 240 241 static if (overflowPolicy == OverflowPolicy.CRASH) 242 assert(false); // not recoverable to overflow such a queue 243 244 static if (overflowPolicy == OverflowPolicy.DROP) 245 popMethod(); 246 } 247 } 248 249 void extend() nothrow 250 { 251 size_t newCapacity = capacity * 2; 252 if (newCapacity < 8) 253 newCapacity = 8; 254 255 assert(newCapacity >= _count + 1); 256 257 T[] newData = new T[newCapacity]; 258 259 auto r = this[]; 260 size_t i = 0; 261 while (!r.empty()) 262 { 263 newData[i] = r.front(); 264 r.popFront(); 265 ++i; 266 } 267 _data = newData; 268 _first = 0; 269 } 270 } 271 } 272 273 static assert (isRandomAccessRange!(Queue!int.Range)); 274 275 unittest 276 { 277 // fifo 278 { 279 int N = 7; 280 auto fifo = new Queue!int(N); 281 foreach(n; 0..N) 282 fifo.pushBack(n); 283 284 assert(fifo.back() == N - 1); 285 assert(fifo.front() == 0); 286 287 foreach(n; 0..N) 288 { 289 assert(fifo.popFront() == n); 290 } 291 } 292 293 // stack 294 { 295 int N = 7; 296 auto fifo = new Queue!int(N); 297 foreach(n; 0..N) 298 fifo.pushBack(n); 299 300 foreach(n; 0..N) 301 assert(fifo.popBack() == N - 1 - n); 302 } 303 } 304 305 306 /** 307 308 A queue that can only grow. 309 310 311 See_also: $(LINK2 #QueueImpl, QueueImpl) 312 313 */ 314 template Queue(T) 315 { 316 alias QueueImpl!(T, OverflowPolicy.GROW) Queue; 317 } 318 319 /** 320 321 A fixed-sized queue that will crash on overflow. 322 323 See_also: $(LINK2 #QueueImpl, QueueImpl) 324 325 326 */ 327 template FixedSizeQueue(T) 328 { 329 alias QueueImpl!(T, OverflowPolicy.CRASH) FixedSizeQueue; 330 } 331 332 /** 333 334 Ring buffer, it drops if its capacity is exceeded. 335 336 See_also: $(LINK2 #QueueImpl, QueueImpl) 337 338 */ 339 template RingBuffer(T) 340 { 341 alias QueueImpl!(T, OverflowPolicy.DROP) RingBuffer; 342 } 343 344 /** 345 Locked queue for inter-thread communication. 346 Support multiple writers, multiple readers. 347 Blocks threads either when empty or full. 348 349 See_also: $(LINK2 #Queue, Queue) 350 */ 351 final class LockedQueue(T) 352 { 353 public 354 { 355 this(size_t capacity) 356 { 357 _queue = new Queue!T(capacity); 358 _rwMutex = new Mutex(); 359 _readerSemaphore = new Semaphore(0); 360 _writerSemaphore = new Semaphore(capacity); 361 } 362 363 /// Returns: Capacity of the locked queue. 364 size_t capacity() const 365 { 366 // no lock-required as capacity does not change 367 return _queue.capacity; 368 } 369 370 /// Push an item to the back, block if queue is full. 371 void pushBack(T x) 372 { 373 _writerSemaphore.wait(); 374 { 375 _rwMutex.lock(); 376 _queue.pushBack(x); 377 _rwMutex.unlock(); 378 } 379 _readerSemaphore.notify(); 380 } 381 382 /// Push an item to the front, block if queue is full. 383 void pushFront(T x) 384 { 385 _writerSemaphore.wait(); 386 { 387 _rwMutex.lock(); 388 _queue.pushFront(x); 389 _rwMutex.unlock(); 390 } 391 _readerSemaphore.notify(); 392 } 393 394 /// Pop an item from the front, block if queue is empty. 395 T popFront() 396 { 397 _readerSemaphore.wait(); 398 _rwMutex.lock(); 399 T res = _queue.popFront(); 400 _rwMutex.unlock(); 401 _writerSemaphore.notify(); 402 return res; 403 } 404 405 /// Pop an item from the back, block if queue is empty. 406 T popBack() 407 { 408 _readerSemaphore.wait(); 409 _rwMutex.lock(); 410 T res = _queue.popBack(); 411 _rwMutex.unlock(); 412 _writerSemaphore.notify(); 413 return res; 414 } 415 416 /// Removes all locked queue items. 417 void clear() 418 { 419 while (_readerSemaphore.tryWait()) 420 { 421 _rwMutex.lock(); 422 _queue.popBack(); 423 _rwMutex.unlock(); 424 _writerSemaphore.notify(); 425 } 426 } 427 } 428 429 private 430 { 431 Queue!T _queue; 432 Mutex _rwMutex; 433 Semaphore _readerSemaphore, _writerSemaphore; 434 } 435 }