Class: Warren::Subscriber::Base

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/warren/subscriber/base.rb

Overview

A message takes a rabbitMQ message, and handles its acknowledgement or rejection.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fox, delivery_info, properties, payload) ⇒ Base

Construct a basic subscriber for each received message. Call #process to handle to processing of the message

Parameters:

  • fox (Warren::Fox)

    The fox consumer that provided the message. Used to acknowledge messages

  • delivery_info (Bunny::DeliveryInfo)

    Contains the information necessary for acknowledging the message

  • properties (Bunny::MessageProperties)

    Contains additional information about the received message

  • payload (String)

    The message contents



39
40
41
42
43
44
45
# File 'lib/warren/subscriber/base.rb', line 39

def initialize(fox, delivery_info, properties, payload)
  @fox = fox
  @delivery_info = delivery_info
  @properties = properties
  @payload = payload
  @acknowledged = false
end

Instance Attribute Details

#delivery_infoBunny::DeliveryInfo (readonly)

Returns Contains the information necessary for acknowledging the message.

Returns:

  • (Bunny::DeliveryInfo)

    Contains the information necessary for acknowledging the message



17
18
19
# File 'lib/warren/subscriber/base.rb', line 17

def delivery_info
  @delivery_info
end

#foxWarren::Fox (readonly)

Returns The fox consumer that provided the message. Used to acknowledge messages.

Returns:

  • (Warren::Fox)

    The fox consumer that provided the message. Used to acknowledge messages



15
16
17
# File 'lib/warren/subscriber/base.rb', line 15

def fox
  @fox
end

#payloadString (readonly)

Returns The message contents.

Returns:

  • (String)

    The message contents



21
22
23
# File 'lib/warren/subscriber/base.rb', line 21

def payload
  @payload
end

#propertiesBunny::MessageProperties (readonly)

Returns Contains additional information about the received message.

Returns:

  • (Bunny::MessageProperties)

    Contains additional information about the received message



19
20
21
# File 'lib/warren/subscriber/base.rb', line 19

def properties
  @properties
end

Instance Method Details

#_process_Void

Called by Fox to trigger processing of the message and acknowledgment on success. In most cases the #process method should be used to customize behaviour.

Returns:

  • (Void)


51
52
53
54
# File 'lib/warren/subscriber/base.rb', line 51

def _process_
  process
  ack unless @acknowledged
end

#dead_letter(exception) ⇒ Void

Reject the message without re-queuing Will end up getting dead-lettered

Parameters:

  • exception (StandardError)

    The exception which triggered message dead-letter

Returns:

  • (Void)


87
88
89
90
91
92
93
94
# File 'lib/warren/subscriber/base.rb', line 87

def dead_letter(exception)
  error "Dead-letter: #{payload}"
  error "Dead-letter Exception: #{exception.message}"
  raise_if_acknowledged
  subscription.nack(delivery_tag)
  @acknowledged = true
  error 'Dead-letter nacked'
end

#delay(exception) ⇒ Void

Re-post the message to the delay exchange and acknowledges receipt of the original message. The delay exchange will return the messages to the original queue after a delay.

Parameters:

  • exception (StandardError)

    The exception that has caused the message to require a delay

Returns:

  • (Void)


106
107
108
109
110
111
112
113
114
115
# File 'lib/warren/subscriber/base.rb', line 106

def delay(exception)
  return dead_letter(exception) if attempt > max_retries

  warn "Delay: #{payload}"
  warn "Delay Exception: #{exception.message}"
  # Publish the message to the delay queue
  delayed.publish(payload, routing_key: routing_key, headers: { attempts: attempt + 1 })
  # Acknowledge the original message
  ack
end

#processObject

Triggers processing of the method. Over-ride this in subclasses to customize your handler.



58
59
60
# File 'lib/warren/subscriber/base.rb', line 58

def process
  true
end

#requeue(exception) ⇒ Void

Reject the message and re-queue ready for immediate reprocessing.

Parameters:

  • exception (StandardError)

    The exception which triggered message requeue

Returns:

  • (Void)


69
70
71
72
73
74
75
76
77
78
# File 'lib/warren/subscriber/base.rb', line 69

def requeue(exception)
  warn "Re-queue: #{payload}"
  warn "Re-queue Exception: #{exception.message}"
  raise_if_acknowledged
  # nack arguments: delivery_tag, multiple, requeue
  # http://reference.rubybunny.info/Bunny/Channel.html#nack-instance_method
  subscription.nack(delivery_tag, false, true)
  @acknowledged = true
  warn 'Re-queue nacked'
end