Class: Meerkat::RackAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/meerkat/rackadapter.rb

Defined Under Namespace

Classes: DeferrableBody

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app = nil, &blk) ⇒ RackAdapter

Returns a new instance of RackAdapter.



9
10
11
12
13
14
# File 'lib/meerkat/rackadapter.rb', line 9

# Builds an adapter with its default settings and optionally hands the new
# instance to a configuration block.
#
# Defaults: retry = 3000, timeout = false, keep_alive = 20.
#
# @param app [Object, nil] accepted but not used by this constructor
# @yieldparam adapter [RackAdapter] the freshly initialised instance
def initialize(app = nil, &blk)
  @retry, @timeout, @keep_alive = 3000, false, 20
  blk.call(self) unless blk.nil?
end

Instance Attribute Details

#keep_alive ⇒ Object

Returns the value of attribute keep_alive.



7
8
9
# File 'lib/meerkat/rackadapter.rb', line 7

# Reader for @keep_alive. The constructor sets it to 20, and #listen hands it
# to EM.add_periodic_timer as the interval between ":" keep-alive lines.
#
# @return [Object] the current value of @keep_alive
def keep_alive; @keep_alive; end

#retry ⇒ Object

Returns the value of attribute retry.



5
6
7
# File 'lib/meerkat/rackadapter.rb', line 5

# Reader for @retry. The constructor sets it to 3000, and #listen writes it
# to each new stream as a "retry: <value>" line.
#
# @return [Object] the current value of @retry
def retry; @retry; end

#timeout ⇒ Object

Returns the value of attribute timeout.



6
7
8
# File 'lib/meerkat/rackadapter.rb', line 6

# Reader for @timeout. The constructor sets it to false; when it is truthy,
# #listen passes it to EM.add_timer to end the stream via body.succeed.
#
# @return [Object] the current value of @timeout
def timeout; @timeout; end

Instance Method Details

#call(env) ⇒ Object



16
17
18
19
20
21
22
23
24
25
# File 'lib/meerkat/rackadapter.rb', line 16

# Rack entry point: routes the request by its HTTP method.
#
# GET requests go to #listen, POST requests go to #post, and every other
# method gets an empty 404 response.
#
# @param env [Hash] Rack environment
# @return [Array] a Rack response triplet
def call(env)
  verb = env['REQUEST_METHOD']
  return listen(env) if verb == 'GET'
  return post(env) if verb == 'POST'

  [404, {}, []]
end

#listen(env) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/meerkat/rackadapter.rb', line 39

# Opens an event stream for a GET request.
#
# The real response (200, text/event-stream) is delivered through
# env['async.callback'] on the next reactor tick; the synchronous return
# value is the [-1, {}, []] placeholder. The stream starts with a
# "retry: <@retry>" line, then carries one "data:" record per message
# published to the subscribed topic, preceded by an "event:" line when the
# message's topic differs from the requested path.
#
# Everything acquired for the stream (keep-alive timer, timeout timer and
# the Meerkat subscription) is released exactly once when the body finishes,
# whether it succeeds (timeout) or fails.
#
# @param env [Hash] Rack environment; must provide 'async.callback' and
#   'PATH_INFO'
# @return [Array] the [-1, {}, []] placeholder triplet
def listen(env)
  body = DeferrableBody.new

  headers = {
    'Content-Type' => 'text/event-stream',
    'X-Accel-Buffering' => 'no', # http://wiki.nginx.org/X-accel#X-Accel-Buffering
  }
  EM.next_tick { env['async.callback'].call [200, headers, body] }
  EM.next_tick { body << "retry: #{@retry}\n" }
  # Both timers are optional: a falsy setting simply disables that timer.
  keep_alive_timer = EM.add_periodic_timer(@keep_alive) { body << ":\n" } if @keep_alive
  timeout_timer = EM.add_timer(@timeout) { body.succeed } if @timeout

  path_info = Rack::Utils.unescape(env["PATH_INFO"])[1..-1]
  sub = Meerkat.subscribe(path_info) do |topic, json|
    body << "event: #{topic}\n" unless path_info == topic
    body << "data: #{json}\n\n"
  end

  # Without this the periodic timer would keep writing to a finished body and
  # a stream ended by the timeout would stay subscribed forever. The flag
  # keeps the release idempotent if the body is both succeeded and failed.
  released = false
  release = lambda do
    unless released
      released = true
      keep_alive_timer.cancel if keep_alive_timer
      EM.cancel_timer(timeout_timer) if timeout_timer
      Meerkat.unsubscribe sub
    end
  end
  body.callback { release.call }
  body.errback { release.call }

  [-1, {}, []]
end

#post(env) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/meerkat/rackadapter.rb', line 27

# Handles a publish request (POST).
#
# The topic is PATH_INFO without its first character; when that is empty the
# `topic` request parameter is used instead. The payload is the first one
# present among the `data`, `msg` and `json` parameters.
#
# @param env [Hash] Rack environment
# @return [Array] [204, {}, []] on success, or a 400 text/plain response
#   carrying the error message when a StandardError is raised
def post(env)
  req = Rack::Request.new env
  # Slicing an empty PATH_INFO yields nil; normalise it to "" so the
  # parameter fallback below is reached instead of raising NoMethodError.
  topic = req.path_info[1..-1].to_s
  topic = req.params['topic'] if topic.empty?
  data = req.params['data'] || req.params['msg'] || req.params['json']

  Meerkat.publish(topic, data, true)
  [204, {}, []]
rescue StandardError => e
  # Only StandardError is reported as a bad request: signals, SystemExit and
  # similar must propagate rather than be turned into a 400 response.
  [400, {'Content-Type' => 'text/plain'}, [e.message]]
end