Class: GovukMessageQueueConsumer::Consumer
- Inherits:
-
Object
- Object
- GovukMessageQueueConsumer::Consumer
- Defined in:
- lib/govuk_message_queue_consumer/consumer.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) ⇒ Consumer
constructor
Create a new consumer.
- #run(subscribe_opts: {}) ⇒ Object
Constructor Details
#initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) ⇒ Consumer
Create a new consumer
25 26 27 28 29 30 31 32 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 25 def initialize(queue_name:, processor:, rabbitmq_connection: Consumer.default_connection_from_env, logger: Logger.new($stderr), worker_threads: 1, prefetch: 1) @queue_name = queue_name @processor = processor @rabbitmq_connection = rabbitmq_connection @logger = logger @worker_threads = worker_threads @prefetch = prefetch end |
Class Method Details
.default_connection_from_env ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 3 def self.default_connection_from_env # https://github.com/ruby-amqp/bunny/blob/066496d/docs/guides/connecting.md#paas-environments if !ENV["RABBITMQ_URL"].to_s.empty? Bunny.new else Bunny.new(RabbitMQConfig.from_environment(ENV)) end end |
Instance Method Details
#run(subscribe_opts: {}) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/govuk_message_queue_consumer/consumer.rb', line 34 def run(subscribe_opts: {}) @rabbitmq_connection.start subscribe_opts = { block: true, manual_ack: true }.merge(subscribe_opts) queue.subscribe(subscribe_opts) do |delivery_info, headers, payload| = Message.new(payload, headers, delivery_info) .process() rescue StandardError => e GovukError.notify(e) if defined?(GovukError) @logger.error "Uncaught exception in processor: \n\n #{e.class}: #{e.}\n\n#{e.backtrace.join("\n")}" exit(1) # Ensure rabbitmq requeues outstanding messages end rescue SignalException => e GovukError.notify(e) if defined?(GovukError) && e. != "SIGTERM" exit end |