Class: Smith::Messaging::Sender

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/sender.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Sender

Returns a new instance of Sender.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/smith/messaging/sender.rb', line 10

# Builds a sender for the queue described by +queue_def+.
#
# Opens a channel, declares a direct exchange and a queue that are both named
# after the normalised queue definition, binds the queue to the exchange with
# that same name as routing key, and resolves the channel, queue and exchange
# completions so the other methods can chain on them.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or (for
#   backward compatibility) the arguments needed to build one together with +opts+.
# @param opts [Hash] only used when +queue_def+ is not already a QueueDefinition.
# @yield [sender] called with this sender at the end of construction, if a
#   block is given.
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Pending replies keyed by message_id; filled by #publish, drained by #on_reply.
  @reply_container = {}

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counts = Hash.new(0)

  @exchange_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @channel_completion = EM::Completion.new

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|

      # Log messages the broker hands back as undeliverable.
      # NOTE(review): assumes the second yielded argument is the message
      # metadata and that it responds to #type and #properties -- confirm
      # against the AMQP client in use.
      exchange.on_return do |basic_return, metadata, payload|
        logger.error { "#{@acl_type_cache[metadata.type].new.parse_from_string(payload)} returned! Exchange: #{basic_return.exchange}, reply_code: #{basic_return.reply_code}, reply_text: #{basic_return.reply_text}" }
        logger.error { "Properties: #{metadata.properties}" }
      end

      channel.queue(@queue_def.normalise, @options.queue) do |queue|
        queue.bind(exchange, :routing_key => @queue_def.normalise)

        @queue_completion.succeed(queue)
        @exchange_completion.succeed(exchange)
      end
    end
  end

  blk.call(self) if blk
end

Instance Method Details

#consumer_count(&blk) ⇒ Object



150
151
152
153
154
# File 'lib/smith/messaging/sender.rb', line 150

# Reports how many consumers are attached to the queue.
#
# Delegates to #status and forwards only the second value it yields.
#
# @yield [consumers] the consumer count reported by #status.
def consumer_count(&blk)
  status { |_messages, consumers| blk.call(consumers) }
end

#counter ⇒ Object



156
157
158
# File 'lib/smith/messaging/sender.rb', line 156

# Looks up the entry of @message_counts stored under this sender's
# denormalised queue name (the hash defaults missing keys to 0).
def counter
  key = @queue_def.denormalise
  @message_counts[key]
end

#delete(&blk) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/smith/messaging/sender.rb', line 117

# Tears the sender down in order: deletes the queue, then the exchange, then
# closes the channel. Each step waits on the matching completion and only
# starts from inside the previous step's callback.
#
# @yield passed straight to channel.close.
def delete(&blk)
  close_channel = proc { |channel| channel.close(&blk) }
  drop_exchange = proc do |exchange|
    exchange.delete { @channel_completion.completion(&close_channel) }
  end

  @queue_completion.completion do |queue|
    queue.delete { @exchange_completion.completion(&drop_exchange) }
  end
end

#message_count(&blk) ⇒ Object



144
145
146
147
148
# File 'lib/smith/messaging/sender.rb', line 144

# Reports how many messages are on the queue.
#
# Delegates to #status and forwards only the first value it yields.
#
# @yield [messages] the message count reported by #status.
def message_count(&blk)
  status { |pending| blk.call(pending) }
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



161
162
163
164
165
# File 'lib/smith/messaging/sender.rb', line 161

# Registers +blk+ as the channel's error handler once the channel is available.
#
# @param chain [Boolean] accepted but not used by this method.
# @yield forwarded unchanged to channel.on_error.
def on_error(chain=false, &blk)
  @channel_completion.completion { |channel| channel.on_error(&blk) }
end

#on_reply(opts = {}, &blk) ⇒ Object

Set up a listener that will receive replies from the published messages. You must publish with intent to reply – tee he.

If you pass in the :reply_queue_name option, that queue name is used for every reply. This means that there are no create and teardown costs for each message. If no :reply_queue_name is given, the reply queue is named after this sender's queue with ".reply" appended.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/smith/messaging/sender.rb', line 88

# Sets up the listener that receives replies to published messages.
#
# The reply queue is named by the :reply_queue_name option or, failing that,
# "<queue name>.reply"; it is always declared auto-delete and non-durable.
# The receiver is only created on the first call; later calls replace the
# reply block but reuse the existing reply-queue completion.
#
# @param opts [Hash] options for the reply queue definition. :reply_queue_name
#   is consumed here; :auto_ack makes unmatched replies get acked. The caller's
#   hash is not modified.
# @yield [payload, receiver] stored as the reply handler that #publish
#   registers for each outgoing message.
# @return [EM::Completion] the completion that is resolved with the reply queue.
def on_reply(opts={}, &blk)
  @reply_proc = blk

  @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise)

  # Work on a copy: deleting :reply_queue_name from the caller's hash would
  # change it behind their back and would raise on a frozen hash.
  opts = opts.dup
  reply_queue_name = opts.delete(:reply_queue_name) || "#{@queue_def.denormalise}.reply"

  queue_def = QueueDefinition.new(reply_queue_name, opts.merge(:auto_delete => true, :durable => false))
  logger.debug { "reply queue: #{queue_def.denormalise}" }

  @reply_queue_completion ||= EM::Completion.new.tap do |completion|
    Receiver.new(queue_def) do |queue|
      queue.subscribe do |payload, receiver|
        # Each pending reply is removed from the container when its reply arrives.
        @reply_container.delete(receiver.correlation_id).tap do |reply|
          if reply
            reply[:timeout].cancel_timeout
            reply[:reply_proc].call(payload, receiver)
          else
            receiver.ack if opts[:auto_ack]
            logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" }
          end
        end
      end

      EM.next_tick do
        completion.succeed(queue)
      end
    end
  end
end

#on_reply_error(&blk) ⇒ Object

This gets called if there is a mismatch in the message_id & correlation_id.



132
133
134
# File 'lib/smith/messaging/sender.rb', line 132

# Stores the given block in @reply_error.
#
# NOTE(review): none of the methods visible here read @reply_error, so when
# (and with which arguments) the block is invoked cannot be confirmed from
# this view of the file.
def on_reply_error(&blk)
  @reply_error = blk
end

#on_timeout(timeout = nil, &blk) ⇒ Object



78
79
80
# File 'lib/smith/messaging/sender.rb', line 78

# Replaces the sender's timeout object with a new one.
#
# @param timeout [Object, nil] the value handed to Timeout.new; when nil (or
#   false) Smith.config.smith.timeout is used instead.
# @yield forwarded unchanged to Timeout.new.
# @return [Timeout] the newly created timeout, also stored in @timeout.
def on_timeout(timeout=nil, &blk)
  duration = timeout || Smith.config.smith.timeout
  @timeout = Timeout.new(duration, &blk)
end

#publish(payload, opts = {}, &blk) ⇒ Object

If reply queue is set the block will be called when the message recipient replies to the message and it is received.

If a block is passed to this method but the :reply_queue option is not set it will be called when the message has been safely published.

If the :reply_queue is an empty string then a random queue name will be generated.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/smith/messaging/sender.rb', line 60

# Publishes +payload+.
#
# Without a reply queue (i.e. #on_reply has not been called) the message is
# handed to _publish together with +blk+.
#
# With a reply queue, a message_id is generated, the current reply proc and a
# clone of the timeout (armed with that message_id) are recorded in
# @reply_container under it, and the message is published with :reply_to and
# :message_id set. On this path +blk+ is not used.
#
# @param payload [Object] the message to publish.
# @param opts [Hash] publish options, combined via @options.publish.
def publish(payload, opts={}, &blk)
  return _publish(payload, @options.publish(opts), &blk) unless @reply_queue_completion

  @reply_queue_completion.completion do |reply_queue|
    message_id = random
    logger.verbose { "message_id: #{message_id}" }

    #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
    #### TODO if there is a timeout delete   ####
    #### the proc from the @reply_container. ####
    #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
    handler = @reply_proc
    armed_timeout = @timeout.clone
    armed_timeout.set_timeout(message_id)
    @reply_container[message_id] = {:reply_proc => handler, :timeout => armed_timeout}

    reply_headers = {:reply_to => reply_queue.queue_name, :message_id => message_id}
    _publish(payload, @options.publish(opts, reply_headers))
  end
end

#queue_name ⇒ Object



167
168
169
# File 'lib/smith/messaging/sender.rb', line 167

# Returns the queue name in the form produced by @queue_def.denormalise.
def queue_name
  @queue_def.denormalise
end

#status(&blk) ⇒ Object



136
137
138
139
140
141
142
# File 'lib/smith/messaging/sender.rb', line 136

# Asks the queue for its status once the queue is available.
#
# @yield [num_messages, num_consumers] the two values yielded by queue.status.
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status { |pending, subscribers| blk.call(pending, subscribers) }
  end
end