Class: Fyrehose::Reactor

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/fyrehose/reactor.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.run(host, port, opts = {}, &block) ⇒ Object



5
6
7
8
9
10
11
12
13
14
# File 'lib/fyrehose/reactor.rb', line 5

# Starts an EventMachine loop, opens a Fyrehose::Reactor connection to the
# given host/port and evaluates the supplied block with that connection as
# +self+ (via instance_eval), so the block can call #subscribe, #on_message,
# #deliver, etc. directly.
#
# @param host passed through unchanged to EventMachine.connect
# @param port passed through unchanged to EventMachine.connect
# @param opts [Hash] accepted but not used by this method
# @raise [Fyrehose::Error] if no block is given
def self.run(host, port, opts = {}, &block)
  raise Fyrehose::Error.new("missing proc{ |channel,data| } for #run") unless block_given?

  EventMachine.run do
    EventMachine.connect(host, port, Fyrehose::Reactor).instance_eval(&block)
  end
end

Instance Method Details

#deliver(channel, data) ⇒ Object



21
22
23
24
# File 'lib/fyrehose/reactor.rb', line 21

# Sends a data frame for +channel+ over this connection.
#
# The frame has the form "#<txid> @<channel> *<length> <payload>\n", where
# the transaction id comes from Fyrehose.next_txid.
#
# The length field is the payload's size in bytes, not characters: the frame
# is written to a byte stream, so a character count would under-report any
# payload containing multibyte characters.
# NOTE(review): assumes the server reads <length> as a byte count — confirm
# against the protocol implementation.
#
# @param channel interpolated into the frame via #to_s
# @param data payload; converted with #to_s before being measured and sent
# @return whatever send_data returns
def deliver(channel, data)
  txid = Fyrehose.next_txid
  # Measure exactly the string that is interpolated into the frame.
  payload = data.to_s
  send_data("##{txid} @#{channel} *#{payload.bytesize} #{payload}\n")
end

#on_message(&block) ⇒ Object



39
40
41
# File 'lib/fyrehose/reactor.rb', line 39

# Registers a callback that #receive_data invokes with (channel, body) for
# every incoming message whose type is :data.
#
# Fails fast when called without a block: a nil entry in @callbacks would
# otherwise only surface later as a NoMethodError while dispatching a
# received message.
#
# @yieldparam channel the message's :channel value
# @yieldparam body the message's :body value
# @raise [Fyrehose::Error] if no block is given
# @return [Array] the list of registered callbacks
def on_message(&block)
  raise Fyrehose::Error.new("missing block for #on_message") unless block

  @callbacks << block
end

#post_init ⇒ Object



16
17
18
19
# File 'lib/fyrehose/reactor.rb', line 16

# Sets up per-connection state: a fresh Fyrehose::InputStream that
# #receive_data appends incoming chunks to, and an empty list of message
# callbacks that #on_message appends to.
# NOTE(review): presumably invoked by EventMachine once the connection object
# has been created — confirm against EventMachine::Connection docs.
def post_init
  @input_stream = Fyrehose::InputStream.new
  @callbacks = []
end

#receive_data(chunk) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fyrehose/reactor.rb', line 43

# Appends a received chunk to the input stream, then walks the messages the
# stream yields and hands every :data message to each registered callback as
# (channel, body). Messages of any other type are ignored.
#
# @param chunk data appended to @input_stream via <<
# @return whatever @input_stream.each returns
def receive_data(chunk)
  @input_stream << chunk

  @input_stream.each do |message|
    if message[:type] == :data
      @callbacks.each { |callback| callback.call(message[:channel], message[:body]) }
    end
  end
end

#set_flags(channel, flags) ⇒ Object



26
27
28
29
# File 'lib/fyrehose/reactor.rb', line 26

# Sends a flags frame for +channel+ over this connection, of the form
# "#<txid> @<channel> +<flags>\n", using a transaction id obtained from
# Fyrehose.next_txid.
#
# @param channel interpolated into the frame via #to_s
# @param flags interpolated into the frame via #to_s (#subscribe passes 1,
#   #unsubscribe passes 0)
# @return whatever send_data returns
def set_flags(channel, flags)
  frame = "##{Fyrehose.next_txid} @#{channel} +#{flags}\n"
  send_data(frame)
end

#subscribe(channel) ⇒ Object



31
32
33
# File 'lib/fyrehose/reactor.rb', line 31

# Subscribes this connection to +channel+ by setting the channel's flags to 1.
#
# @param channel forwarded to #set_flags
# @return whatever #set_flags returns
def subscribe(channel)
  subscribed = 1
  set_flags(channel, subscribed)
end

#unsubscribe(channel) ⇒ Object



35
36
37
# File 'lib/fyrehose/reactor.rb', line 35

# Unsubscribes this connection from +channel+ by setting the channel's flags
# to 0.
#
# @param channel forwarded to #set_flags
# @return whatever #set_flags returns
def unsubscribe(channel)
  unsubscribed = 0
  set_flags(channel, unsubscribed)
end