Class: BBK::AMQP::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/amqp/consumer.rb

Constant Summary collapse

# Defaults that the constructor merges caller-supplied options over.
DEFAULT_OPTIONS =
{
  # Passed as the second argument to connection.create_channel in #run.
  consumer_pool_size:               3,
  # Passed as the third argument to connection.create_channel in #run.
  consumer_pool_abort_on_exception: true,
  # Passed to channel.prefetch when #run creates the channel.
  prefetch_size:                    10,
  # Forwarded to queue.subscribe; dropped from the subscribe options when nil.
  consumer_tag:                     nil,
  # Callable invoked by #nack with (incoming, error). Instantiated once, when
  # this constant is defined.
  rejection_policy:                 RejectionPolicies::Requeue.new
}.freeze
# Protocol names returned by #protocols.
PROTOCOLS =
%w[mq amqp amqps].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, queue_name: nil, publisher: nil, **options) ⇒ Consumer

Returns a new instance of Consumer.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/bbk/amqp/consumer.rb', line 21

# Builds a consumer for a queue on the given connection.
#
# @param connection [Object] connection used by #run to create a channel when
#   no :channel option was supplied
# @param queue_name [String, nil] name of the queue to consume; when a :queue
#   option is given, that queue's #name takes precedence
# @param publisher [Object, nil] stored as-is and exposed through #publisher
# @param options [Hash] merged over DEFAULT_OPTIONS. :channel and :queue are
#   extracted before merging, :rejection_policy after; :logger replaces
#   BBK::AMQP.logger as the underlying logger
# @raise [ArgumentError] if neither queue_name nor a :queue option is given
def initialize(connection, queue_name: nil, publisher: nil, **options)
  @connection = connection
  @channel = options.delete(:channel)
  @queue = options.delete(:queue)
  @publisher = publisher

  raise ArgumentError, 'queue_name or queue must be provided!' if @queue.nil? && queue_name.nil?

  @queue_name = @queue&.name || queue_name

  @options = DEFAULT_OPTIONS.merge(options)
  @rejection_policy = @options.delete(:rejection_policy)

  logger = @options.fetch(:logger, BBK::AMQP.logger)
  # Wrap plain loggers so that tagging is always available.
  logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger)
  # Tag with the resolved @queue_name: the queue_name argument is nil when the
  # caller supplied only a :queue object.
  @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, @queue_name])
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# @return [Object] the connection passed as the first constructor argument
def connection
  @connection
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# @return [BBK::Utils::ProxyLogger] the tagged proxy logger built in the constructor
def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# @return [Hash] DEFAULT_OPTIONS merged with the constructor options, with
#   :channel, :queue and :rejection_policy removed
def options
  @options
end

#publisher ⇒ Object

Returns the value of attribute publisher.



9
10
11
# File 'lib/bbk/amqp/consumer.rb', line 9

# @return [Object, nil] the publisher given to the constructor via publisher:
def publisher
  @publisher
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# @return [Object, nil] the :queue option given to the constructor, or, once
#   #run has been called without one, the queue looked up on the channel
def queue
  @queue
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# @return [String] the name of the :queue option when one was given to the
#   constructor, otherwise the queue_name: argument
def queue_name
  @queue_name
end

#rejection_policy ⇒ Object (readonly)

Returns the value of attribute rejection_policy.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# @return [#call] the :rejection_policy option (default taken from
#   DEFAULT_OPTIONS); invoked by #nack
def rejection_policy
  @rejection_policy
end

Instance Method Details

#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object

Note:

The answer should be processed by the AMQP publisher.

Acks the incoming message. If an answer is given, it is sent before the message is acked.

Parameters:

  • incoming (BBK::AMQP::Message)

    consumed message from amqp channel

  • answer (BBK::App::Dispatcher::Result) (defaults to: nil)

    answer message



85
86
87
88
89
90
91
92
# File 'lib/bbk/amqp/consumer.rb', line 85

# Acknowledges a consumed message on the channel it was delivered on.
# A non-nil answer is handed to send_answer before the ack is issued.
#
# @param incoming [BBK::AMQP::Message] consumed message; its headers and
#   delivery_info are read here
# @param answer [Object, nil] answer to send first; skipped when nil
# @return [Object] whatever the channel's #ack returns
def ack(incoming, *args, answer: nil, **kwargs)
  send_answer(incoming, answer) unless answer.nil?

  headers = incoming.headers
  info = incoming.delivery_info
  # [] is used so the tests work. In reality the delivery tag is a
  # VersionedDeliveryTag object whose to_i (called inside channel.ack)
  # returns the actual number.
  logger.debug "Ack message #{headers[:type]}[#{headers[:message_id]}]  delivery tag: #{info[:delivery_tag].to_i}"
  info[:channel].ack(info[:delivery_tag])
end

#close ⇒ Object

Closes the consumer: closes the AMQP channel if one is open.



121
122
123
124
125
126
127
128
129
130
# File 'lib/bbk/amqp/consumer.rb', line 121

# Closes the consumer's channel, if there is one.
#
# @return [Object, nil] the channel that was closed, or nil when there was
#   no channel to close
def close
  channel = @channel
  return nil unless channel

  logger.info 'Closing...'
  # Drop the reference before closing so the ivar is cleared even if
  # channel.close raises.
  @channel = nil
  channel.close
  logger.info 'Stopped'
  channel
end

#nack(incoming, *args, error: nil, **_kwargs) ⇒ Object

Nack incoming message

Parameters:



105
106
107
# File 'lib/bbk/amqp/consumer.rb', line 105

# Rejects a consumed message by delegating to the configured rejection policy.
#
# @param incoming [Object] the consumed message, passed through to the policy
# @param error [Object, nil] passed through to the policy as its second argument
# @return [Object] whatever rejection_policy.call returns
def nack(incoming, *args, error: nil, **_kwargs)
  rejection_policy.call(incoming, error)
end

#protocols ⇒ Object

Returns the list of protocols this consumer supports.



42
43
44
# File 'lib/bbk/amqp/consumer.rb', line 42

# @return [Array<String>] the frozen PROTOCOLS list
def protocols
  PROTOCOLS
end

#run(msg_stream) ⇒ Object

Starts the consumer in non-blocking mode.

Parameters:

  • msg_stream (Enumerable)
    • object with << method



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/bbk/amqp/consumer.rb', line 53

# Starts a non-blocking subscription on the queue and pushes every delivered
# message into msg_stream.
#
# Creates the channel (and applies the prefetch setting) only when none was
# supplied to the constructor, and looks the queue up passively only when no
# queue object was supplied.
#
# @param msg_stream [#<<] sink that receives each consumed Message
# @return [#<<] the same msg_stream object
def run(msg_stream)
  @channel ||= @connection.create_channel(nil, options[:consumer_pool_size],
                                          options[:consumer_pool_abort_on_exception]).tap do |ch|
    ch.prefetch(options[:prefetch_size])
  end

  logger.add_tags "Ch##{@channel.id}"

  @queue ||= @channel.queue(queue_name, passive: true)

  # compact drops consumer_tag when it is nil.
  subscribe_opts = {
    block:        false,
    manual_ack:   true,
    consumer_tag: options[:consumer_tag],
    exclusive: options.fetch(:exclusive, false)
  }.compact

  logger.info 'Starting...'
  @subscription = queue.subscribe(subscribe_opts) do |delivery_info, metadata, payload|
    message = Message.new(self, delivery_info, metadata, payload)
    logger.debug "Consumed message #{message.headers[:type]}[#{message.headers[:message_id]}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}"

    msg_stream << message
  end
  msg_stream
end

#stop ⇒ Object

Stops consuming messages.



110
111
112
113
114
115
116
117
118
# File 'lib/bbk/amqp/consumer.rb', line 110

# Cancels the active subscription, if there is one.
#
# @return [Object, nil] the subscription that was cancelled, or nil when
#   there was no active subscription
def stop
  subscription = @subscription
  return nil unless subscription

  logger.info 'Stopping...'
  # Clear the reference before cancelling so a repeated #stop is a no-op.
  @subscription = nil
  subscription.cancel
  subscription
end

#sync? ⇒ Boolean

Indicates whether an answer is required for every incoming message.

Returns:

  • (Boolean)


47
48
49
# File 'lib/bbk/amqp/consumer.rb', line 47

# @return [Boolean] always false for this consumer
def sync?
  false
end