Class: Circuitry::Subscriber

Inherits:
Object
  • Object
show all
Includes:
Concerns::Async, Circuitry::Services::SQS
Defined in:
lib/circuitry/subscriber.rb

Constant Summary collapse

DEFAULT_OPTIONS =
{
    lock: true,
    async: false,
    timeout: 15,
    wait_time: 10,
    batch_size: 10,
}.freeze
CONNECTION_ERRORS =
[
    Aws::SQS::Errors::ServiceError,
].freeze

Instance Attribute Summary collapse

Attributes included from Concerns::Async

#async

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Circuitry::Services::SQS

#sqs

Methods included from Concerns::Async

#async?, included, #process_asynchronously

Constructor Details

#initialize(queue, options = {}) ⇒ Subscriber

Returns a new instance of Subscriber.

Raises:

  • (ArgumentError)


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/circuitry/subscriber.rb', line 28

def initialize(queue, options = {})
  raise ArgumentError.new('queue cannot be nil') if queue.nil?

  options = DEFAULT_OPTIONS.merge(options)

  self.subscribed = false
  self.queue = queue
  self.lock = options[:lock]
  self.async = options[:async]
  self.timeout = options[:timeout]
  self.wait_time = options[:wait_time]
  self.batch_size = options[:batch_size]

  trap_signals
end

Instance Attribute Details

#batch_sizeObject

Returns the value of attribute batch_size.



14
15
16
# File 'lib/circuitry/subscriber.rb', line 14

def batch_size
  @batch_size
end

#lockObject

Returns the value of attribute lock.



14
15
16
# File 'lib/circuitry/subscriber.rb', line 14

def lock
  @lock
end

#queueObject

Returns the value of attribute queue.



14
15
16
# File 'lib/circuitry/subscriber.rb', line 14

def queue
  @queue
end

#timeoutObject

Returns the value of attribute timeout.



14
15
16
# File 'lib/circuitry/subscriber.rb', line 14

def timeout
  @timeout
end

#wait_timeObject

Returns the value of attribute wait_time.



14
15
16
# File 'lib/circuitry/subscriber.rb', line 14

def wait_time
  @wait_time
end

Class Method Details

.async_strategiesObject



64
65
66
# File 'lib/circuitry/subscriber.rb', line 64

def self.async_strategies
  super - [:batch]
end

.default_async_strategyObject



68
69
70
# File 'lib/circuitry/subscriber.rb', line 68

def self.default_async_strategy
  Circuitry.config.subscribe_async_strategy
end

Instance Method Details

#subscribe(&block) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/circuitry/subscriber.rb', line 44

def subscribe(&block)
  raise ArgumentError.new('block required') if block.nil?
  raise SubscribeError.new('AWS configuration is not set') unless can_subscribe?

  logger.info("Subscribing to queue: #{queue}")

  self.subscribed = true
  poll(&block)
  self.subscribed = false

  logger.info("Unsubscribed from queue: #{queue}")
rescue *CONNECTION_ERRORS => e
  logger.error("Connection error to queue: #{queue}: #{e}")
  raise SubscribeError.new(e)
end

#subscribed?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/circuitry/subscriber.rb', line 60

def subscribed?
  subscribed
end