Class: Waffle::Transports::Rabbitmq

Inherits:
Object
  • Object
show all
Defined in:
lib/waffle/transports/rabbitmq.rb

Constant Summary collapse

CONNECTION_ATTEMPT_TIMEOUT =
30
EXCHANGE =
'events'
@@last_connection_attempt =
Time.now

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Rabbitmq

Returns a new instance of Rabbitmq.



14
15
16
17
# File 'lib/waffle/transports/rabbitmq.rb', line 14

# Creates the Bunny client from Waffle::Config.url and then calls #connect.
#
# NOTE(review): #connect is defined outside this excerpt — presumably it
# starts the Bunny session; confirm against the full source file.
def initialize
  @bunny = Bunny.new(Waffle::Config.url)
  connect
end

Instance Method Details

#encoder ⇒ Object



19
20
21
# File 'lib/waffle/transports/rabbitmq.rb', line 19

# Resolves the encoder named by Waffle::Config.encoder to the constant of the
# same (capitalized) name under Waffle::Encoders, and memoizes the result.
#
# The lookup goes through Module#const_get instead of eval, so the configured
# value can only ever name a constant; it is never executed as Ruby code.
# Passing +false+ restricts the lookup to constants defined directly under
# Waffle::Encoders, so a name such as "object" cannot fall through to a
# top-level constant.
#
# @return [Object] the constant Waffle::Encoders::<Capitalized encoder name>
# @raise [NameError] if the name is not a valid constant name or no such
#   constant exists under Waffle::Encoders
def encoder
  @encoder ||= Waffle::Encoders.const_get(Waffle::Config.encoder.capitalize, false)
end

#publish(flow = 'events', message = '') ⇒ Object



23
24
25
26
27
28
29
30
31
32
# File 'lib/waffle/transports/rabbitmq.rb', line 23

# Publishes +message+ to the exchange named by EXCHANGE, using +flow+ as the
# :key option.
#
# Any StandardError raised while looking up the exchange or publishing is
# swallowed. In that case #connect is called, but only when more than
# CONNECTION_ATTEMPT_TIMEOUT seconds have passed since
# @@last_connection_attempt. The message that failed is not re-sent.
#
# NOTE(review): +message+ is handed to the exchange unchanged here, while
# #subscribe decodes payloads with #encoder — presumably callers encode
# before publishing; confirm.
#
# @param flow [Object] value passed to the exchange as :key
# @param message [Object] payload passed to the exchange's #publish
# @return [Object, nil] the result of the exchange's #publish; on failure the
#   result of #connect, or nil when no reconnect was attempted
def publish(flow = 'events', message = '')
  @exchange = @bunny.exchange(EXCHANGE)
  @exchange.publish(message, :key => flow)
rescue
  elapsed = Time.now - @@last_connection_attempt
  connect if elapsed > CONNECTION_ATTEMPT_TIMEOUT
end

#subscribe(flow = 'events') ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/waffle/transports/rabbitmq.rb', line 34

# Binds a queue to the EXCHANGE exchange for one or more routing keys and
# subscribes to it, yielding each delivery to the caller's block.
#
# The queue is requested from Bunny with an empty name and the options
# :durable => true, :auto_delete => true. When +flow+ is an Array the queue
# is bound once per element; otherwise it is bound once with +flow+ itself.
#
# @param flow [Array, Object] a routing key, or an Array of routing keys
# @yield [routing_key, payload] the delivery's routing key and its payload
#   after being passed through encoder.decode
# @return [Object] whatever the queue's #subscribe returns
def subscribe(flow = 'events')
  @exchange = @bunny.exchange(EXCHANGE)
  @queue    = @bunny.queue('', :durable => true, :auto_delete => true)

  # Normalize to a list so a single key and many keys share one code path.
  routing_keys = flow.is_a?(Array) ? flow : [flow]
  routing_keys.each { |routing_key| @queue.bind(@exchange, :key => routing_key) }

  @queue.subscribe do |delivery|
    routing_key = delivery[:delivery_details][:routing_key]
    yield routing_key, encoder.decode(delivery[:payload])
  end
end