Class: WhoCan::Responder

Inherits:
Object
  • Object
show all
Includes:
Deferred::Accessors, Logging
Defined in:
lib/who_can/responder.rb

Defined Under Namespace

Classes: Response

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #logger

Constructor Details

#initialize(connection, ping_exchange_name) ⇒ Responder

connection here is the AMQP connection itself, not a channel or a ChannelPool object; the responder opens its own honest-to-goodness AMQP::Channel on it in #setup!.



39
40
41
42
43
44
# File 'lib/who_can/responder.rb', line 39

# Builds a responder that is not yet listening for pings.
#
# Only stores its arguments and resets the bookkeeping state; nothing is
# opened on the connection here.
#
# @param connection [Object] connection later handed to AMQP::Channel.new
#   by #setup!
# @param ping_exchange_name [String] name of the exchange that #setup!
#   declares and binds the queue to
def initialize(connection, ping_exchange_name)
  # Bookkeeping: no ping callback registered, not set up yet.
  @set_up     = false
  @on_ping_cb = nil

  # Collaborators supplied by the caller.
  @connection, @ping_exchange_name = connection, ping_exchange_name
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



36
37
38
# File 'lib/who_can/responder.rb', line 36

# Reader for the channel this responder publishes replies on.
#
# The instance variable is assigned inside the AMQP::Channel.new callback in
# #setup!, so this is nil until that callback has run.
#
# @return [Object, nil] the channel yielded by AMQP::Channel.new, or nil
def channel
  @channel
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



36
37
38
# File 'lib/who_can/responder.rb', line 36

# Reader for the connection given to the constructor.
#
# #setup! passes this value to AMQP::Channel.new.
#
# @return [Object] the +connection+ argument of #initialize
def connection
  @connection
end

#ping_exchange ⇒ Object (readonly)

Returns the value of attribute ping_exchange.



36
37
38
# File 'lib/who_can/responder.rb', line 36

# Reader for the exchange the responder's queue is bound to.
#
# Assigned in #setup! from +@channel.fanout(ping_exchange_name, ...)+ once the
# channel callback has run; nil before that.
#
# @return [Object, nil] the exchange returned by the channel's +fanout+ call
def ping_exchange
  @ping_exchange
end

#ping_exchange_name ⇒ Object (readonly)

Returns the value of attribute ping_exchange_name.



36
37
38
# File 'lib/who_can/responder.rb', line 36

# Reader for the exchange name given to the constructor.
#
# #setup! uses it as the name argument of the channel's +fanout+ call.
#
# @return [Object] the +ping_exchange_name+ argument of #initialize
def ping_exchange_name
  @ping_exchange_name
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



36
37
38
# File 'lib/who_can/responder.rb', line 36

# Reader for the queue the responder receives pings on.
#
# Assigned in #setup! from +@channel.queue('', :exclusive => true,
# :auto_delete => true)+ and reset to nil by #close!.
#
# @return [Object, nil] the queue declared in #setup!, or nil
def queue
  @queue
end

Instance Method Details

#close! ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/who_can/responder.rb', line 113

# Stops listening for pings.
#
# Does nothing unless the responder is set up and no close has been requested
# yet. Otherwise it records the close request, unsubscribes the queue if one
# has been assigned, drops the queue reference and marks the responder as not
# set up.
#
# @return [false] in every case: the guard returns false explicitly and the
#   normal path ends on the +@set_up = false+ assignment
def close!
  return false if @close_requested || !@set_up

  # Recorded first so the channel callback in #setup! can see the request.
  @close_requested = true

  subscribed_queue = @queue
  subscribed_queue.unsubscribe if subscribed_queue
  @queue = nil

  @set_up = false
end

#handle_message(header, payload) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/who_can/responder.rb', line 94

# Handles one incoming ping message.
#
# Without a registered ping callback the message is ignored. Otherwise a
# Response is built from the message, handed to the callback, and - unless the
# callback flagged it +do_not_respond+ - validated and published to the
# sender's +reply_to+ routing key after +delay+ seconds.
#
# @param header [Object] message header; +reply_to+ and +message_id+ are read
# @param payload [Object] message payload, passed through to Response.new
# @return [Object, nil] nil when no callback is registered, otherwise the
#   result of the last logger or EM.add_timer call
def handle_message(header, payload)
  return unless @on_ping_cb

  reply = Response.new(header, payload)
  logger.debug { "ping request came in, sending off to get response" }
  @on_ping_cb.call(reply)

  if reply.do_not_respond
    return logger.debug { "do_not_respond flag set, not sending reply" }
  end

  reply.validate!

  # The reply is deferred on the reactor; delay and payload are read from the
  # Response object the callback filled in.
  EM.add_timer(reply.delay) do
    logger.debug { "publishing ping response to #{header.reply_to} after delay of #{reply.delay} happened" }
    channel.default_exchange.publish(
      reply.response,
      :routing_key => header.reply_to,
      :message_id  => header.message_id
    )
  end
end

#on_ping(&block) ⇒ Object

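Register a callback that is called with a Response object for each incoming ping. The callback sets the response's delay, a Numeric (either float or integer) that will be used to delay the response to the pinger, and its response payload on that object; the callback's own return value is not used.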



124
125
126
# File 'lib/who_can/responder.rb', line 124

# Stores the given block as the ping callback, replacing any earlier one.
#
# #handle_message calls the stored block with a Response built from each
# incoming message and then reads the reply settings (+do_not_respond+,
# +delay+, +response+) from that object; the block's return value is not used
# there.
#
# @yieldparam resp [Response] the response object for one incoming ping
# @return [Proc, nil] the stored block (nil when called without a block)
def on_ping(&block)
  @on_ping_cb = block
end

#setup!(&blk) ⇒ Object

resp = wc.new_responder(ping_exch_name)

resp.on_ping do |obj|

obj.header  # msg headers
obj.payload # msg payload

obj.delay    = load_avg * num_slots * Math::PI
obj.response = "something here"

end

wc = WhoCan.new(connection_opts)
wc.on_ping("retriever") do |resp|

end



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/who_can/responder.rb', line 68

# Starts listening for pings.
#
# Opens a channel on the connection, declares the ping exchange and a
# server-named exclusive auto-delete queue, binds the queue to the exchange
# and subscribes #handle_message to it. +on_start.succeed+ is called from the
# subscription's +:confirm+ callback.
#
# The optional block is handed to +on_start+ on every call, including calls
# made while the responder is already set up, so a late caller's block is no
# longer silently dropped. +on_start+ is not defined in this view - presumably
# it comes from Deferred::Accessors and registers the block as a callback;
# verify against that module.
#
# @yield forwarded to +on_start+
# @return [Object] whatever +on_start+ returns
def setup!(&blk)
  # Already set up (or setup in flight): do not open a second channel, but
  # still register the caller's block.
  return on_start(&blk) if @set_up

  @set_up = true
  @close_requested = false
  on_start(&blk)

  AMQP::Channel.new(connection) do |channel, _|
    @channel = channel
    @ping_exchange = @channel.fanout(ping_exchange_name, :nowait => false)
    @queue = @channel.queue('', :exclusive => true, :auto_delete => true)

    # #close! may have run while the channel was still opening; in that case
    # the queue is never bound or subscribed.
    # NOTE(review): on that path nothing resolves on_start and the freshly
    # declared queue and channel are left as they are - confirm this is intended.
    unless @close_requested
      @queue.bind(@ping_exchange) do
        confirm_queue = lambda do |*_args|
          on_start.succeed
        end

        @queue.subscribe(:confirm => confirm_queue, &method(:handle_message))
      end
    end
  end

  on_start
end