Class: Jiggler::AtLeastOnce::Fetcher

Inherits:
BaseFetcher show all
Defined in:
lib/jiggler/at_least_once/fetcher.rb

Defined Under Namespace

Classes: CurrentJob

Constant Summary collapse

TIMEOUT =

Seconds to wait on each blocking BRPOPLPUSH call.

2.0
RESERVE_QUEUE_SUFFIX =
'in_progress'

Instance Attribute Summary collapse

Attributes inherited from BaseFetcher

#collection, #config

Instance Method Summary collapse

Methods included from Support::Helper

#log_error, #log_error_short, #logger, #safe_async, #scan_all, #tid

Constructor Details

#initialize(config, collection) ⇒ Fetcher

Returns a new instance of Fetcher.



13
14
15
16
17
18
# File 'lib/jiggler/at_least_once/fetcher.rb', line 13

# Builds the fetcher's coordination primitives on top of the base fetcher.
#
# @param config the configuration object forwarded to the superclass
# @param collection the collection forwarded to the superclass
def initialize(config, collection)
  super(config, collection)
  # Tokens pushed by fetcher tasks; consumers pop one per available job.
  @consumers_queue = Queue.new
  # Fetcher tasks wait on this notification until they are signalled.
  @condition = Async::Notification.new
  # Fetched jobs, kept in a priority queue created in :min mode.
  @tasks_queue = FastContainers::PriorityQueue.new(:min)
end

Instance Attribute Details

#producers ⇒ Object (readonly)

Returns the value of attribute producers.



11
12
13
# File 'lib/jiggler/at_least_once/fetcher.rb', line 11

# Read-only accessor for the +@producers+ instance variable.
#
# @return [Object, nil] whatever is stored in +@producers+ (nil when unset)
def producers = @producers

Instance Method Details

#fetch ⇒ Object



61
62
63
64
65
66
# File 'lib/jiggler/at_least_once/fetcher.rb', line 61

# Hands the next fetched job to a consumer.
#
# Signals the fetcher tasks first when +signal?+ is truthy, then blocks on
# the consumers queue until a token is pushed or the queue is closed.
#
# @return [Symbol, Object] +:done+ when the token popped from the consumers
#   queue is nil, otherwise the next entry popped from the tasks queue
def fetch
  @condition.signal if signal?
  token = @consumers_queue.pop
  token.nil? ? :done : @tasks_queue.pop
end

#start ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/jiggler/at_least_once/fetcher.rb', line 28

# Launches the fetcher tasks.
#
# For every queue in +config.sorted_queues_data+, starts
# +config[:fetchers_concurrency]+ tasks via +safe_async+. Each task loops:
# optionally waits on the notification, issues a blocking BRPOPLPUSH from
# the queue's list into its in-process list, and hands any fetched payload
# to the consumers. The loop ends once +@done+ is set.
#
# @return [Object] the result of iterating +config.sorted_queues_data+
def start
  config.sorted_queues_data.each do |queue, data|
    config[:fetchers_concurrency].times do
      safe_async("'Fetcher for #{queue}'") do
        source_list = data[:list]
        reserve_list = in_process_queue(source_list)
        loop do
          # Pause when no consumer is waiting or the backlog of tokens exceeds
          # the configured concurrency. Per the original note, num_waiting may
          # report 1 even when no consumer is actually waiting.
          should_pause = @consumers_queue.num_waiting.zero? ||
                         @consumers_queue.size > @config[:concurrency]
          # Expected to block here until a consumer (or suspend) signals.
          @condition.wait if should_pause && !@done
          break if @done

          payload = config.with_sync_redis do |redis|
            redis.blocking_call(false, 'BRPOPLPUSH', source_list, reserve_list, TIMEOUT)
          end
          # No requeue logic for now: the monitor is expected to handle the
          # in-process list left behind by this process.
          break if @done

          # A nil payload means nothing was fetched; go round again.
          unless payload.nil?
            @tasks_queue.push(job(source_list, payload, reserve_list), data[:priority])
            # The pushed token unblocks one waiting consumer.
            @consumers_queue.push('')
          end
        end
        logger.debug("Fetcher for #{queue} stopped")
      rescue Async::Stop
        logger.debug("Fetcher for #{queue} received stop signal")
      end
    end
  end
end

#suspend ⇒ Object



68
69
70
71
72
73
# File 'lib/jiggler/at_least_once/fetcher.rb', line 68

# Stops fetching: raises the done flag, signals the notification and closes
# the consumers queue.
#
# The flag is set before signalling so that a fetcher task resuming from
# +@condition.wait+ sees it.
#
# @return [Object] the result of closing the consumers queue
def suspend
  logger.debug('Suspending the fetcher')
  @done = true
  @condition.signal
  # Closing the queue unblocks consumers that are waiting in pop.
  @consumers_queue.close
end