Class: MQRPC::Agent
- Inherits:
-
Object
- Object
- MQRPC::Agent
- Defined in:
- lib/mqrpc/agent.rb
Overview
TODO: document this class
Constant Summary collapse
- MAXBUF =
20
- MAXMESSAGEWAIT =
MAXBUF * 20
Class Attribute Summary collapse
-
.message_handlers ⇒ Object
Returns the value of attribute message_handlers.
-
.pipelines ⇒ Object
Returns the value of attribute pipelines.
Class Method Summary collapse
-
.handle(messageclass, method) ⇒ Object
Subclasses use this to declare their support of any given message.
- .pipeline(source, destination) ⇒ Object
Instance Method Summary collapse
-
#can_receive?(message_class) ⇒ Boolean
Returns whether this agent's class has a handler registered for message_class.
- #close ⇒ Object
-
#flushout(destination) ⇒ Object
Publishes the messages buffered for destination as one JSON array and empties that buffer.
-
#handle_message(hdr, msg_body) ⇒ Object
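Decodes a received message body and hands each contained message to a pending operation or a registered handler, then acks.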
- #handle_subscriptions ⇒ Object
- #handler=(handler) ⇒ Object
-
#initialize(config) ⇒ Agent
constructor
A new instance of Agent.
-
#run ⇒ Object
Names the calling thread (if it has no name yet) and joins the AMQP thread.
- #sendmsg(destination, msg, &callback) ⇒ Object
- #sendmsg_topic(key, msg) ⇒ Object
-
#start_amqp ⇒ Object
Starts the thread that opens the AMQP connection and runs its event loop.
-
#start_receiver ⇒ Object
Starts the thread that pops received messages off the receive queue and handles them.
-
#subscribe(name) ⇒ Object
Queues a request to subscribe to the named queue.
-
#subscribe_topic(name) ⇒ Object
Queues a request to subscribe to the named topic.
-
#unsubscribe(name) ⇒ Object
Unsubscribes from the named queue and waits until the unsubscribe has completed.
Constructor Details
#initialize(config) ⇒ Agent
Returns a new instance of Agent.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/mqrpc/agent.rb', line 56

# Build an agent: set up buffers and bookkeeping, start the AMQP thread,
# block until that thread has created @mq, then start the receiver thread.
#
# config - stored in @config; other methods of this class read mqhost,
#          mqport, mquser, mqpass, mqvhost and mqexchange from it.
def initialize(config)
  # Process-wide setting: any thread dying with an exception aborts the program.
  Thread::abort_on_exception = true
  @config = config
  @handler = self
  @id = UUID::generate
  # Per-destination list of buffered outgoing messages.
  @outbuffer = Hash.new { |h, k| h[k] = [] }
  @queues = Set.new
  @topics = Set.new
  @receive_queue = Queue.new
  @want_subscriptions = Queue.new

  # figure out how to really do this correctly, see also def self.pipeline
  self.class.pipelines ||= Hash.new

  # Per-queue bounded table of in-flight request ids, created on first use.
  # The block passed to SizedThreadSafeHash receives a state symbol; when a
  # pipeline source is declared for the queue, :blocked unsubscribes from
  # that source and :ready resubscribes to it.
  @slidingwindow = Hash.new do |h, k|
    MQRPC::logger.debug "New sliding window for #{k}"
    h[k] = SizedThreadSafeHash.new(MAXMESSAGEWAIT) do |state|
      if self.class.pipelines[k]
        source = self.class.pipelines[k]
        MQRPC::logger.debug "Got sizedhash callback for #{k}: #{state}"
        case state
        when :blocked
          MQRPC::logger.info("Queue '#{k}' is full, unsubscribing from #{source}")
          unsubscribe(source)
        when :ready
          MQRPC::logger.info("Queue '#{k}' is ready, resubscribing to #{source}")
          subscribe(source)
        end
      end
    end
  end

  @mq = nil
  @message_operations = Hash.new

  @startup_mutex = Mutex.new
  @startup_condvar = ConditionVariable.new
  @amqp_ready = false
  start_amqp

  # Wait for our AMQP thread to get going. Mainly, it needs to set
  # @mq, so we'll block until it's available.
  @startup_mutex.synchronize do
    MQRPC::logger.debug "Waiting for @mq ..."
    # Loop rather than test once: a condition-variable wait may return
    # before the flag is actually set.
    @startup_condvar.wait(@startup_mutex) until @amqp_ready
    MQRPC::logger.debug "Got it, continuing with #{self.class} init..."
  end

  start_receiver
end
Class Attribute Details
.message_handlers ⇒ Object
Returns the value of attribute message_handlers.
35 36 37 |
# File 'lib/mqrpc/agent.rb', line 35

# Reader for the class-level @message_handlers value (nil until assigned).
def message_handlers
  @message_handlers
end
.pipelines ⇒ Object
Returns the value of attribute pipelines.
36 37 38 |
# File 'lib/mqrpc/agent.rb', line 36

# Reader for the class-level @pipelines value (nil until assigned).
def pipelines
  @pipelines
end
Class Method Details
.handle(messageclass, method) ⇒ Object
Subclasses use this to declare their support of any given message
41 42 43 44 45 46 |
# File 'lib/mqrpc/agent.rb', line 41

# Subclasses use this to declare their support of any given message.
#
# messageclass - key under which the handler is stored.
# method       - value stored for that key (the handler to invoke).
#
# Creates the handler table on first use. Returns +method+.
def self.handle(messageclass, method)
  self.message_handlers ||= {}
  self.message_handlers[messageclass] = method
end
.pipeline(source, destination) ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/mqrpc/agent.rb', line 48

# Record that +destination+ is fed from +source+: stores source under the
# destination key in the class-level pipelines table, creating the table on
# first use. Returns +source+.
def self.pipeline(source, destination)
  self.pipelines ||= {}
  self.pipelines[destination] = source
end
Instance Method Details
#can_receive?(message_class) ⇒ Boolean
Returns whether this agent's class has a handler registered for message_class.
248 249 250 251 252 253 254 |
# File 'lib/mqrpc/agent.rb', line 248

# Tell whether this object's class has a handler registered for
# +message_class+.
#
# If the class-level handler table has not been created yet it is created
# empty here. It is created as a Hash so that later keyed stores and
# lookups on the same table keep working.
#
# Returns true or false.
def can_receive?(message_class)
  self.class.message_handlers ||= {}
  self.class.message_handlers.include?(message_class)
end
#close ⇒ Object
355 356 357 |
# File 'lib/mqrpc/agent.rb', line 355

# Stop the EventMachine event loop.
def close
  EM.stop_event_loop
end
#flushout(destination) ⇒ Object
Publishes the messages buffered for destination as one JSON array and empties that buffer.
303 304 305 306 307 308 309 |
# File 'lib/mqrpc/agent.rb', line 303

# Publish everything buffered for +destination+ as a single JSON array on
# the durable queue of that name (persistent delivery), then empty the
# buffer. Does nothing when the buffer is empty.
def flushout(destination)
  msgs = @outbuffer[destination]
  return if msgs.empty?

  data = msgs.to_json
  @mq.queue(destination, :durable => true).publish(data, :persistent => true)
  msgs.clear
end
#handle_message(hdr, msg_body) ⇒ Object
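Decodes a received message body and hands each contained message to a pending operation or a registered handler, then acks.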
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/mqrpc/agent.rb', line 179

# Process one delivery taken from the receive queue.
#
# hdr      - delivery header; routing_key and ack are called on it.
# msg_body - raw message body, expected to be JSON (one object or an array).
#
# Each decoded item is turned into a Message. A reply clears its entry in
# the matching sliding window. The message then goes to the Operation
# waiting on its in_reply_to id if there is one, otherwise to the method
# registered for its class. The delivery is acked once all items are done.
def handle_message(hdr, msg_body)
  queue = hdr.routing_key

  # If we just unsubscribed from a queue, we may still have some
  # messages buffered, so reject the message.
  # Currently RabbitMQ doesn't support message rejection, so let's
  # ack the message then push it back into the queue, unmodified.
  if !@queues.include?(queue)
    MQRPC::logger.warn("Got message on queue #{queue} that we are not "\
                       "subscribed to; rejecting")
    hdr.ack
    @mq.queue(queue, :durable => true).publish(msg_body, :persistent => true)
    return
  end

  # NOTE(review): JSON::load is applied to data received from the broker;
  # confirm whether JSON.parse would be the safer choice here.
  begin
    obj = JSON::load(msg_body)
  rescue JSON::ParserError
    MQRPC::logger.warn("Skipping non-JSON message: #{msg_body}")
    hdr.ack
    return
  end

  # A body may carry a single message or a batch; treat both as a batch.
  obj = [obj] if !obj.is_a?(Array)

  obj.each do |item|
    message = Message.new_from_data(item)

    # Replies are tracked against the queue the request was sent to, which
    # the responder records in from_queue.
    slidingwindow = @slidingwindow[queue]
    if message.respond_to?(:from_queue)
      slidingwindow = @slidingwindow[message.from_queue]
    end

    MQRPC::logger.debug "Got message #{message.class}##{message.id} on queue #{queue}"
    #MQRPC::logger.debug "Received message: #{message.inspect}"
    if message.respond_to?(:in_reply_to) && slidingwindow.include?(message.in_reply_to)
      MQRPC::logger.debug "Got response to #{message.in_reply_to}"
      slidingwindow.delete(message.in_reply_to)
    end

    # Check if we have a specific operation looking for this
    # message.
    if message.respond_to?(:in_reply_to) && @message_operations.has_key?(message.in_reply_to)
      operation = @message_operations[message.in_reply_to]
      operation.call(message)
    elsif can_receive?(message.class)
      func = self.class.message_handlers[message.class]
      self.send(func, message) do |response|
        reply_destination = message.reply_to
        response.from_queue = queue
        sendmsg(reply_destination, response)
      end

      # TODO(sissel): We should allow the message handler to defer acking
      # if they want For instance, if we want to index things, but only
      # want to ack things once we actually flush to disk.
    else
      $stderr.puts "#{@handler.class.name} does not support #{message.class}"
    end
  end

  hdr.ack
end
#handle_subscriptions ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/mqrpc/agent.rb', line 256

# Subscription worker loop: pops [type, name] requests from
# @want_subscriptions forever and performs the subscription.
#
# :queue - skips names already in @queues; otherwise subscribes (with acks)
#          to the durable queue, waits for the first delivery as proof the
#          subscription is live, then records the name in @queues.
# :topic - binds an exclusive, auto-delete queue named "<id>-<name>" to the
#          exchange with the topic as key, subscribes, and records the
#          name in @topics.
#
# Never returns.
def handle_subscriptions
  Thread.current[:name] = "subscriptionhandler"
  while true do
    queuetype, name = @want_subscriptions.pop
    case queuetype
    when :queue
      if @queues.include?(name)
        MQRPC::logger.info "Ignoring subscription request to queue "\
                           "#{name}, already subscribed."
        next
      end
      MQRPC::logger.info "Subscribing to queue #{name}"

      # Send a dummy message to queue #{name} so there's at least
      # one message to receive, and wake up our Operation.
      sendmsg(name, DummyMessage.new)

      exchange = @mq.topic(@config.mqexchange, :durable => true)
      mq_q = @mq.queue(name, :durable => true)
      mq_q.bind(exchange, :key => "*")

      op = Operation.new
      mq_q.subscribe(:ack => true) do |hdr, msg|
        op.finished
        # `next` leaves only this callback. A `return` here would try to
        # return from handle_subscriptions itself, which is not possible
        # from a callback invoked later by the subscription.
        next if msg.is_a?(DummyMessage)
        queue = hdr.routing_key
        MQRPC::logger.info("received message on #{queue}")
        @receive_queue << [hdr, msg]
        MQRPC::logger.info("finished receiving message on #{queue}")
        MQRPC::logger.info("msg: #{msg}")
        MQRPC::logger.info("#{queue} queue size: #{@receive_queue.length}")
      end

      # Wait until we receive our first message (might be DummyMessage,
      # doesn't matter) -- this confirms we are subscribed. subscribe
      # is async...
      op.wait_until_finished
      @queues << name
    when :topic
      MQRPC::logger.info "Subscribing to topic #{name}"
      exchange = @mq.topic(@config.mqexchange, :durable => true)
      mq_q = @mq.queue("#{@id}-#{name}",
                       :exclusive => true,
                       :auto_delete => true).bind(exchange, :key => name)
      mq_q.subscribe { |hdr, msg| @receive_queue << [hdr, msg] }
      @topics << name
    end # case queuetype
  end # while true
end
#handler=(handler) ⇒ Object
351 352 353 |
# File 'lib/mqrpc/agent.rb', line 351

# Replace the object stored in @handler (set to the agent itself by the
# constructor).
def handler=(handler)
  @handler = handler
end
#run ⇒ Object
Names the calling thread (if it has no name yet) and joins the AMQP thread.
243 244 245 246 |
# File 'lib/mqrpc/agent.rb', line 243

# Block the caller until the AMQP thread finishes.
#
# Gives the calling thread a :name of "<ClassName>#run" unless it already
# has one. Returns the result of joining @amqpthread.
def run
  Thread.current[:name] ||= "#{self.class.name}#run"
  @amqpthread.join
end
#sendmsg(destination, msg, &callback) ⇒ Object
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/mqrpc/agent.rb', line 321

# Send +msg+ to the queue named +destination+.
#
# destination - queue name.
# msg         - message object; RequestMessages without an id get one, and
#               are tracked in the destination's sliding window.
# callback    - optional block; when given, an Operation wrapping it is
#               stored under the message id so the reply can be routed
#               back to it.
#
# Messages reporting +delayable+ are buffered and flushed once the buffer
# holds more than MAXBUF entries; others are published immediately as a
# one-element JSON array.
#
# Returns the Operation when a block is given, otherwise nil.
def sendmsg(destination, msg, &callback)
  if msg.is_a?(RequestMessage) && msg.id.nil?
    msg.generate_id!
  end
  # NOTE(review): the attribute name was lost from this listing;
  # `timestamp` is presumed from the Time value assigned -- confirm.
  msg.timestamp = Time.now.to_f
  msg.reply_to = @id

  if msg.is_a?(RequestMessage)
    MQRPC::logger.debug "Tracking #{msg.class.name}##{msg.id} to #{destination}"
    @slidingwindow[destination][msg.id] = true
  end

  if msg.delayable
    @outbuffer[destination] << msg
    flushout(destination) if @outbuffer[destination].length > MAXBUF
  else
    MQRPC::logger.debug "Sending to #{destination}: #{msg.inspect}"
    @mq.queue(destination, :durable => true).publish([msg].to_json, :persistent => true)
  end

  if block_given?
    op = Operation.new(&callback)
    MQRPC::logger.debug "New operation for #{msg.id}"
    @message_operations[msg.id] = op
    return op
  end
end
#sendmsg_topic(key, msg) ⇒ Object
311 312 313 314 315 316 317 318 319 |
# File 'lib/mqrpc/agent.rb', line 311

# Publish +msg+ as JSON on the configured topic exchange with routing key
# +key+. RequestMessages without an id get one first.
def sendmsg_topic(key, msg)
  if msg.is_a?(RequestMessage) && msg.id.nil?
    msg.generate_id!
  end
  # NOTE(review): the attribute name was lost from this listing;
  # `timestamp` is presumed from the Time value assigned -- confirm.
  msg.timestamp = Time.now.to_f

  data = msg.to_json
  @mq.topic(@config.mqexchange).publish(data, :key => key)
end
#start_amqp ⇒ Object
Starts the thread that opens the AMQP connection and runs its event loop.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/mqrpc/agent.rb', line 110

# Start the AMQP thread and keep it in @amqpthread.
#
# Inside AMQP.start the thread creates @mq, marks @amqp_ready and signals
# @startup_condvar (the constructor waits on it), requests a subscription
# to the agent's own queue (@id), runs handle_subscriptions as a deferred
# task, and installs a one-second timer that flushes the out-buffers.
def start_amqp
  @amqpthread = Thread.new do
    Thread.current[:name] = "AMQP"
    # Create connection to AMQP, and in turn, the main EventMachine loop.
    amqp_config = {:host => @config.mqhost,
                   :port => @config.mqport,
                   :user => @config.mquser,
                   :pass => @config.mqpass,
                   :vhost => @config.mqvhost,
                  }
    AMQP.start(amqp_config) do
      @startup_mutex.synchronize do
        @mq = MQ.new
        # Notify the thread blocked in #initialize that we can continue
        @amqp_ready = true
        @startup_condvar.signal
      end

      MQRPC::logger.info "Subscribing to main queue #{@id}"
      subscribe(@id)

      # TODO(sissel): make this a deferred thread that reads from a Queue
      EM.defer { handle_subscriptions }

      EM.add_periodic_timer(1) do
        # Iterate over a snapshot of the keys: other threads add
        # destinations to @outbuffer while this runs. flushout empties each
        # buffer it publishes, so the hash is not cleared wholesale here;
        # doing so could discard messages buffered after their flush.
        @outbuffer.keys.each { |dest| flushout(dest) }
      end
    end # AMQP.start
  end
end
#start_receiver ⇒ Object
Starts the thread that pops received messages off the receive queue and handles them.
143 144 145 146 147 148 149 150 151 |
# File 'lib/mqrpc/agent.rb', line 143

# Start the receiver thread: it pops [header, message] pairs from
# @receive_queue forever and passes each to handle_message.
#
# Returns the new Thread.
def start_receiver
  Thread.new do
    Thread.current[:name] = "receiver"
    while true
      header, message = @receive_queue.pop
      handle_message(header, message)
    end
  end
end
#subscribe(name) ⇒ Object
Queues a request to subscribe to the named queue.
153 154 155 156 |
# File 'lib/mqrpc/agent.rb', line 153

# Request a subscription to the queue +name+. The request is only enqueued
# on @want_subscriptions here as [:queue, name].
def subscribe(name)
  MQRPC::logger.info "Wanting to subscribe to queue #{name}"
  @want_subscriptions << [:queue, name]
end
#subscribe_topic(name) ⇒ Object
Queues a request to subscribe to the named topic.
175 176 177 |
# File 'lib/mqrpc/agent.rb', line 175

# Request a subscription to the topic +name+. The request is only enqueued
# on @want_subscriptions here as [:topic, name].
def subscribe_topic(name)
  @want_subscriptions << [:topic, name]
end
#unsubscribe(name) ⇒ Object
Unsubscribes from the named queue and waits until the unsubscribe has completed.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/mqrpc/agent.rb', line 158

# Unsubscribe from the queue +name+ and block until the broker callback
# reports completion, then drop the name from @queues.
def unsubscribe(name)
  exchange = @mq.topic(@config.mqexchange, :durable => true)
  mq_q = @mq.queue(name, :durable => true)
  mq_q.bind(exchange, :key => "*")

  # The unsubscribe is asynchronous; the Operation is finished from its
  # callback and waited on here.
  op = Operation.new
  mq_q.unsubscribe { op.finished }
  op.wait_until_finished
  @queues.delete(name)

  #mq_q.unsubscribe { @queues.delete(name) }
  ## wait for unsubscribe to finish; it's async
  #while @queues.member?(name)
    #sleep(0.1)
  #end
end