Class: EventMachine::Queue

Inherits:
Object
Defined in:
lib/em/queue.rb

Overview

A cross-thread, reactor-scheduled, linear queue.

This class provides a simple “Queue”-like abstraction on top of the reactor scheduler. It serves two primary purposes:

  • API sugar for stateful protocols

  • Pushing processing onto the same thread as the reactor

See examples/ex_queue.rb for a detailed example.

q = EM::Queue.new
q.push('one', 'two', 'three')
3.times do
  q.pop{ |msg| puts(msg) }
end
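
The second purpose, moving processing onto the reactor thread, can be illustrated without EventMachine itself. The following is a minimal sketch under loud assumptions: `TinyReactor` and `ReactorQueue` are hypothetical stand-ins written for this example (they are not part of EventMachine), and `TinyReactor#schedule` only imitates the role that `EM.schedule` plays in the source shown further down.

```ruby
# Hypothetical single-threaded "reactor": one thread that runs scheduled
# blocks one at a time. It only imitates the role of EM.schedule.
class TinyReactor
  attr_reader :thread

  def initialize
    @jobs = Thread::Queue.new
    @thread = Thread.new do
      while (job = @jobs.pop) # a nil job ends the loop
        job.call
      end
    end
  end

  def schedule(&blk)
    @jobs << blk
  end

  def stop
    @jobs << nil # queued behind all earlier jobs
    @thread.join
  end
end

# Hypothetical queue with the same @items/@popq bookkeeping as EM::Queue,
# but scheduled on TinyReactor instead of the EventMachine reactor.
class ReactorQueue
  def initialize(reactor)
    @reactor = reactor
    @items = []
    @popq  = []
  end

  def pop(&blk)
    @reactor.schedule do
      if @items.empty?
        @popq << blk
      else
        blk.call(@items.shift)
      end
    end
    nil
  end

  def push(*items)
    @reactor.schedule do
      @items.push(*items)
      @popq.shift.call(@items.shift) until @items.empty? || @popq.empty?
    end
  end
end

reactor = TinyReactor.new
q = ReactorQueue.new(reactor)
seen = [] # [message, ran_on_reactor_thread?] pairs

3.times { q.pop { |msg| seen << [msg, Thread.current == reactor.thread] } }
Thread.new { q.push('one', 'two', 'three') }.join # producer on another thread
reactor.stop # run everything already scheduled, then stop
```

In this sketch the producer runs on its own thread, yet every callback executes on the reactor thread, so `@items` and `@popq` are only ever touched by one thread.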


Constructor Details

#initialize ⇒ Queue

Create a new queue



# File 'lib/em/queue.rb', line 19

def initialize
  @items = []
  @popq  = []
end

Instance Method Details

#empty? ⇒ Boolean

N.B. This is a peek: it is not thread-safe, and its result may only tend toward accuracy.

Returns:

  • (Boolean)


# File 'lib/em/queue.rb', line 51

def empty?
  @items.empty?
end

#pop(*a, &b) ⇒ Object

Pops an item off the queue and runs the block with it on the reactor thread. The pop does not happen immediately but at some point in the future: on the next tick if the queue already has data, otherwise when the queue is next populated.



# File 'lib/em/queue.rb', line 27

def pop(*a, &b)
  cb = EM::Callback(*a, &b)
  EM.schedule do
    if @items.empty?
      @popq << cb
    else
      cb.call @items.shift
    end
  end
  nil # Always returns nil
end
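
The two branches of `#pop` (park the callback when the queue is empty, deliver straight away when it is not) can be tried out without a running reactor. This is a minimal sketch, not EventMachine's implementation: `TinyQueue` is a hypothetical stand-in that keeps the same `@items`/`@popq` bookkeeping but runs synchronously on the calling thread instead of going through `EM.schedule`.

```ruby
# Hypothetical stand-in for EM::Queue: same bookkeeping, no reactor.
# Everything runs synchronously (an assumption made so that the example
# runs without the eventmachine gem).
class TinyQueue
  def initialize
    @items = [] # values waiting for a consumer
    @popq  = [] # callbacks waiting for a value
  end

  def pop(&blk)
    if @items.empty?
      @popq << blk           # nothing to hand over yet: park the callback
    else
      blk.call(@items.shift) # data is available: deliver the oldest item
    end
    nil
  end

  def push(*items)
    @items.push(*items)
    # Pair parked callbacks with items until one side runs out.
    @popq.shift.call(@items.shift) until @items.empty? || @popq.empty?
  end
end

received = []
q = TinyQueue.new

q.pop { |msg| received << msg } # queue is empty, so the block is parked
before_push = received.dup      # nothing has been delivered yet

q.push('one')                   # the parked block now receives 'one'
q.push('two')
q.pop { |msg| received << msg } # data is waiting, so delivery is direct
```

Each call to `pop` consumes exactly one item, which is why the overview example calls it three times to read three messages.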

#push(*items) ⇒ Object

Push items onto the queue in the reactor thread. The items will not appear in the queue immediately, but will be scheduled for addition during the next reactor tick.



# File 'lib/em/queue.rb', line 42

def push(*items)
  EM.schedule do
    @items.push(*items)
    @popq.shift.call @items.shift until @items.empty? || @popq.empty?
  end
end
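
The one-line loop at the end of `#push` pairs the oldest waiting callback with the oldest item until either list is exhausted. A minimal sketch of that loop in isolation, where `drain` is a hypothetical helper name introduced only for this example:

```ruby
# Standalone version of the drain loop from #push, operating on plain arrays.
# `drain` is a hypothetical helper, not an EventMachine method.
def drain(items, waiters)
  waiters.shift.call(items.shift) until items.empty? || waiters.empty?
end

delivered = []
items   = %w[a b c]
waiters = [
  ->(msg) { delivered << [:first, msg] },
  ->(msg) { delivered << [:second, msg] }
]

drain(items, waiters)
# delivered is now [[:first, "a"], [:second, "b"]]
# items is now ["c"], waiters is now []
```

With more items than waiters, the surplus stays queued for a later `pop`; with more waiters than items, the surplus callbacks stay parked until the next `push`.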

#size ⇒ Object

N.B. This is a peek: it is not thread-safe, and its result may only tend toward accuracy.



# File 'lib/em/queue.rb', line 57

def size
  @items.size
end
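
Why `#size` and `#empty?` may only tend toward accuracy can be seen by modelling the case where a push is deferred to a later tick. This is a minimal sketch under stated assumptions: `ManualScheduler` and `PeekQueue` are hypothetical classes written for this example, and `tick` stands in for one pass of the reactor loop.

```ruby
# Hypothetical scheduler: blocks are held until tick is called explicitly,
# standing in for work that a reactor would run on a later tick.
class ManualScheduler
  def initialize
    @jobs = []
  end

  def schedule(&blk)
    @jobs << blk
  end

  def tick
    @jobs.shift.call until @jobs.empty?
  end
end

# Hypothetical queue exposing only push and the two peek methods.
class PeekQueue
  def initialize(scheduler)
    @scheduler = scheduler
    @items = []
  end

  def push(*items)
    @scheduler.schedule { @items.push(*items) }
  end

  def size
    @items.size
  end

  def empty?
    @items.empty?
  end
end

scheduler = ManualScheduler.new
q = PeekQueue.new(scheduler)

q.push('one', 'two')
before_tick = [q.size, q.empty?] # the push is still pending
scheduler.tick
after_tick = [q.size, q.empty?]  # the push has now been applied
```

Between the `push` and the tick, the peek reports an empty queue even though two items are on their way, so code that branches on `size` or `empty?` should treat the answer as a hint rather than a guarantee.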