Class: Waffle::Transports::Rabbitmq
- Inherits:
-
Object
- Object
- Waffle::Transports::Rabbitmq
- Defined in:
- lib/waffle/transports/rabbitmq.rb
Constant Summary collapse
- CONNECTION_ATTEMPT_TIMEOUT =
30
- EXCHANGE =
'events'
- @@last_connection_attempt =
Time.now
Instance Method Summary collapse
- #encoder ⇒ Object
-
#initialize ⇒ Rabbitmq
constructor
A new instance of Rabbitmq.
- #publish(flow = 'events', message = '') ⇒ Object
- #subscribe(flow = 'events') ⇒ Object
Constructor Details
Instance Method Details
#encoder ⇒ Object
19 20 21 |
# File 'lib/waffle/transports/rabbitmq.rb', line 19

# Returns the encoder selected by Waffle::Config.encoder, memoized in
# @encoder after the first lookup.
#
# The configured name is capitalized and resolved as a constant directly
# under Waffle::Encoders (a configured value of "json" would resolve to
# Waffle::Encoders::Json). The lookup uses const_get rather than eval so
# that a configuration value can never be executed as Ruby code.
#
# @return [Object] the constant found under Waffle::Encoders
# @raise [NameError] if no such constant exists under Waffle::Encoders
def encoder
  @encoder ||= Waffle::Encoders.const_get(Waffle::Config.encoder.capitalize, false)
end
#publish(flow = 'events', message = '') ⇒ Object
23 24 25 26 27 28 29 30 31 32 |
# File 'lib/waffle/transports/rabbitmq.rb', line 23

# Publishes +message+ to the EXCHANGE exchange, using +flow+ as the
# routing key.
#
# Any StandardError raised while publishing is swallowed. In that case the
# message is NOT re-sent; the only recovery step is a call to +connect+
# (defined outside this snippet), and only when more than
# CONNECTION_ATTEMPT_TIMEOUT seconds have elapsed since
# @@last_connection_attempt.
#
# @param flow [String] routing key to publish under
# @param message [String] payload handed to the exchange as-is
# @return [Object, nil] whatever the exchange's publish (or, on failure,
#   +connect+) returns; nil when a failure occurs inside the timeout window
def publish(flow = 'events', message = '')
  @exchange = @bunny.exchange EXCHANGE
  @exchange.publish message, :key => flow
rescue
  # Throttle reconnects: only retry the connection once the timeout window
  # since the previous attempt has passed.
  if (Time.now - @@last_connection_attempt) > CONNECTION_ATTEMPT_TIMEOUT
    connect
  end
end
#subscribe(flow = 'events') ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/waffle/transports/rabbitmq.rb', line 34

# Binds a queue to the EXCHANGE exchange for one or more routing keys and
# yields every delivered message to the caller's block.
#
# The queue is declared with an empty name and the :durable and
# :auto_delete options set to true. +flow+ may be a single routing key or
# an Array of keys; one binding is created per key.
#
# @param flow [String, Array<String>] routing key(s) to bind the queue to
# @yield [routing_key, payload] the message's
#   [:delivery_details][:routing_key] and its [:payload] after being passed
#   through encoder.decode
# @return [Object] whatever @queue.subscribe returns
def subscribe(flow = 'events')
  @exchange = @bunny.exchange EXCHANGE
  @queue    = @bunny.queue '', :durable => true, :auto_delete => true

  # Treat a single key as a one-element list so both cases share one loop.
  routing_keys = flow.is_a?(Array) ? flow : [flow]
  routing_keys.each { |key| @queue.bind @exchange, :key => key }

  @queue.subscribe do |message|
    yield message[:delivery_details][:routing_key], encoder.decode(message[:payload])
  end
end