Class: Warren::Subscriber::Base
- Inherits:
- Object
- Warren::Subscriber::Base
- Extended by:
- Forwardable
- Defined in:
- lib/warren/subscriber/base.rb
Overview
A subscriber takes a RabbitMQ message and handles its acknowledgement or rejection.
Instance Attribute Summary
- #delivery_info ⇒ Bunny::DeliveryInfo (readonly)
  Contains the information necessary for acknowledging the message.
- #fox ⇒ Warren::Fox (readonly)
  The fox consumer that provided the message.
- #payload ⇒ String (readonly)
  The message contents.
- #properties ⇒ Bunny::MessageProperties (readonly)
  Contains additional information about the received message.
Instance Method Summary
- #_process_ ⇒ Void
  Called by Fox to trigger processing of the message and acknowledgment on success.
- #dead_letter(exception) ⇒ Void
  Rejects the message without re-queuing. The message will end up getting dead-lettered.
- #delay(exception) ⇒ Void
  Re-posts the message to the delay exchange and acknowledges receipt of the original message.
- #initialize(fox, delivery_info, properties, payload) ⇒ Base (constructor)
  Constructs a basic subscriber for each received message.
- #process ⇒ Object
  Triggers processing of the message.
- #requeue(exception) ⇒ Void
  Rejects the message and re-queues it, ready for immediate reprocessing.
Constructor Details
#initialize(fox, delivery_info, properties, payload) ⇒ Base
Construct a basic subscriber for each received message. Call #process to handle processing of the message.
    # File 'lib/warren/subscriber/base.rb', line 39
    def initialize(fox, delivery_info, properties, payload)
      @fox = fox
      @delivery_info = delivery_info
      @properties = properties
      @payload = payload
      @acknowledged = false
    end
Instance Attribute Details
#delivery_info ⇒ Bunny::DeliveryInfo (readonly)
Returns the information necessary for acknowledging the message.
    # File 'lib/warren/subscriber/base.rb', line 17
    def delivery_info
      @delivery_info
    end
#fox ⇒ Warren::Fox (readonly)
Returns the fox consumer that provided the message. Used to acknowledge messages.
    # File 'lib/warren/subscriber/base.rb', line 15
    def fox
      @fox
    end
#payload ⇒ String (readonly)
Returns the message contents.
    # File 'lib/warren/subscriber/base.rb', line 21
    def payload
      @payload
    end
#properties ⇒ Bunny::MessageProperties (readonly)
Returns additional information about the received message.
    # File 'lib/warren/subscriber/base.rb', line 19
    def properties
      @properties
    end
Instance Method Details
#_process_ ⇒ Void
    # File 'lib/warren/subscriber/base.rb', line 51
    def _process_
      process
      ack unless @acknowledged
    end
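The guard in #_process_ means a handler that has already rejected its message is not acknowledged a second time. A minimal sketch of that flow, using a hypothetical stand-in class (GuardSketch and its events list are illustrative only and are not part of Warren; no RabbitMQ connection is involved):

```ruby
# Hypothetical stand-in that mirrors the ack guard shown in #_process_.
# It records what would have been sent to the broker instead of sending it.
class GuardSketch
  attr_reader :events

  def initialize(reject:)
    @reject = reject
    @acknowledged = false
    @events = []
  end

  # Same shape as the documented method: process, then ack if nothing
  # has settled the message yet.
  def _process_
    process
    ack unless @acknowledged
  end

  # A handler that either succeeds quietly or rejects the message itself.
  def process
    requeue if @reject
  end

  private

  def ack
    @events << :ack
    @acknowledged = true
  end

  def requeue
    @events << :nack_requeue
    @acknowledged = true
  end
end

ok = GuardSketch.new(reject: false)
ok._process_

rejected = GuardSketch.new(reject: true)
rejected._process_
```

In this sketch the first message is settled by the automatic ack and the second only by the nack issued inside process.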
#dead_letter(exception) ⇒ Void
Reject the message without re-queuing. The message will end up getting dead-lettered.
    # File 'lib/warren/subscriber/base.rb', line 87
    def dead_letter(exception)
      error "Dead-letter: #{payload}"
      error "Dead-letter Exception: #{exception.message}"
      raise_if_acknowledged
      subscription.nack(delivery_tag)
      @acknowledged = true
      error 'Dead-letter nacked'
    end
#delay(exception) ⇒ Void
Re-posts the message to the delay exchange and acknowledges receipt of the original message. The delay exchange will return the message to the original queue after a delay.
    # File 'lib/warren/subscriber/base.rb', line 106
    def delay(exception)
      return dead_letter(exception) if attempt > max_retries

      warn "Delay: #{payload}"
      warn "Delay Exception: #{exception.message}"
      # Publish the message to the delay queue
      delayed.publish(payload, routing_key: routing_key, headers: { attempts: attempt + 1 })
      # Acknowledge the original message
      ack
    end
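The retry bookkeeping in #delay can be sketched without a broker. This page does not show how attempt or max_retries are derived, so the sketch assumes that attempt is read from an 'attempts' message header defaulting to 0 and that max_retries is a plain constructor argument. DelaySketch, published and outcome are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in that mirrors the branching in #delay.
class DelaySketch
  attr_reader :published, :outcome

  def initialize(payload, headers: {}, max_retries: 2)
    @payload = payload
    @headers = headers
    @max_retries = max_retries
    @published = []
    @outcome = nil
  end

  # Assumption: the attempt count travels in the 'attempts' header.
  def attempt
    @headers.fetch('attempts', 0)
  end

  def delay(exception)
    return dead_letter(exception) if attempt > @max_retries

    # Stands in for delayed.publish(...): record the re-posted message.
    @published << { payload: @payload, headers: { attempts: attempt + 1 } }
    # Stands in for ack of the original message.
    @outcome = :acked
  end

  def dead_letter(_exception)
    @outcome = :dead_lettered
  end
end

first_try = DelaySketch.new('job')
first_try.delay(StandardError.new('boom'))

exhausted = DelaySketch.new('job', headers: { 'attempts' => 3 })
exhausted.delay(StandardError.new('boom'))
```

Because the comparison is attempt > max_retries, a message whose attempt count equals max_retries is still delayed once more before it is dead-lettered.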
#process ⇒ Object
Triggers processing of the message. Override this in subclasses to customize your handler.
    # File 'lib/warren/subscriber/base.rb', line 58
    def process
      true
    end
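Overriding #process in a subclass might look like the following sketch. To keep it runnable without RabbitMQ or the warren gem, it inherits from a hypothetical stand-in, FakeBase, that copies only the constructor and default #process shown on this page; in an application the subclass would presumably inherit from Warren::Subscriber::Base instead. GreetingSubscriber and the JSON payload format are assumptions made for the example.

```ruby
require 'json'

# Hypothetical stand-in for Warren::Subscriber::Base, limited to the
# constructor, readers and default #process documented above.
class FakeBase
  attr_reader :fox, :delivery_info, :properties, :payload

  def initialize(fox, delivery_info, properties, payload)
    @fox = fox
    @delivery_info = delivery_info
    @properties = properties
    @payload = payload
    @acknowledged = false
  end

  def process
    true
  end
end

# Illustrative subscriber: parses a JSON payload and builds a greeting.
class GreetingSubscriber < FakeBase
  attr_reader :greeting

  def process
    data = JSON.parse(payload)
    @greeting = "Hello, #{data.fetch('name')}"
  end
end

subscriber = GreetingSubscriber.new(nil, nil, nil, '{"name":"Ada"}')
subscriber.process
puts subscriber.greeting
```

The handler only needs the payload reader here; fox, delivery_info and properties are passed as nil because the sketch never touches the broker.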
#requeue(exception) ⇒ Void
Reject the message and re-queue ready for immediate reprocessing.
    # File 'lib/warren/subscriber/base.rb', line 69
    def requeue(exception)
      warn "Re-queue: #{payload}"
      warn "Re-queue Exception: #{exception.message}"
      raise_if_acknowledged
      # nack arguments: delivery_tag, multiple, requeue
      # http://reference.rubybunny.info/Bunny/Channel.html#nack-instance_method
      subscription.nack(delivery_tag, false, true)
      @acknowledged = true
      warn 'Re-queue nacked'
    end
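The only difference between #requeue and #dead_letter at the broker level is the third nack argument. The sketch below uses a hypothetical RecordingChannel whose nack takes the same positional arguments as the Bunny method linked above (delivery_tag, multiple, requeue, with the last two defaulting to false); it records calls rather than talking to RabbitMQ.

```ruby
# Hypothetical fake channel: stores each nack call for inspection.
class RecordingChannel
  attr_reader :nacks

  def initialize
    @nacks = []
  end

  # Same positional arguments as the nack call used in #requeue.
  def nack(delivery_tag, multiple = false, requeue = false)
    @nacks << { delivery_tag: delivery_tag, multiple: multiple, requeue: requeue }
  end
end

channel = RecordingChannel.new
channel.nack(1, false, true) # as in #requeue: message goes back on the queue
channel.nack(2)              # as in #dead_letter: message is not re-queued

channel.nacks.each { |call| puts call.inspect }
```

Whether a rejected, non-requeued message is actually dead-lettered depends on the queue having a dead-letter exchange configured, which is outside what this page shows.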