Class: EventMachine::Channel

Inherits:
Object
Defined in:
lib/em/channel.rb

Overview

Provides a simple interface for pushing items to any number of subscribers. The channel schedules every operation on the main reactor thread, so it is safe to call from other threads.

This gives connections a convenient way to consume messages produced by long-running code run via EM.defer, without threading issues.

channel = EM::Channel.new
sid = channel.subscribe{ |msg| p [:got, msg] }
channel.push('hello world')
channel.unsubscribe(sid)

See examples/ex_channel.rb for a detailed example.
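The scheduling idea described above can be sketched without EventMachine itself. The block below is a simplified model under stated assumptions: `MiniReactor` and `MiniChannel` are hypothetical stand-ins (not part of the library), and unlike `EM.schedule` the model always queues a block rather than running it immediately when already on the reactor thread.

```ruby
require 'thread'

# Hypothetical stand-in for the reactor; NOT EventMachine.
# Any thread may queue a block, but only the draining thread runs it.
class MiniReactor
  def initialize
    @queue = Queue.new # thread-safe FIFO
  end

  def schedule(&blk)
    @queue << blk
  end

  # Run everything queued so far on the calling thread.
  def run_pending
    @queue.pop.call until @queue.empty?
  end
end

# Hypothetical model of the channel, wired to MiniReactor instead of EM.
class MiniChannel
  def initialize(reactor)
    @reactor = reactor
    @subs = {}
    @uid = 0
  end

  def subscribe(&blk)
    name = (@uid += 1)
    @reactor.schedule { @subs[name] = blk }
    name
  end

  def push(*items)
    items = items.dup
    @reactor.schedule { @subs.values.each { |s| items.each { |i| s.call(i) } } }
  end
end

reactor = MiniReactor.new
channel = MiniChannel.new(reactor)
main_thread = Thread.current
seen = []

# Record each message and whether it was delivered on the main thread.
channel.subscribe { |msg| seen << [msg, Thread.current.equal?(main_thread)] }

# A worker thread stands in for long-running code handed to EM.defer.
Thread.new { channel.push(:done) }.join

reactor.run_pending
p seen
```

The worker thread only queues work; the subscriber itself runs on the thread that drains the queue, which is the property the channel relies on.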

Constructor Details

#initialize ⇒ Channel

Creates a new channel.



# File 'lib/em/channel.rb', line 17

def initialize
  @subs = {}
  @uid = 0
end

Instance Method Details

#pop(*a, &b) ⇒ Object

Receive exactly one message from the channel.



# File 'lib/em/channel.rb', line 43

def pop(*a, &b)
  EM.schedule {
    # Subscribe a one-shot callback that removes itself on first delivery.
    name = subscribe do |*args|
      unsubscribe(name)
      EM::Callback(*a, &b).call(*args)
    end
  }
end
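The one-shot behaviour of #pop can be illustrated with a synchronous model. This is a sketch only: `SyncChannel` is a hypothetical class that replaces `EM.schedule` with direct calls and accepts only a block, so it shows the subscribe-then-unsubscribe pattern rather than the library's reactor behaviour.

```ruby
# Hypothetical synchronous stand-in for the channel; NOT EventMachine.
class SyncChannel
  def initialize
    @subs = {}
    @uid = 0
  end

  def subscribe(&blk)
    name = (@uid += 1)
    @subs[name] = blk
    name
  end

  def unsubscribe(name)
    @subs.delete(name)
  end

  def push(*items)
    # @subs.values is a snapshot, so callbacks may unsubscribe mid-delivery.
    @subs.values.each { |s| items.each { |i| s.call(i) } }
  end

  # One-shot subscription: remove itself, then hand the message to the block.
  def pop(&blk)
    name = subscribe do |*args|
      unsubscribe(name)
      blk.call(*args)
    end
  end
end

channel = SyncChannel.new
got = []
channel.pop { |msg| got << msg }

channel.push(:first)   # delivered; the one-shot subscriber removes itself
channel.push(:second)  # no subscribers remain, so nothing is delivered
p got
```

Because the model keeps the same nested delivery loop as #push, a single push carrying several items still hands each of them to the one-shot block. The listing above suggests the real method behaves the same way, though that is a reading of the source rather than documented behaviour.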

#push(*items) ⇒ Object Also known as: <<

Add items to the channel, which are pushed out to all subscribers.



# File 'lib/em/channel.rb', line 36

def push(*items)
  items = items.dup
  EM.schedule { @subs.values.each { |s| items.each { |i| s.call i } } }
end
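The delivery order follows from the nested loop in the method body: each subscriber receives every item of a push before the next subscriber receives any. A minimal sketch of just that loop, using two hypothetical lambda subscribers and a plain hash in place of `@subs`:

```ruby
order = []

# Two hypothetical subscribers, stored the way @subs holds them (id => callable).
subs = {
  1 => ->(item) { order << [:a, item] },
  2 => ->(item) { order << [:b, item] }
}
items = [:x, :y]

# Same nesting as the body of #push: outer loop over subscribers, inner over items.
subs.values.each { |s| items.each { |i| s.call(i) } }

p order
```

Ruby hashes preserve insertion order, so in this sketch subscribers are visited in the order they were added.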

#subscribe(*a, &b) ⇒ Object

Takes any arguments suitable for EM::Callback() and returns a subscriber id for use when unsubscribing.



# File 'lib/em/channel.rb', line 24

def subscribe(*a, &b)
  name = gen_id
  EM.schedule { @subs[name] = EM::Callback(*a, &b) }
  name
end
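The argument forms accepted here are a block, a callable object such as a proc, or a receiver plus a method name. The sketch below approximates that normalisation in plain Ruby; `to_callback` is a hypothetical helper written for illustration and is an assumption about how `EM::Callback` treats its arguments, not the library's code.

```ruby
# Hypothetical approximation of the argument handling; NOT the library source.
def to_callback(object = nil, method = nil, &blk)
  if object && method
    # Receiver plus method name: wrap in a lambda that forwards the arguments.
    lambda { |*args| object.__send__(method, *args) }
  elsif object.respond_to?(:call)
    # Already callable (proc, lambda, method object): use it as is.
    object
  else
    blk || raise(ArgumentError, "no callable given")
  end
end

log = []
sink = []

from_block  = to_callback { |m| log << [:block, m] }
from_proc   = to_callback(proc { |m| log << [:proc, m] })
from_method = to_callback(sink, :push) # calls sink.push(message)

[from_block, from_proc, from_method].each { |cb| cb.call("hi") }

p log
p sink
```

All three forms end up as an object that responds to `call`, which is what the channel stores per subscriber id.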

#unsubscribe(name) ⇒ Object

Removes the subscriber identified by name (the id returned by #subscribe) from the list.



# File 'lib/em/channel.rb', line 31

def unsubscribe(name)
  EM.schedule { @subs.delete name }
end