Class: FnordMetric::AMQPAcceptor
- Inherits:
-
Object
- Object
- FnordMetric::AMQPAcceptor
- Defined in:
- lib/fnordmetric/acceptors/amqp_acceptor.rb
Class Method Summary collapse
Instance Method Summary collapse
- #api ⇒ Object
- #events ⇒ Object
-
#initialize(opts) ⇒ AMQPAcceptor
constructor
A new instance of AMQPAcceptor.
- #push_next_event ⇒ Object
Constructor Details
#initialize(opts) ⇒ AMQPAcceptor
Returns a new instance of AMQPAcceptor.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 14 def initialize(opts) amqp = AMQP.connect(:host => 'firehose') amqp_channel = AMQP::Channel.new(amqp) msg_handler = lambda do |channel, data| event = begin JSON.parse(data) rescue FnordMetric.log("[AMQP] received invalid JSON: #{data[0..60]}") end if event event["_type"] ||= channel events << event push_next_event end end opts[:channels].each do |channel| queue = amqp_channel.queue(channel, :auto_delete => true) queue.subscribe{ |data| msg_handler[channel, data] } end end |
Class Method Details
.outbound? ⇒ Boolean
52 53 54 |
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 52 def self.outbound? true end |
.start(opts) ⇒ Object
3 4 5 6 7 8 9 10 11 12 |
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 3 def self.start(opts) begin require "amqp" rescue LoadError FnordMetric.error("require 'amqp' failed, you need the amqp gem") exit 1 end new(opts) end |
Instance Method Details
#api ⇒ Object
48 49 50 |
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 48 def api @api ||= FnordMetric::API.new(FnordMetric.) end |
#events ⇒ Object
44 45 46 |
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 44 def events @events ||= [] end |
#push_next_event ⇒ Object
38 39 40 41 42 |
# File 'lib/fnordmetric/acceptors/amqp_acceptor.rb', line 38 def push_next_event return true if events.empty? api.event(@events.pop) EM.next_tick(&method(:push_next_event)) end |