Class: Jiggler::AtLeastOnce::Fetcher
- Inherits:
-
BaseFetcher
- Object
- BaseFetcher
- Jiggler::AtLeastOnce::Fetcher
- Defined in:
- lib/jiggler/at_least_once/fetcher.rb
Defined Under Namespace
Classes: CurrentJob
Constant Summary collapse
- TIMEOUT =
Timeout, in seconds, passed to the blocking BRPOPLPUSH call (2 seconds of waiting).
2.0
- RESERVE_QUEUE_SUFFIX =
'in_progress'
Instance Attribute Summary collapse
-
#producers ⇒ Object
readonly
Returns the value of attribute producers.
Attributes inherited from BaseFetcher
Instance Method Summary collapse
- #fetch ⇒ Object
-
#initialize(config, collection) ⇒ Fetcher
constructor
A new instance of Fetcher.
- #start ⇒ Object
- #suspend ⇒ Object
Methods included from Support::Helper
#log_error, #log_error_short, #logger, #safe_async, #scan_all, #tid
Constructor Details
#initialize(config, collection) ⇒ Fetcher
Returns a new instance of Fetcher.
13 14 15 16 17 18 |
# File 'lib/jiggler/at_least_once/fetcher.rb', line 13

# Builds the fetcher state on top of whatever the parent initializer sets up.
#
# @param config forwarded unchanged to the parent initializer
# @param collection forwarded unchanged to the parent initializer
def initialize(config, collection)
  super

  # One token is pushed here per fetched job (see #start); #fetch pops a
  # token before taking a job.
  @consumers_queue = Queue.new
  # Fetcher tasks wait on this in #start; #fetch and #suspend signal it.
  @condition = Async::Notification.new
  # Fetched jobs, ordered by the priority given on push (min-ordered queue).
  @tasks_queue = FastContainers::PriorityQueue.new(:min)
end
Instance Attribute Details
#producers ⇒ Object (readonly)
Returns the value of attribute producers.
11 12 13 |
# File 'lib/jiggler/at_least_once/fetcher.rb', line 11

# Read-only accessor.
#
# @return [Object] the current value of the @producers instance variable
def producers
  @producers
end
Instance Method Details
#fetch ⇒ Object
61 62 63 64 65 66 |
# File 'lib/jiggler/at_least_once/fetcher.rb', line 61

# Hands the next fetched job to a consumer.
#
# Signals @condition first when the `signal?` predicate (defined outside
# this excerpt) is true, then blocks on @consumers_queue until a token is
# available.
#
# @return [Symbol, Object] :done when the token popped from @consumers_queue
#   is nil (presumably because #suspend closed the queue), otherwise the
#   item popped from @tasks_queue
def fetch
  @condition.signal if signal?

  token = @consumers_queue.pop
  return :done if token.nil?

  @tasks_queue.pop
end
#start ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/jiggler/at_least_once/fetcher.rb', line 28

# Starts the fetcher tasks.
#
# For every entry of `config.sorted_queues_data`, starts
# `config[:fetchers_concurrency]` tasks through `safe_async`. Each task
# loops: it optionally waits on @condition, moves one item from the queue's
# list to its in-process list with BRPOPLPUSH, pushes the resulting job onto
# @tasks_queue and pushes a token onto @consumers_queue. The loop ends once
# @done is set (see #suspend).
def start
  config.sorted_queues_data.each do |queue, data|
    config[:fetchers_concurrency].times do
      safe_async("'Fetcher for #{queue}'") do
        source_list = data[:list]
        reserve_list = in_process_queue(source_list)

        loop do
          unless @done
            # Original author's caveat: num_waiting may report 1 even when
            # (presumably) no consumer is actually waiting.
            no_waiters = @consumers_queue.num_waiting.zero?
            # Meant to block here until consumers signal the condition.
            @condition.wait if no_waiters || @consumers_queue.size > @config[:concurrency]
          end
          break if @done

          payload = config.with_sync_redis do |conn|
            conn.blocking_call(false, 'BRPOPLPUSH', source_list, reserve_list, TIMEOUT)
          end
          # No requeue logic for now: the monitor is expected to handle the
          # in-process list for this process.
          break if @done
          next if payload.nil?

          @tasks_queue.push(job(source_list, payload, reserve_list), data[:priority])
          # Unblocks a consumer waiting in #fetch.
          @consumers_queue.push('')
        end

        logger.debug("Fetcher for #{queue} stopped")
      rescue Async::Stop
        logger.debug("Fetcher for #{queue} received stop signal")
      end
    end
  end
end
#suspend ⇒ Object
68 69 70 71 72 73 |
# File 'lib/jiggler/at_least_once/fetcher.rb', line 68

# Asks the fetcher to stop.
#
# Sets @done, which the loops in #start check, and then wakes up both
# sides: fetcher tasks waiting on @condition and consumers blocked on
# @consumers_queue.
def suspend
  logger.debug("Suspending the fetcher")

  # The flag is set before signalling so that woken fetcher tasks see it.
  @done = true
  @condition.signal

  # Closing the queue unblocks awaiting consumers.
  @consumers_queue.close
end