1 module gfm.core.queue; 2 3 import std.range; 4 5 import core.sync.mutex, 6 core.sync.semaphore; 7 8 // what to do when capacity is exceeded? 9 private enum OverflowPolicy 10 { 11 GROW, 12 CRASH, 13 DROP 14 } 15 16 /** 17 18 Doubly-indexed queue, can be used as a FIFO or stack. 19 20 Bugs: 21 Doesn't call struct destructors, don't scan memory. 22 You should probably only put POD types in them. 23 */ 24 final class QueueImpl(T, OverflowPolicy overflowPolicy) 25 { 26 public 27 { 28 /// Create a QueueImpl with specified initial capacity. 29 this(size_t initialCapacity) nothrow 30 { 31 _data.length = initialCapacity; 32 clear(); 33 } 34 35 /// Returns: true if the queue is full. 36 @property bool isFull() pure const nothrow 37 { 38 return _count == capacity; 39 } 40 41 /// Returns: capacity of the queue. 42 @property size_t capacity() pure const nothrow 43 { 44 return _data.length; 45 } 46 47 /// Adds an item on the back of the queue. 48 void pushBack(T x) nothrow 49 { 50 checkOverflow!popFront(); 51 _data[(_first + _count) % _data.length] = x; 52 ++_count; 53 } 54 55 /// Adds an item on the front of the queue. 56 void pushFront(T x) nothrow 57 { 58 checkOverflow!popBack(); 59 ++_count; 60 _first = (_first - 1 + _data.length) % _data.length; 61 _data[_first] = x; 62 } 63 64 /// Removes an item from the front of the queue. 65 /// Returns: the removed item. 66 T popFront() nothrow 67 { 68 crashIfEmpty(); 69 T res = _data[_first]; 70 _first = (_first + 1) % _data.length; 71 --_count; 72 return res; 73 } 74 75 /// Removes an item from the back of the queue. 76 /// Returns: the removed item. 77 T popBack() nothrow 78 { 79 crashIfEmpty(); 80 --_count; 81 return _data[(_first + _count) % _data.length]; 82 } 83 84 /// Removes all items from the queue. 85 void clear() nothrow 86 { 87 _first = 0; 88 _count = 0; 89 } 90 91 /// Returns: number of items in the queue. 92 size_t length() pure const nothrow 93 { 94 return _count; 95 } 96 97 /// Returns: item at the front of the queue. 98 T front() pure 99 { 100 crashIfEmpty(); 101 return _data[_first]; 102 } 103 104 /// Returns: item on the back of the queue. 105 T back() pure 106 { 107 crashIfEmpty(); 108 return _data[(_first + _count + _data.length - 1) % _data.length]; 109 } 110 111 /// Returns: item index from the queue. 112 T opIndex(size_t index) 113 { 114 // crash if index out-of-bounds (not recoverable) 115 if (index > _count) 116 assert(0); 117 118 return _data[(_first + index) % _data.length]; 119 } 120 121 /// Returns: random-access range over the whole queue. 122 Range opSlice() nothrow 123 { 124 return Range(this); 125 } 126 127 /// Returns: random-access range over a queue sub-range. 128 Range opSlice(size_t i, size_t j) nothrow 129 { 130 // verify that all elements are in bound 131 if (i != j && i >= _count) 132 assert(false); 133 134 if (j > _count) 135 assert(false); 136 137 if (j < i) 138 assert(false); 139 140 return Range(this); 141 } 142 143 // range type, random access 144 static struct Range 145 { 146 nothrow: 147 public 148 { 149 this(QueueImpl queue) pure 150 { 151 this(queue, 0, queue._count); 152 _first = queue._first; 153 _count = queue._count; 154 } 155 156 this(QueueImpl queue, size_t index, size_t count) pure 157 { 158 _index = index; 159 _data = queue._data; 160 _first = (queue._first + index) % _data.length; 161 _count = _count; 162 } 163 164 @property bool empty() pure const 165 { 166 return _index >= _count; 167 } 168 169 void popFront() 170 { 171 _index++; 172 } 173 174 @property T front() pure 175 { 176 return _data[(_first + _index) % _data.length]; 177 } 178 179 void popBack() 180 { 181 _count--; 182 } 183 184 @property T back() pure 185 { 186 return _data[(_first + _count - 1) % _data.length]; 187 } 188 189 @property Range save() 190 { 191 return this; 192 } 193 194 T opIndex(size_t i) 195 { 196 // crash if index out-of-bounds of the range (not recoverable) 197 if (i > _count) 198 assert(0); 199 200 return _data[(_first + _index + i) % _data.length]; 201 } 202 203 @property size_t length() pure 204 { 205 return _count; 206 } 207 208 alias length opDollar; 209 } 210 211 private 212 { 213 size_t _index; 214 T[] _data; 215 size_t _first; 216 size_t _count; 217 } 218 } 219 } 220 221 private 222 { 223 void crashIfEmpty() 224 { 225 // popping if empty is not a recoverable error 226 if (_count == 0) 227 assert(false); 228 } 229 230 // element lie from _first to _first + _count - 1 index, modulo the allocated size 231 T[] _data; 232 size_t _first; 233 size_t _count; 234 235 void checkOverflow(alias popMethod)() nothrow 236 { 237 if (isFull()) 238 { 239 static if (overflowPolicy == OverflowPolicy.GROW) 240 extend(); 241 242 static if (overflowPolicy == OverflowPolicy.CRASH) 243 assert(false); // not recoverable to overflow such a queue 244 245 static if (overflowPolicy == OverflowPolicy.DROP) 246 popMethod(); 247 } 248 } 249 250 void extend() nothrow 251 { 252 size_t newCapacity = capacity * 2; 253 if (newCapacity < 8) 254 newCapacity = 8; 255 256 assert(newCapacity >= _count + 1); 257 258 T[] newData = new T[newCapacity]; 259 260 auto r = this[]; 261 size_t i = 0; 262 while (!r.empty()) 263 { 264 newData[i] = r.front(); 265 r.popFront(); 266 ++i; 267 } 268 _data = newData; 269 _first = 0; 270 } 271 } 272 } 273 274 static assert (isRandomAccessRange!(Queue!int.Range)); 275 276 unittest 277 { 278 // fifo 279 { 280 int N = 7; 281 auto fifo = new Queue!int(N); 282 foreach(n; 0..N) 283 fifo.pushBack(n); 284 285 assert(fifo.back() == N - 1); 286 assert(fifo.front() == 0); 287 288 foreach(n; 0..N) 289 { 290 assert(fifo.popFront() == n); 291 } 292 } 293 294 // stack 295 { 296 int N = 7; 297 auto fifo = new Queue!int(N); 298 foreach(n; 0..N) 299 fifo.pushBack(n); 300 301 foreach(n; 0..N) 302 assert(fifo.popBack() == N - 1 - n); 303 } 304 } 305 306 307 /** 308 309 A queue that can only grow. 310 311 312 See_also: $(LINK2 #QueueImpl, QueueImpl) 313 314 */ 315 template Queue(T) 316 { 317 alias QueueImpl!(T, OverflowPolicy.GROW) Queue; 318 } 319 320 /** 321 322 A fixed-sized queue that will crash on overflow. 323 324 See_also: $(LINK2 #QueueImpl, QueueImpl) 325 326 327 */ 328 template FixedSizeQueue(T) 329 { 330 alias QueueImpl!(T, OverflowPolicy.CRASH) FixedSizeQueue; 331 } 332 333 /** 334 335 Ring buffer, it drops if its capacity is exceeded. 336 337 See_also: $(LINK2 #QueueImpl, QueueImpl) 338 339 */ 340 template RingBuffer(T) 341 { 342 alias QueueImpl!(T, OverflowPolicy.DROP) RingBuffer; 343 } 344 345 /** 346 Locked queue for inter-thread communication. 347 Support multiple writers, multiple readers. 348 Blocks threads either when empty or full. 349 350 See_also: $(LINK2 #Queue, Queue) 351 */ 352 final class LockedQueue(T) 353 { 354 public 355 { 356 /// Creates a locked queue with an initial capacity. 357 this(size_t capacity) 358 { 359 _queue = new FixedSizeQueue!T(capacity); 360 _rwMutex = new Mutex(); 361 _readerSemaphore = new Semaphore(0); 362 _writerSemaphore = new Semaphore(cast(uint)capacity); 363 } 364 365 /// Returns: Capacity of the locked queue. 366 size_t capacity() const 367 { 368 // no lock-required as capacity does not change 369 return _queue.capacity; 370 } 371 372 /// Push an item to the back, block if queue is full. 373 void pushBack(T x) 374 { 375 _writerSemaphore.wait(); 376 { 377 _rwMutex.lock(); 378 _queue.pushBack(x); 379 _rwMutex.unlock(); 380 } 381 _readerSemaphore.notify(); 382 } 383 384 /// Push an item to the front, block if queue is full. 385 void pushFront(T x) 386 { 387 _writerSemaphore.wait(); 388 { 389 _rwMutex.lock(); 390 _queue.pushFront(x); 391 _rwMutex.unlock(); 392 } 393 _readerSemaphore.notify(); 394 } 395 396 /// Pop an item from the front, block if queue is empty. 397 T popFront() 398 { 399 _readerSemaphore.wait(); 400 _rwMutex.lock(); 401 T res = _queue.popFront(); 402 _rwMutex.unlock(); 403 _writerSemaphore.notify(); 404 return res; 405 } 406 407 /// Pop an item from the back, block if queue is empty. 408 T popBack() 409 { 410 _readerSemaphore.wait(); 411 _rwMutex.lock(); 412 T res = _queue.popBack(); 413 _rwMutex.unlock(); 414 _writerSemaphore.notify(); 415 return res; 416 } 417 418 /// Tries to pop an item from the front immediately. 419 /// Returns: true if an item was returned, false if the queue is empty. 420 bool tryPopFront(out T result) 421 { 422 if (_readerSemaphore.tryWait()) 423 { 424 _rwMutex.lock(); 425 result = _queue.popFront(); 426 _rwMutex.unlock(); 427 _writerSemaphore.notify(); 428 return true; 429 } 430 else 431 return false; 432 } 433 434 /// Tries to pop an item from the back immediately. 435 /// Returns: true if an item was returned, false if the queue is empty. 436 bool tryPopBack(out T result) 437 { 438 if (_readerSemaphore.tryWait()) 439 { 440 _rwMutex.lock(); 441 result = _queue.popBack(); 442 _rwMutex.unlock(); 443 _writerSemaphore.notify(); 444 return true; 445 } 446 else 447 return false; 448 } 449 450 /// Removes all locked queue items. 451 void clear() 452 { 453 while (_readerSemaphore.tryWait()) 454 { 455 _rwMutex.lock(); 456 _queue.popBack(); 457 _rwMutex.unlock(); 458 _writerSemaphore.notify(); 459 } 460 } 461 } 462 463 private 464 { 465 FixedSizeQueue!T _queue; 466 Mutex _rwMutex; 467 Semaphore _readerSemaphore, _writerSemaphore; 468 } 469 } 470 471 472 unittest 473 { 474 import std.stdio; 475 auto lq = new LockedQueue!int(3); 476 lq.clear(); 477 lq.pushFront(2); 478 lq.pushBack(3); 479 lq.pushFront(1); 480 481 // should contain [1 2 3] here 482 assert(lq.popBack() == 3); 483 assert(lq.popFront() == 1); 484 int res; 485 if (lq.tryPopFront(res)) 486 { 487 assert(res == 2); 488 } 489 }