Class: Eventboss::LongPoller
- Inherits:
-
Object
- Object
- Eventboss::LongPoller
- Includes:
- Logging, SafeThread
- Defined in:
- lib/eventboss/long_poller.rb
Overview
LongPoller fetches messages from SQS using Long Polling docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html It starts one thread per queue (handled by Launcher)
Constant Summary collapse
- TIME_WAIT =
10
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#listener ⇒ Object
readonly
Returns the value of attribute listener.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #fetch_and_dispatch ⇒ Object
-
#initialize(launcher, bus, client, queue, listener) ⇒ LongPoller
constructor
A new instance of LongPoller.
- #kill(wait = false) ⇒ Object
- #run ⇒ Object
- #start ⇒ Object
- #terminate(wait = false) ⇒ Object
Methods included from SafeThread
#handle_exception, #safe_thread
Methods included from Logging
Constructor Details
#initialize(launcher, bus, client, queue, listener) ⇒ LongPoller
Returns a new instance of LongPoller.
13 14 15 16 17 18 19 20 21 22 |
# File 'lib/eventboss/long_poller.rb', line 13 def initialize(launcher, bus, client, queue, listener) @id = "poller-#{queue.name}" @launcher = launcher @bus = bus @client = client @queue = queue @listener = listener @thread = nil @stop = false end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
11 12 13 |
# File 'lib/eventboss/long_poller.rb', line 11 def id @id end |
#listener ⇒ Object (readonly)
Returns the value of attribute listener.
11 12 13 |
# File 'lib/eventboss/long_poller.rb', line 11 def listener @listener end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
11 12 13 |
# File 'lib/eventboss/long_poller.rb', line 11 def queue @queue end |
Instance Method Details
#fetch_and_dispatch ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/eventboss/long_poller.rb', line 44 def fetch_and_dispatch .each do || logger.debug(id) { "enqueueing message #{.}" } @bus << UnitOfWork.new(@client, queue, listener, ) rescue ClosedQueueError logger.info(id) { "skip message #{.} enqueuing due to closed queue" } end end |
#kill(wait = false) ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/eventboss/long_poller.rb', line 34 def kill(wait = false) @stop = true return unless @thread @thread.value if wait # Force shutdown of poller, in case the loop is stuck @thread.raise Eventboss::Shutdown @thread.value if wait end |
#run ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/eventboss/long_poller.rb', line 53 def run fetch_and_dispatch until @stop @launcher.poller_stopped(self) rescue Eventboss::Shutdown @launcher.poller_stopped(self) rescue Aws::SQS::Errors::NonExistentQueue => exception handle_exception(exception, poller_id: id) @launcher.poller_stopped(self) rescue StandardError => exception handle_exception(exception, poller_id: id) # Give a chance for temporary AWS errors to be resolved # Sleep guarantees against repeating fast failure errors sleep TIME_WAIT @launcher.poller_stopped(self, restart: @stop == false) end |
#start ⇒ Object
24 25 26 |
# File 'lib/eventboss/long_poller.rb', line 24 def start @thread = safe_thread(id, &method(:run)) end |
#terminate(wait = false) ⇒ Object
28 29 30 31 32 |
# File 'lib/eventboss/long_poller.rb', line 28 def terminate(wait = false) @stop = true return unless @thread @thread.value if wait end |