Class: WhoCan::Responder
- Inherits:
-
Object
- Object
- WhoCan::Responder
- Includes:
- Deferred::Accessors, Logging
- Defined in:
- lib/who_can/responder.rb
Defined Under Namespace
Classes: Response
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#ping_exchange ⇒ Object
readonly
Returns the value of attribute ping_exchange.
-
#ping_exchange_name ⇒ Object
readonly
Returns the value of attribute ping_exchange_name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #close! ⇒ Object
- #handle_message(header, payload) ⇒ Object
-
#initialize(connection, ping_exchange_name) ⇒ Responder
constructor
channel here is an honest-to-goodness AMQP::Channel, not a ChannelPool object.
-
#on_ping(&block) ⇒ Object
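Register a callback that is called with a Response object for each incoming ping. The callback sets a Numeric delay (either float or integer) and a response on that object; the delay is how long the responder waits before replying to the pinger. If the object's do_not_respond flag is set, no reply is sent.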
-
#setup!(&blk) ⇒ Object
resp = wc.new_responder(ping_exch_name).
Methods included from Logging
Constructor Details
#initialize(connection, ping_exchange_name) ⇒ Responder
channel here is an honest-to-goodness AMQP::Channel, not a ChannelPool object
39 40 41 42 43 44 |
# File 'lib/who_can/responder.rb', line 39 def initialize(connection, ping_exchange_name) @connection = connection @ping_exchange_name = ping_exchange_name @on_ping_cb = nil @set_up = false end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
36 37 38 |
# File 'lib/who_can/responder.rb', line 36 def channel @channel end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
36 37 38 |
# File 'lib/who_can/responder.rb', line 36 def connection @connection end |
#ping_exchange ⇒ Object (readonly)
Returns the value of attribute ping_exchange.
36 37 38 |
# File 'lib/who_can/responder.rb', line 36 def ping_exchange @ping_exchange end |
#ping_exchange_name ⇒ Object (readonly)
Returns the value of attribute ping_exchange_name.
36 37 38 |
# File 'lib/who_can/responder.rb', line 36 def ping_exchange_name @ping_exchange_name end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
36 37 38 |
# File 'lib/who_can/responder.rb', line 36 def queue @queue end |
Instance Method Details
#close! ⇒ Object
113 114 115 116 117 118 119 120 |
# File 'lib/who_can/responder.rb', line 113 def close! return false if !@set_up or @close_requested @close_requested = true @queue.unsubscribe if @queue @queue = nil @set_up = false end |
#handle_message(header, payload) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/who_can/responder.rb', line 94 def handle_message(header, payload) if @on_ping_cb resp = Response.new(header, payload) logger.debug {"ping request came in, sending off to get response"} @on_ping_cb.call(resp) if resp.do_not_respond logger.debug { "do_not_respond flag set, not sending reply" } else resp.validate! #logger.debug {"adding timer of: #{resp.delay} and then publishing message"} EM.add_timer(resp.delay) do logger.debug {"publishing ping response to #{header.reply_to} after delay of #{resp.delay} happened"} channel.default_exchange.publish(resp.response, :routing_key => header.reply_to, :message_id => header.message_id) end end end end |
#on_ping(&block) ⇒ Object
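Register a callback that is called with a Response object for each incoming ping. The callback sets a Numeric delay (either float or integer) and a response on that object; the delay is how long the responder waits before replying to the pinger. If the object's do_not_respond flag is set, no reply is sent.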
124 125 126 |
# File 'lib/who_can/responder.rb', line 124 def on_ping(&block) @on_ping_cb = block end |
#setup!(&blk) ⇒ Object
resp = wc.new_responder(ping_exch_name)
resp.on_ping do |obj|
obj.header # msg headers
obj.payload # msg payload
obj.delay = load_avg * num_slots * Math::PI
obj.response = "something here"
end
wc = WhoCan.new(connection_opts)
wc.on_ping("retriever") do |resp|
end
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/who_can/responder.rb', line 68 def setup!(&blk) return on_start if @set_up @set_up = true @close_requested = false on_start(&blk) AMQP::Channel.new(connection) do |channel,_| @channel = channel @ping_exchange = @channel.fanout(ping_exchange_name, :nowait => false) @queue = @channel.queue('', :exclusive => true, :auto_delete => true) unless @close_requested @queue.bind(@ping_exchange) do #logger.debug {"subscribing to the #{@ping_exchange_name}"} confirm_queue = lambda do |*a| #logger.debug { "queue subscribed to #{@ping_exchange_name}" } on_start.succeed end @queue.subscribe(:confirm => confirm_queue, &method(:handle_message)) end #bind end # unless end on_start end |