Class: Jiggler::Scheduled::Enqueuer

Inherits:
Object
  • Object
show all
Includes:
Jiggler::Support::Helper
Defined in:
lib/jiggler/scheduled/enqueuer.rb

Constant Summary collapse

# Lua source that pops at most one member from a sorted set by score.
#
# KEYS[1] is the sorted-set key and ARGV[1] is the upper score bound ("now").
# The script looks up a single member whose score lies in (-inf, now]; when one
# exists it is removed from the set with ZREM and returned. When nothing is
# due, the script falls through without an explicit return.
#
# NOTE(review): presumably loaded into Redis and invoked by SHA (see
# @lua_zpopbyscore_sha in #initialize) — confirm against the zpopbyscore helper.
LUA_ZPOPBYSCORE =
<<~LUA
  local key, now = KEYS[1], ARGV[1]
  local jobs = redis.call('zrangebyscore', key, '-inf', now, 'limit', 0, 1)
  if jobs[1] then
    redis.call('zrem', key, jobs[1])
    return jobs[1]
  end
LUA

Instance Method Summary collapse

Methods included from Jiggler::Support::Helper

#log_error, #log_error_short, #logger, #safe_async, #scan_all, #tid

Constructor Details

#initialize(config) ⇒ Enqueuer

Returns a new instance of Enqueuer.



17
18
19
20
21
22
# File 'lib/jiggler/scheduled/enqueuer.rb', line 17

# Builds an enqueuer bound to the given configuration object.
#
# @param config [Object] configuration; stored as-is and later used for
#   Redis access, the queue prefix and the default queue name
def initialize(config)
  @config = config
  # Identifier obtained from the included helper's #tid, kept for log context.
  @tid = tid
  # NOTE(review): presumably filled in later with the SHA of the loaded
  # LUA_ZPOPBYSCORE script — confirm against the zpopbyscore helper.
  @lua_zpopbyscore_sha = nil
  # Turned on by #terminate to stop the enqueue loop.
  @done = false
end

Instance Method Details

#enqueue_jobs ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/jiggler/scheduled/enqueuer.rb', line 24

# Moves every due job from the scheduled sorted sets onto its target queue.
#
# For each key returned by +sorted_sets+, repeatedly pops one entry through
# +zpopbyscore+ (passing the current time as the score bound) and hands it to
# #push_job, until nothing more is returned or #terminate has been called.
#
# Any StandardError raised inside the Redis block is logged through
# +log_error_short+ and is not re-raised.
def enqueue_jobs
  @config.with_async_redis do |conn|
    sorted_sets.each do |sorted_set|
      # @done is tested BEFORE every pop. A successful pop hands us an entry
      # that (per LUA_ZPOPBYSCORE) has already been removed from the sorted
      # set, so popping after termination would drop that job without ever
      # pushing it to a queue.
      while !@done && (job_args = zpopbyscore(conn, key: sorted_set, argv: Time.now.to_f.to_s))
        push_job(conn, job_args)
      end
    end
  rescue => err
    log_error_short(err, context: '\'Enqueuing jobs error\'', tid: @tid)
  end
end

#push_job(conn, job_args) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/jiggler/scheduled/enqueuer.rb', line 43

# Pushes one scheduled job payload onto the list of its target queue.
#
# The queue name is read from the 'queue' key of the decoded payload, falling
# back to @config.default_queue; the list key is that name prefixed with
# @config.queue_prefix. A StandardError raised while decoding or pushing is
# logged through +log_error_short+ instead of being raised.
#
# @param conn [Object] connection responding to #call
# @param job_args [String] serialized job payload, pushed unchanged
def push_job(conn, job_args)
  payload = Oj.load(job_args, mode: :compat)
  queue_name = payload['queue'] || @config.default_queue
  list_name = "#{@config.queue_prefix}#{queue_name}"
  # logger.debug('Poller Enqueuer') { "Pushing #{job_args} to #{list_name}" }
  conn.call('LPUSH', list_name, job_args)
rescue => err
  # list_name is still nil here when the failure happened before it was built.
  details = {
    context: '\'Pushing scheduled job error\'',
    tid: @tid,
    job_args: job_args,
    queue: list_name
  }
  log_error_short(err, details)
end

#terminate ⇒ Object



39
40
41
# File 'lib/jiggler/scheduled/enqueuer.rb', line 39

# Flags the enqueuer as finished so the loop in #enqueue_jobs, which checks
# @done, stops processing further jobs.
#
# @return [true] the value assigned to @done
def terminate
  @done = true
end