Class: EventMachine::Queue
- Inherits:
-
Object
- Object
- EventMachine::Queue
- Defined in:
- lib/em/queue.rb
Overview
A cross thread, reactor scheduled, linear queue.
This class provides a simple queue abstraction on top of the reactor scheduler. It services two primary purposes:
-
API sugar for stateful protocols
-
Pushing processing onto the reactor thread
Instance Method Summary collapse
- #empty? ⇒ Boolean
-
#initialize ⇒ Queue
constructor
A new instance of Queue.
-
#num_waiting ⇒ Integer
Waiting size.
-
#pop(*a, &b) ⇒ NilClass
Pop items off the queue, running the block on the reactor thread.
-
#push(*items) ⇒ Object
(also: #<<)
Push items onto the queue in the reactor thread.
-
#size ⇒ Integer
Queue size.
Constructor Details
#initialize ⇒ Queue
Returns a new instance of Queue.
19 20 21 22 23 |
# File 'lib/em/queue.rb', line 19 def initialize @sink = [] @drain = [] @popq = [] end |
Instance Method Details
#empty? ⇒ Boolean
This is a peek, it’s not thread safe, and may only tend toward accuracy.
63 64 65 |
# File 'lib/em/queue.rb', line 63 def empty? @drain.empty? && @sink.empty? end |
#num_waiting ⇒ Integer
This is a peek at the number of jobs that are currently waiting on the Queue
Returns Waiting size.
75 76 77 |
# File 'lib/em/queue.rb', line 75 def num_waiting @popq.size end |
#pop(*a, &b) ⇒ NilClass
Pop items off the queue, running the block on the reactor thread. The pop will not happen immediately, but at some point in the future, either in the next tick, if the queue has data, or when the queue is populated.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/em/queue.rb', line 30 def pop(*a, &b) cb = EM::Callback(*a, &b) EM.schedule do if @drain.empty? @drain = @sink @sink = [] end if @drain.empty? @popq << cb else cb.call @drain.shift end end nil # Always returns nil end |
#push(*items) ⇒ Object Also known as: <<
Push items onto the queue in the reactor thread. The items will not appear in the queue immediately, but will be scheduled for addition during the next reactor tick.
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/em/queue.rb', line 49 def push(*items) EM.schedule do @sink.push(*items) unless @popq.empty? @drain = @sink @sink = [] @popq.shift.call @drain.shift until @drain.empty? || @popq.empty? end end end |
#size ⇒ Integer
This is a peek, it’s not thread safe, and may only tend toward accuracy.
Returns Queue size.
69 70 71 |
# File 'lib/em/queue.rb', line 69 def size @drain.size + @sink.size end |