Class: Smith::Messaging::Receiver

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/receiver.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Receiver

Returns a new instance of Receiver.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/smith/messaging/receiver.rb', line 13

def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  @foo_options = {
    :auto_ack => option_or_default(@queue_def.options, :auto_ack, true),
    :threading => option_or_default(@queue_def.options, :threading, false)}

  @payload_type = Array(option_or_default(@queue_def.options, :type, []))

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counter = MessageCounter.new(@queue_def.denormalise)

  @channel_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @exchange_completion = EM::Completion.new
  @requeue_options_completion = EM::Completion.new

  @reply_queues = {}

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|
      @exchange_completion.succeed(exchange)
    end
  end

  open_channel(:prefetch => prefetch) do |channel|
    channel.queue(@queue_def.normalise, @options.queue) do |queue|
      @exchange_completion.completion do |exchange|
        queue.bind(exchange, :routing_key => @queue_def.normalise)
        @queue_completion.succeed(queue)
        @requeue_options_completion.succeed(:exchange => exchange, :queue => queue)
      end
    end
  end

  blk.call(self) if blk
end

Instance Method Details

#ack(multiple = false) ⇒ Object



60
61
62
# File 'lib/smith/messaging/receiver.rb', line 60

def ack(multiple=false)
  @channel_completion.completion {|channel| channel.ack(multiple) }
end

#delete(&blk) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/smith/messaging/receiver.rb', line 159

def delete(&blk)
  @exchange_completion.completion do |exchange|
    @queue_completion.completion do |queue|
      @channel_completion.completion do |channel|
        queue.unbind(exchange) do
          queue.delete do
            exchange.delete do
              channel.close(&blk)
            end
          end
        end
      end
    end
  end
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



129
130
131
132
133
134
# File 'lib/smith/messaging/receiver.rb', line 129

def on_error(chain=false, &blk)
  # TODO Check that this chains callbacks
  @channel_completion.completion do |channel|
    channel.on_error(&blk)
  end
end

#on_requeue(&blk) ⇒ Object



189
190
191
192
193
# File 'lib/smith/messaging/receiver.rb', line 189

def on_requeue(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue => blk)
  end
end

#on_requeue_limit(&blk) ⇒ Object



195
196
197
198
199
# File 'lib/smith/messaging/receiver.rb', line 195

def on_requeue_limit(&blk)
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(:on_requeue_limit => blk)
  end
end

#pop(&blk) ⇒ Object

pops a message off the queue and passes the headers and payload into the block. pop will automatically acknowledge the message unless the options sets :ack to false.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/smith/messaging/receiver.rb', line 113

def pop(&blk)
  opts = @options.pop
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      queue.pop(opts) do |, payload|
        if payload
          on_message(, payload, requeue_options, &blk)
        else
          blk.call(nil,nil)
        end
      end
    end
  end
end

#queue_nameObject



155
156
157
# File 'lib/smith/messaging/receiver.rb', line 155

def queue_name
  @queue_def.denormalise
end

#requeue_parameters(opts = {}) ⇒ Object



183
184
185
186
187
# File 'lib/smith/messaging/receiver.rb', line 183

def requeue_parameters(opts={})
  @requeue_options_completion.completion do |requeue_options|
    requeue_options.merge!(opts)
  end
end

#setup_reply_queue(reply_queue_name, &blk) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/smith/messaging/receiver.rb', line 64

def setup_reply_queue(reply_queue_name, &blk)
  if @reply_queues[reply_queue_name]
    blk.call(@reply_queues[reply_queue_name])
  else
    @exchange_completion.completion do |exchange|
      logger.debug { "Attaching to reply queue: #{reply_queue_name}" }

      queue_def = QueueDefinition.new(reply_queue_name, :auto_delete => true, :immediate => true, :mandatory => true, :durable => false)

      Smith::Messaging::Sender.new(queue_def) do |sender|
        @reply_queues[reply_queue_name] = sender
        blk.call(sender)
      end
    end
  end
end

#status(&blk) ⇒ Object



175
176
177
178
179
180
181
# File 'lib/smith/messaging/receiver.rb', line 175

def status(&blk)
  @queue_completion.completion do |queue|
    queue.status do |num_messages, num_consumers|
      blk.call(num_messages, num_consumers)
    end
  end
end

#subscribe(&blk) ⇒ Object

Subscribes to a queue and passes the headers and payload into the block. subscribe will automatically acknowledge the message unless the options sets :ack to false.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/smith/messaging/receiver.rb', line 84

def subscribe(&blk)
  @queue_completion.completion do |queue|
    @requeue_options_completion.completion do |requeue_options|
      if !queue.subscribed?
        opts = @options.subscribe
        logger.debug { "Subscribing to: [queue]:#{@queue_def.denormalise} [options]:#{opts}" }
        queue.subscribe(opts) do |,payload|
          if payload
            on_message(, payload, requeue_options, &blk)
          else
            logger.verbose { "Received null message on: #{@queue_def.denormalise} [options]:#{opts}" }
          end
        end
      else
        logger.error { "Queue is already subscribed too. Not listening on: #{@queue_def.denormalise}" }
      end
    end
  end
end

#unsubscribe(&blk) ⇒ Object



104
105
106
107
108
# File 'lib/smith/messaging/receiver.rb', line 104

def unsubscribe(&blk)
  @queue_completion.completion do |queue|
    queue.unsubscribe(&blk)
  end
end