Class: Circuitry::Subscriber
- Inherits:
-
Object
- Object
- Circuitry::Subscriber
- Includes:
- Concerns::Async, Circuitry::Services::SQS
- Defined in:
- lib/circuitry/subscriber.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
{ lock: true, async: false, timeout: 15, wait_time: 10, batch_size: 10, }.freeze
- CONNECTION_ERRORS =
[ Aws::SQS::Errors::ServiceError, ].freeze
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
readonly
Returns the value of attribute batch_size.
-
#lock ⇒ Object
readonly
Returns the value of attribute lock.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#wait_time ⇒ Object
readonly
Returns the value of attribute wait_time.
Attributes included from Concerns::Async
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(queue, options = {}) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #subscribe(&block) ⇒ Object
- #subscribed? ⇒ Boolean
Methods included from Circuitry::Services::SQS
Methods included from Concerns::Async
#async?, included, #process_asynchronously
Constructor Details
#initialize(queue, options = {}) ⇒ Subscriber
Returns a new instance of Subscriber.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/circuitry/subscriber.rb', line 28

# Builds a subscriber for the given queue.
#
# Caller-supplied options are merged over DEFAULT_OPTIONS, so any key the
# caller omits falls back to its default.
#
# @param queue [Object] the queue to subscribe to; must not be nil
# @param options [Hash] overrides for :lock, :async, :timeout, :wait_time
#   and :batch_size
# @raise [ArgumentError] if +queue+ is nil
def initialize(queue, options = {})
  raise ArgumentError, 'queue cannot be nil' if queue.nil?

  options = DEFAULT_OPTIONS.merge(options)

  # A fresh subscriber is not polling yet.
  self.subscribed = false
  self.queue = queue
  self.lock = options[:lock]
  self.async = options[:async]
  self.timeout = options[:timeout]
  self.wait_time = options[:wait_time]
  self.batch_size = options[:batch_size]

  # Helper defined outside this snippet.
  trap_signals
end
Instance Attribute Details
#batch_size ⇒ Object
Returns the value of attribute batch_size.
14 15 16 |
# File 'lib/circuitry/subscriber.rb', line 14

# Returns the value of attribute batch_size.
#
# @return [Object] whatever is currently stored in @batch_size
def batch_size
  @batch_size
end
#lock ⇒ Object
Returns the value of attribute lock.
14 15 16 |
# File 'lib/circuitry/subscriber.rb', line 14

# Returns the value of attribute lock.
#
# @return [Object] whatever is currently stored in @lock
def lock
  @lock
end
#queue ⇒ Object
Returns the value of attribute queue.
14 15 16 |
# File 'lib/circuitry/subscriber.rb', line 14

# Returns the value of attribute queue.
#
# @return [Object] whatever is currently stored in @queue
def queue
  @queue
end
#timeout ⇒ Object
Returns the value of attribute timeout.
14 15 16 |
# File 'lib/circuitry/subscriber.rb', line 14

# Returns the value of attribute timeout.
#
# @return [Object] whatever is currently stored in @timeout
def timeout
  @timeout
end
#wait_time ⇒ Object
Returns the value of attribute wait_time.
14 15 16 |
# File 'lib/circuitry/subscriber.rb', line 14

# Returns the value of attribute wait_time.
#
# @return [Object] whatever is currently stored in @wait_time
def wait_time
  @wait_time
end
Class Method Details
.async_strategies ⇒ Object
64 65 66 |
# File 'lib/circuitry/subscriber.rb', line 64

# Lists the async strategies available to subscribers: everything the
# parent implementation reports, with :batch removed.
#
# @return [Array] the inherited strategies without :batch
def self.async_strategies
  super - [:batch]
end
Instance Method Details
#subscribe(&block) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/circuitry/subscriber.rb', line 44

# Subscribes to the queue and hands the block to +poll+.
#
# The subscribed flag is set for the duration of polling and is always
# cleared afterwards, even when polling raises, so #subscribed? never
# keeps reporting true after a failed subscription.
#
# @yield passed through to +poll+ unchanged
# @raise [ArgumentError] if no block is given
# @raise [SubscribeError] if +can_subscribe?+ is false, or if polling fails
#   with one of CONNECTION_ERRORS
def subscribe(&block)
  raise ArgumentError, 'block required' if block.nil?
  raise SubscribeError, 'AWS configuration is not set' unless can_subscribe?

  logger.info("Subscribing to queue: #{queue}")

  self.subscribed = true
  begin
    poll(&block)
  ensure
    # Reset on both the normal and the error path.
    self.subscribed = false
  end

  logger.info("Unsubscribed from queue: #{queue}")
rescue *CONNECTION_ERRORS => e
  logger.error("Connection error to queue: #{queue}: #{e}")
  raise SubscribeError, e
end
#subscribed? ⇒ Boolean
60 61 62 |
# File 'lib/circuitry/subscriber.rb', line 60

# Reports the current subscription flag by delegating to the +subscribed+
# reader, which is defined outside this snippet.
#
# @return [Boolean] the current value of +subscribed+
def subscribed?
  subscribed
end