Class: MQRPC::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/mqrpc/agent.rb

Overview

TODO: document this class

Constant Summary collapse

MAXBUF =
20
MAXMESSAGEWAIT =
MAXBUF * 20

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Agent

Returns a new instance of Agent.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/mqrpc/agent.rb', line 33

def initialize(config)
  Thread::abort_on_exception = true
  @config = config
  @handler = self
  @id = UUID::generate
  @outbuffer = Hash.new { |h,k| h[k] = [] }
  @queues = []
  @topics = []
  @receive_queue = Queue.new
  @want_subscriptions = Queue.new
  @slidingwindow = Hash.new do |h,k| 
    MQRPC::logger.info "New sliding window for #{k}"
    h[k] = SizedThreadSafeHash.new(MAXMESSAGEWAIT) 
  end

  @mq = nil
  @message_operations = Hash.new

  @startup_mutex = Mutex.new
  @startup_condvar = ConditionVariable.new
  @amqp_ready = false

  start_amqp

  # Wait for our AMQP thread to get going. Mainly, it needs to set
  # @mq, so we'll block until it's available.
  @startup_mutex.synchronize do
    MQRPC::logger.debug "Waiting for @mq ..."
    @startup_condvar.wait(@startup_mutex) if !@amqp_ready
    MQRPC::logger.debug "Got it, continuing with #{self.class} init..."
  end

  start_receiver
end

Instance Method Details

#closeObject



265
266
267
# File 'lib/mqrpc/agent.rb', line 265

def close
  EM.stop_event_loop
end

#flushout(destination) ⇒ Object

handle_new_subscriptions



214
215
216
217
218
219
220
# File 'lib/mqrpc/agent.rb', line 214

def flushout(destination)
  msgs = @outbuffer[destination]
  return if msgs.length == 0
  data = msgs.to_json
  @mq.queue(destination, :durable => true).publish(data, :persistent => true)
  msgs.clear
end

#handle_message(hdr, msg_body) ⇒ Object

def subscribe_topic



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/mqrpc/agent.rb', line 121

def handle_message(hdr, msg_body)
  obj = JSON::load(msg_body)
  if !obj.is_a?(Array)
    obj = [obj]
  end

  queue = hdr.routing_key
  obj.each do |item|
    message = Message.new_from_data(item)
    slidingwindow = @slidingwindow[queue]
    if message.respond_to?(:from_queue)
      slidingwindow = @slidingwindow[message.from_queue]
    end
    MQRPC::logger.debug "#{Thread.current} Got message #{message.class}##{message.id} on queue #{queue}"
    MQRPC::logger.debug "Received message: #{message.inspect}"
    if (message.respond_to?(:in_reply_to) and 
        slidingwindow.include?(message.in_reply_to))
      MQRPC::logger.debug "Got response to #{message.in_reply_to}"
      slidingwindow.delete(message.in_reply_to)
    end
    name = message.class.name.split(":")[-1]
    func = "#{name}Handler"

    # Check if we have a specific operation looking for this
    # message.
    if (message.respond_to?(:in_reply_to) and
        @message_operations.has_key?(message.in_reply_to))
      operation = @message_operations[message.in_reply_to]
      operation.call(message)
    elsif @handler.respond_to?(func) 
      @handler.send(func, message) do |response|
        reply_destination = message.reply_to
        response.from_queue = queue
        sendmsg(reply_destination, response)
      end

      # We should allow the message handler to defer acking if they want
      # For instance, if we want to index things, but only want to ack
      # things once we actually flush to disk.
    else
      $stderr.puts "#{@handler.class.name} does not support #{func}"
    end # if @handler.respond_to?(func)
  end
  hdr.ack
end

#handle_new_subscriptionsObject



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/mqrpc/agent.rb', line 193

def handle_new_subscriptions
  todo = @want_queues - @queues
  todo.each do |queue|
    MQRPC::logger.info "Subscribing to queue #{queue}"
    mq_q = @mq.queue(queue, :durable => true)
    mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
    @queues << queue
  end # todo.each

  todo = @want_topics - @topics
  todo.each do |topic|
    MQRPC::logger.info "Subscribing to topic #{topic}"
    exchange = @mq.topic(@config.mqexchange)
    mq_q = @mq.queue("#{@id}-#{topic}",
                     :exclusive => true,
                     :auto_delete => true).bind(exchange, :key => topic)
    mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
    @topics << topic
  end # todo.each
end

#handle_subscriptionsObject

run



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/mqrpc/agent.rb', line 171

def handle_subscriptions
  while true do
    type, name = @want_subscriptions.pop
    case type
    when :queue
      next if @queues.include?(name)
      MQRPC::logger.info "Subscribing to queue #{name}"
      mq_q = @mq.queue(name, :durable => true)
      mq_q.subscribe(:ack => true) { |hdr, msg| @receive_queue << [hdr, msg] }
      @queues << name
    when :topic
      MQRPC::logger.info "Subscribing to topic #{name}"
      exchange = @mq.topic(@config.mqexchange)
      mq_q = @mq.queue("#{@id}-#{name}",
                       :exclusive => true,
                       :auto_delete => true).bind(exchange, :key => name)
      mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
      @topics << name
    end
  end
end

#handler=(handler) ⇒ Object



261
262
263
# File 'lib/mqrpc/agent.rb', line 261

def handler=(handler)
  @handler = handler
end

#runObject

def handle_message



167
168
169
# File 'lib/mqrpc/agent.rb', line 167

def run
  @amqpthread.join
end

#sendmsg(destination, msg, &callback) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/mqrpc/agent.rb', line 232

def sendmsg(destination, msg, &callback)
  if (msg.is_a?(RequestMessage) and msg.id == nil)
    msg.generate_id!
  end
  msg.timestamp = Time.now.to_f
  msg.reply_to = @id

  if msg.is_a?(RequestMessage)
    MQRPC::logger.debug "Tracking #{msg.class.name}##{msg.id} to #{destination}"
    @slidingwindow[destination][msg.id] = true
  end

  if msg.buffer?
    @outbuffer[destination] << msg
    if @outbuffer[destination].length > MAXBUF
      flushout(destination)
    end
  else
    MQRPC::logger.debug "#{Thread.current} Sending to #{destination}: #{msg.inspect}"
    @mq.queue(destination, :durable => true).publish([msg].to_json, :persistent => true)
  end

  if block_given?
    op = Operation.new(callback)
    @message_operations[msg.id] = op
    return op
  end
end

#sendmsg_topic(key, msg) ⇒ Object



222
223
224
225
226
227
228
229
230
# File 'lib/mqrpc/agent.rb', line 222

def sendmsg_topic(key, msg)
  if (msg.is_a?(RequestMessage) and msg.id == nil)
    msg.generate_id!
  end
  msg.timestamp = Time.now.to_f

  data = msg.to_json
  @mq.topic(@config.mqexchange).publish(data, :key => key)
end

#start_amqpObject

def initialize



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/mqrpc/agent.rb', line 68

def start_amqp
  @amqpthread = Thread.new do 
    # Create connection to AMQP, and in turn, the main EventMachine loop.
    amqp_config = {:host => @config.mqhost,
                   :port => @config.mqport,
                   :user => @config.mquser,
                   :pass => @config.mqpass,
                   :vhost => @config.mqvhost,
                  }
    AMQP.start(amqp_config) do
      @startup_mutex.synchronize do
        @mq = MQ.new
        # Notify the main calling thread (MessageSocket#initialize) that
        # we can continue
        @amqp_ready = true
        @startup_condvar.signal
      end

      MQRPC::logger.info "Subscribing to main queue #{@id}"
      mq_q = @mq.queue(@id, :auto_delete => true)
      mq_q.subscribe(:ack =>true) { |hdr, msg| @receive_queue << [hdr, msg] }
      #handle_new_subscriptions
      
      # TODO(sissel): make this a deferred thread that reads from a Queue
      #EM.add_periodic_timer(5) { handle_new_subscriptions }
      EM.defer { handle_subscriptions }

      EM.add_periodic_timer(1) do
        # TODO(sissel): add locking
        @outbuffer.each_key { |dest| flushout(dest) }
        @outbuffer.clear
      end
    end # AMQP.start
  end
end

#start_receiverObject

def start_amqp



104
105
106
107
108
109
110
111
# File 'lib/mqrpc/agent.rb', line 104

def start_receiver
  Thread.new do 
    while true
      header, message = @receive_queue.pop
      handle_message(header, message)
    end
  end
end

#subscribe(name) ⇒ Object

def start_receiver



113
114
115
# File 'lib/mqrpc/agent.rb', line 113

def subscribe(name)
  @want_subscriptions << [:queue, name]
end

#subscribe_topic(name) ⇒ Object

def subscribe



117
118
119
# File 'lib/mqrpc/agent.rb', line 117

def subscribe_topic(name)
  @want_subscriptions << [:topic, name]
end