Class: Fyrehose::Reactor
- Inherits:
  - EventMachine::Connection
- Inheritance chain:
  - Object
  - EventMachine::Connection
  - Fyrehose::Reactor
- Defined in:
  - lib/fyrehose/reactor.rb
Class Method Summary collapse
- .run(host, port, opts = {}, &block) ⇒ Object
Instance Method Summary collapse
- #deliver(channel, data) ⇒ Object
- #on_message(&block) ⇒ Object
- #post_init ⇒ Object
- #receive_data(chunk) ⇒ Object
- #set_flags(channel, flags) ⇒ Object
- #subscribe(channel) ⇒ Object
- #unsubscribe(channel) ⇒ Object
Class Method Details
.run(host, port, opts = {}, &block) ⇒ Object
5 6 7 8 9 10 11 12 13 14 |
# File 'lib/fyrehose/reactor.rb', line 5 def self.run(host, port, opts = {}, &block) unless block raise Fyrehose::Error.new("missing proc{ |channel,data| } for #run") end EventMachine.run do reactor = EventMachine.connect(host, port, Fyrehose::Reactor) reactor.instance_eval(&block) end end |
Instance Method Details
#deliver(channel, data) ⇒ Object
21 22 23 24 |
# File 'lib/fyrehose/reactor.rb', line 21 def deliver(channel, data) txid = Fyrehose.next_txid send_data("##{txid} @#{channel} *#{data.size} #{data}\n") end |
#on_message(&block) ⇒ Object
39 40 41 |
# File 'lib/fyrehose/reactor.rb', line 39 def on_message(&block) @callbacks << block end |
#post_init ⇒ Object
16 17 18 19 |
# File 'lib/fyrehose/reactor.rb', line 16 def post_init @input_stream = Fyrehose::InputStream.new @callbacks = [] end |
#receive_data(chunk) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fyrehose/reactor.rb', line 43 def receive_data(chunk) @input_stream << chunk @input_stream.each do |msg| next unless msg[:type] == :data @callbacks.each do |block| block.call(msg[:channel], msg[:body]) end end end |
#set_flags(channel, flags) ⇒ Object
26 27 28 29 |
# File 'lib/fyrehose/reactor.rb', line 26 def set_flags(channel, flags) txid = Fyrehose.next_txid send_data("##{txid} @#{channel} +#{flags}\n") end |
#subscribe(channel) ⇒ Object
31 32 33 |
# File 'lib/fyrehose/reactor.rb', line 31 def subscribe(channel) set_flags(channel, 1) end |
#unsubscribe(channel) ⇒ Object
35 36 37 |
# File 'lib/fyrehose/reactor.rb', line 35 def unsubscribe(channel) set_flags(channel, 0) end |