Class: Smith::Messaging::Sender
- Inherits:
-
Object
- Object
- Smith::Messaging::Sender
- Defined in:
- lib/smith/messaging/sender.rb
Instance Method Summary collapse
- #consumer_count(&blk) ⇒ Object
- #counter ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Sender
constructor
A new instance of Sender.
- #message_count(&blk) ⇒ Object
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
-
#on_reply(opts = {}, &blk) ⇒ Object
Set up a listener that will receive replies from the published messages.
-
#on_reply_error(&blk) ⇒ Object
This gets called if there is a mismatch in the message_id & correlation_id.
- #on_timeout(timeout = nil, &blk) ⇒ Object
-
#publish(payload, opts = {}, &blk) ⇒ Object
If a reply queue is set, the block will be called when the recipient's reply to the message has been received.
- #queue_name ⇒ Object
- #status(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Sender
Returns a new instance of Sender.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/smith/messaging/sender.rb', line 10
#
# Builds a sender for the given queue: opens a channel, declares a direct
# exchange and a queue (both named after the normalised queue definition),
# binds them, and resolves the channel/exchange/queue completions that the
# other instance methods wait on.
#
# NOTE(review): the rendered listing this was taken from had lost several
# identifiers (`@queue_def.options`, the `metadata` block parameter). They
# are reconstructed here -- confirm against lib/smith/messaging/sender.rb.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or a value
#   that is handed to QueueDefinition.new together with +opts+
# @param opts [Hash] only used when +queue_def+ is not a QueueDefinition
# @yield [self] called at the end of construction when a block is given
def initialize(queue_def, opts={}, &blk)
  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance
  @reply_container = {}

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  # Missing keys read as 0.
  @message_counts = Hash.new(0)

  @exchange_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @channel_completion = EM::Completion.new

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)

    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|
      exchange.on_return do |basic_return, metadata, payload|
        # The exchange name comes from basic_return; the previous
        # `reply_code.exchange` referenced a local that does not exist here.
        # The payload is passed to parse_from_string, which was previously
        # called without an argument -- presumably it requires the serialised
        # message; verify against the ACL class API.
        logger.error { "#{@acl_type_cache[metadata.type].new.parse_from_string(payload)} returned! Exchange: #{basic_return.exchange}, reply_code: #{basic_return.reply_code}, reply_text: #{basic_return.reply_text}" }
        logger.error { "Properties: #{metadata.properties}" }
      end

      channel.queue(@queue_def.normalise, @options.queue) do |queue|
        queue.bind(exchange, :routing_key => @queue_def.normalise)

        @queue_completion.succeed(queue)
        @exchange_completion.succeed(exchange)
      end
    end
  end

  blk.call(self) if blk
end
Instance Method Details
#consumer_count(&blk) ⇒ Object
150 151 152 153 154 |
# File 'lib/smith/messaging/sender.rb', line 150
#
# Passes the second value yielded by #status (named num_consumers there)
# to the given block; the first value is ignored.
#
# @yield [consumers] the consumer count reported by #status
def consumer_count(&blk)
  status { |_num_messages, consumer_total| blk.call(consumer_total) }
end
#counter ⇒ Object
156 157 158 |
# File 'lib/smith/messaging/sender.rb', line 156
#
# Looks up this sender's entry in @message_counts, keyed by the
# denormalised queue name.
#
# @return [Object] the stored value for this queue
def counter
  queue_key = @queue_def.denormalise
  @message_counts[queue_key]
end
#delete(&blk) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/smith/messaging/sender.rb', line 117
#
# Tears the sender down in order: deletes the queue, then the exchange,
# then closes the channel. Each step waits on the corresponding completion
# and only starts from inside the previous step's callback.
#
# @yield forwarded to channel.close
def delete(&blk)
  @queue_completion.completion do |bound_queue|
    bound_queue.delete do
      @exchange_completion.completion do |direct_exchange|
        direct_exchange.delete do
          @channel_completion.completion { |open_channel| open_channel.close(&blk) }
        end
      end
    end
  end
end
#message_count(&blk) ⇒ Object
144 145 146 147 148 |
# File 'lib/smith/messaging/sender.rb', line 144
#
# Passes the first value yielded by #status (the message count) to the
# given block.
#
# NOTE(review): the method name and block parameter were missing from the
# rendered listing; the name is restored from the method heading and the
# parameter name is reconstructed -- confirm against the real source.
#
# @yield [messages] the first value yielded by #status
def message_count(&blk)
  status do |messages|
    blk.call(messages)
  end
end
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
161 162 163 164 165 |
# File 'lib/smith/messaging/sender.rb', line 161
#
# Registers +blk+ as the channel's error handler once the channel
# completion has resolved.
#
# @param chain [Boolean] accepted but not used by this method
# @yield forwarded to channel.on_error
def on_error(chain=false, &blk)
  @channel_completion.completion { |open_channel| open_channel.on_error(&blk) }
end
#on_reply(opts = {}, &blk) ⇒ Object
Set up a listener that will receive replies from the published messages. You must publish with intent to reply – tee he.
If you pass in a queue_name the same queue name will get used for every reply. This means that there are no create and teardown costs for each message. If no queue_name is given a random one will be assigned.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/smith/messaging/sender.rb', line 88
#
# Stores +blk+ as the reply handler and, the first time it is called, sets
# up a Receiver on the reply queue. Incoming replies are matched against
# @reply_container by correlation_id; a match cancels its timeout and runs
# its stored reply proc, a miss is logged as an error.
#
# @param opts [Hash] :reply_queue_name is removed from this hash and used as
#   the queue name (default: "<queue>.reply"); :auto_ack is read when an
#   unmatched reply arrives; the remaining entries go to QueueDefinition.new
# @yield [payload, receiver] invoked for each matched reply
def on_reply(opts={}, &blk)
  @reply_proc = blk
  @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise)

  # Note: opts.delete mutates the caller's hash.
  reply_name = opts.delete(:reply_queue_name) || "#{@queue_def.denormalise}.reply"
  queue_def = QueueDefinition.new(reply_name, opts.merge(:auto_delete => true, :durable => false))

  logger.debug { "reply queue: #{queue_def.denormalise}" }

  # Memoised: the receiver is only created on the first call.
  @reply_queue_completion ||= EM::Completion.new.tap do |pending|
    Receiver.new(queue_def) do |reply_queue|
      reply_queue.subscribe do |payload, receiver|
        @reply_container.delete(receiver.correlation_id).tap do |entry|
          if entry
            entry[:timeout].cancel_timeout
            entry[:reply_proc].call(payload, receiver)
          else
            receiver.ack if opts[:auto_ack]
            logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" }
          end
        end
      end

      EM.next_tick { pending.succeed(reply_queue) }
    end
  end
end
#on_reply_error(&blk) ⇒ Object
This gets called if there is a mismatch in the message_id & correlation_id.
132 133 134 |
# File 'lib/smith/messaging/sender.rb', line 132
#
# Stores the given block in @reply_error.
#
# @yield the handler to remember
# @return [Proc, nil] the stored block
def on_reply_error(&blk)
  @reply_error = blk
end
#on_timeout(timeout = nil, &blk) ⇒ Object
78 79 80 |
# File 'lib/smith/messaging/sender.rb', line 78
#
# Replaces @timeout with a new Timeout built from the given duration and
# block.
#
# @param timeout [Object, nil] duration; falls back to
#   Smith.config.smith.timeout when nil (or false)
# @yield forwarded to Timeout.new
def on_timeout(timeout=nil, &blk)
  duration = timeout || Smith.config.smith.timeout
  @timeout = Timeout.new(duration, &blk)
end
#publish(payload, opts = {}, &blk) ⇒ Object
If a reply queue is set, the block will be called when the recipient's reply to the message has been received.
If a block is passed to this method but the :reply_queue option is not set, the block will be called when the message has been safely published.
If the :reply_queue is an empty string then a random queue name will be generated.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/smith/messaging/sender.rb', line 60
#
# Publishes +payload+. When #on_reply has set up a reply queue
# (@reply_queue_completion is present) the message is tagged with a fresh
# message_id and a reply_to queue name, and the current reply proc plus a
# clone of the timeout are recorded under that id. Otherwise the message is
# published directly and +blk+ is forwarded to _publish.
#
# NOTE(review): the `message_id` local was missing from the rendered
# listing and is reconstructed here -- confirm against the real source.
#
# @param payload [Object] the message to publish
# @param opts [Hash] publish options merged via @options.publish
# @yield forwarded to _publish in the no-reply case only; it is not used
#   when a reply queue is set
def publish(payload, opts={}, &blk)
  if @reply_queue_completion
    @reply_queue_completion.completion do |reply_queue|
      message_id = random
      logger.verbose { "message_id: #{message_id}" }

      #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
      #### TODO if there is a timeout delete   ####
      #### the proc from the @reply_container. ####
      #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
      # The timeout is cloned so each in-flight message gets its own timer.
      @reply_container[message_id] = {:reply_proc => @reply_proc, :timeout => @timeout.clone.tap {|t| t.set_timeout(message_id) }}
      _publish(payload, @options.publish(opts, {:reply_to => reply_queue.queue_name, :message_id => message_id}))
    end
  else
    _publish(payload, @options.publish(opts), &blk)
  end
end
#queue_name ⇒ Object
167 168 169 |
# File 'lib/smith/messaging/sender.rb', line 167
#
# The denormalised name of the queue this sender publishes to, as reported
# by the queue definition.
#
# @return [Object] @queue_def.denormalise
def queue_name
  @queue_def.denormalise
end
#status(&blk) ⇒ Object
136 137 138 139 140 141 142 |
# File 'lib/smith/messaging/sender.rb', line 136
#
# Once the queue completion has resolved, asks the queue for its status and
# passes both reported values on to the given block.
#
# NOTE(review): the first block parameter was missing from the rendered
# listing; `num_messages` is a reconstruction -- confirm against the real
# source.
#
# @yield [num_messages, num_consumers] the two values yielded by queue.status
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status do |num_messages, num_consumers|
      blk.call(num_messages, num_consumers)
    end
  end
end