Class: CanvasSync::JobBatches::Pool

Inherits:
Object
  • Object
show all
Includes:
RedisModel
Defined in:
lib/canvas_sync/job_batches/pool.rb

Constant Summary collapse

POOL_REFILL =
RedisScript.new(Pathname.new(__FILE__) + "../pool_refill.lua")

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisModel

#persist_bid_attr, #read_bid_attr

Constructor Details

#initialize(pooolid = nil, **kwargs) ⇒ Pool

Returns a new instance of Pool.



16
17
18
19
20
21
22
23
24
# File 'lib/canvas_sync/job_batches/pool.rb', line 16

# Builds a pool handle.
#
# When +pooolid+ is truthy the instance is flagged as existing and adopts
# that id. Otherwise a fresh id is generated with
# SecureRandom.urlsafe_base64(10) and +kwargs+ are forwarded to
# +initialize_new+.
#
# @param pooolid [Object, nil] id of an already-created pool, or nil to create one
# @param kwargs [Hash] options passed through to +initialize_new+ for a new pool
def initialize(pooolid = nil, **kwargs)
  unless pooolid
    @pid = SecureRandom.urlsafe_base64(10)
    initialize_new(**kwargs)
    return
  end

  @existing = true
  @pid = pooolid
end

Instance Attribute Details

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



7
8
9
# File 'lib/canvas_sync/job_batches/pool.rb', line 7

# Reader for the pool identifier held in @pid.
#
# @return [Object] the value of @pid
def pid
  @pid
end

Class Method Details

.cleanup_redis_index! ⇒ Object

Administrative/console method to clean up expired pools from the WebUI



148
149
150
151
152
153
# File 'lib/canvas_sync/job_batches/pool.rb', line 148

# Walks the "pools" sorted set for entries scored between 0 and
# (now - Batch::BID_EXPIRE_TTL seconds) and removes each pid from the set
# once Batch.cleanup_redis_index_for returns truthy for its
# "POOLID-<pid>" key family.
#
# NOTE(review): +r+ is not defined inside this method in the visible
# source; presumably a class-level Redis accessor exists elsewhere — confirm.
def self.cleanup_redis_index!
  key_suffixes = ["", "-active", "-jobs"]
  expired_pids = r.zrangebyscore("pools", "0", Batch::BID_EXPIRE_TTL.seconds.ago.to_i)

  expired_pids.each do |pid|
    next unless Batch.cleanup_redis_index_for("POOLID-#{pid}", key_suffixes)

    r.zrem("pools", pid)
  end
end

.from_pid(pid) ⇒ Object



26
27
28
29
# File 'lib/canvas_sync/job_batches/pool.rb', line 26

# Wraps an existing pool id in a new instance.
#
# @param pid [Object] pool id; must respond truthy to +present?+
# @return [Object] the result of +new(pid)+
# @raise [RuntimeError] "PID must be given" when +pid+ is not present
def self.from_pid(pid)
  if pid.present?
    new(pid)
  else
    raise "PID must be given"
  end
end

.job_checked_in(status, options) ⇒ Object



142
143
144
145
# File 'lib/canvas_sync/job_batches/pool.rb', line 142

# Class-level entry point for the check-in callback: looks up the pool
# named by options['pool_id'] via from_pid and forwards the call to that
# instance's job_checked_in.
#
# @param status [Object] passed through unchanged
# @param options [Hash] must carry the pool id under the string key 'pool_id'
# @return [Object] whatever the instance-level job_checked_in returns
def self.job_checked_in(status, options)
  pool = from_pid(options['pool_id'])
  pool.job_checked_in(status, options)
end

Instance Method Details

#<<(job_desc) ⇒ Object



31
32
33
# File 'lib/canvas_sync/job_batches/pool.rb', line 31

# Shovel operator form of add_job.
#
# @param job_desc [Object] job description, handed to add_job untouched
# @return [Object] whatever add_job returns (note: not the pool itself)
def <<(job_desc)
  add_job(job_desc)
end

#active_count(r = redis) ⇒ Object



116
117
118
# File 'lib/canvas_sync/job_batches/pool.rb', line 116

# Sums two figures read through +r+: the number of fields in the
# "<redis_key>-active" hash and the "_active_count" field of the pool hash
# (read via HINCRBY with an increment of 0).
#
# @param r [Object] Redis-like client; defaults to +redis+
# @return [Object] the sum of both replies
def active_count(r = redis)
  tracked = r.hlen("#{redis_key}-active")
  counter = r.hincrby(redis_key, "_active_count", 0)
  tracked + counter
end

#active_jobs(r = redis) ⇒ Object



120
121
122
# File 'lib/canvas_sync/job_batches/pool.rb', line 120

# Reads every value of the "<redis_key>-active" hash, parses each one as
# JSON and keeps element [0] of the parsed result.
#
# @param r [Object] Redis-like client; defaults to +redis+
# @return [Array] one entry per value stored in the active hash
def active_jobs(r = redis)
  serialized = r.hvals("#{redis_key}-active")
  serialized.map do |entry|
    parsed = JSON.parse(entry)
    parsed[0]
  end
end

#add_job(job_desc) ⇒ Object



35
36
37
# File 'lib/canvas_sync/job_batches/pool.rb', line 35

# Adds a single job by wrapping +job_desc+ in a one-element array and
# delegating to add_jobs (with its default skip_refill).
#
# @param job_desc [Object] job description, passed on inside an array
# @return [Object] whatever add_jobs returns
def add_job(job_desc)
  add_jobs([job_desc])
end

#add_jobs(job_descs, skip_refill: false) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/canvas_sync/job_batches/pool.rb', line 39

# Queues each job description in the pool.
#
# For every description a placeholder Batch is created whose callback
# ("<class>.job_checked_in", carrying pool_id) fires on :success when
# on_failed_job is :wait and on :complete otherwise. The description is
# copied with symbolized keys, its :job is stringified, the wrapper's bid is
# stored under :pool_wrapper_batch, and the result goes to push_job_to_pool.
#
# @param job_descs [Enumerable<Hash>] job descriptions; the originals are not modified
# @param skip_refill [Boolean] when true, refill_allotment is not called afterwards
# @return [Object, nil] refill_allotment's result, or nil when skipped
def add_jobs(job_descs, skip_refill: false)
  job_descs.each do |raw_desc|
    wrapper_batch = Batch.new
    wrapper_batch.description = "Pool Job Wrapper (PID: #{pid})"
    event = on_failed_job == :wait ? :success : :complete
    wrapper_batch.on(event, "#{self.class}.job_checked_in", pool_id: pid)
    wrapper_batch.placeholder!

    # symbolize_keys hands back a copy, so the caller's hash stays untouched.
    entry = raw_desc.symbolize_keys
    entry[:job] = entry[:job].to_s
    entry[:pool_wrapper_batch] = wrapper_batch.bid

    push_job_to_pool(entry)
  end

  refill_allotment unless skip_refill
end

#cleanup_if_empty ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/canvas_sync/job_batches/pool.rb', line 97

# Calls cleanup_redis when the pool looks idle and nothing asks to keep it.
#
# One MULTI gathers: size of the "-active" hash, the "_active_count" field,
# pending_count, the 'clean_when_empty' and 'keep_open' fields, and the size
# of the "-holds" set. Cleanup is skipped when keep_open is 'true',
# clean_when_empty is 'false', or any hold exists; otherwise it runs only if
# all three counts are <= 0.
#
# @return [Object, nil] cleanup_redis's result, or nil when nothing was cleaned
def cleanup_if_empty
  # Called before the MULTI and its result discarded; presumably this primes
  # the value that pending_count reads via self.order inside the
  # transaction — TODO confirm.
  self.order

  active_len, counter_raw, pending, clean_flag, open_flag, hold_count = redis.multi do |tx|
    tx.hlen("#{redis_key}-active")
    tx.hget(redis_key, "_active_count")
    pending_count(tx)
    tx.hget(redis_key, 'clean_when_empty')
    tx.hget(redis_key, 'keep_open')
    tx.scard("#{redis_key}-holds")
  end

  return if open_flag == 'true'
  return if clean_flag == 'false'
  return if hold_count && hold_count > 0

  idle = active_len <= 0 && (counter_raw.try(:to_i) || 0) <= 0 && pending <= 0
  cleanup_redis if idle
end

#cleanup_redis ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/canvas_sync/job_batches/pool.rb', line 86

# Removes this pool from Redis: logs at debug level, drops the pid from the
# "pools" sorted set, and unlinks the pool hash plus its "-jobs" key.
#
# @return [Object] whatever the +redis+ block call returns
def cleanup_redis
  Batch.logger.debug { "Cleaning redis of pool #{pid}" }

  redis do |conn|
    conn.zrem("pools", pid)
    doomed_keys = ["", "-jobs"].map { |suffix| "#{redis_key}#{suffix}" }
    conn.unlink(*doomed_keys)
  end
end

#job_checked_in(status, options) ⇒ Object



137
138
139
140
# File 'lib/canvas_sync/job_batches/pool.rb', line 137

# Handles a finished wrapper batch: asks refill_allotment (given the
# status's bid) for the resulting count and, when that count is not above
# zero, runs cleanup_if_empty.
#
# @param status [#bid] object exposing the finished batch id
# @param options [Object] accepted but not read here
# @return [Object, nil] cleanup_if_empty's result, or nil when the count is positive
def job_checked_in(status, options)
  remaining = refill_allotment(status.bid)
  return if remaining > 0

  cleanup_if_empty
end

#keep_open!(token = SecureRandom.urlsafe_base64(10)) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/canvas_sync/job_batches/pool.rb', line 58

# Places a hold on the pool by adding +token+ to the "<redis_key>-holds" set
# and (re)setting that set's expiry to Batch::BID_EXPIRE_TTL.
#
# Without a block the hold is registered and the token returned; release it
# later with let_close!(token). With a block the hold is registered, the
# block runs, and let_close!(token) is always called afterwards, even when
# the block raises.
#
# @param token [String] identifier for this hold; random by default
# @yield runs while the hold is in place (optional)
# @return [Object] the token (no block) or the block's value (block form)
def keep_open!(token = SecureRandom.urlsafe_base64(10))
  unless block_given?
    redis.multi do |r|
      r.sadd("#{redis_key}-holds", token)
      r.expire("#{redis_key}-holds", Batch::BID_EXPIRE_TTL)
    end
    return token
  end

  # Acquire outside the begin/ensure: if registering the hold fails there is
  # nothing to release, and running let_close! from the ensure could both
  # trigger a cleanup check and mask the original error.
  token = keep_open!(token)
  begin
    yield
  ensure
    let_close!(token)
  end
end

#let_close!(token = :unset) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/canvas_sync/job_batches/pool.rb', line 75

# Releases a hold and then re-evaluates whether the pool can be cleaned up.
#
# With a token, only that member is removed from the "<redis_key>-holds"
# set. With no argument (legacy form), the whole holds set is deleted and
# the pool's 'keep_open' field is set to 'false'.
#
# @param token [Object] hold token to release; :unset selects the legacy path
# @return [Object] whatever cleanup_if_empty returns
def let_close!(token = :unset)
  if token != :unset
    redis.srem("#{redis_key}-holds", token)
  else
    # Legacy
    redis.del("#{redis_key}-holds")
    redis.hset(redis_key, 'keep_open', 'false')
  end

  cleanup_if_empty
end

#pending_count(r = redis) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/canvas_sync/job_batches/pool.rb', line 124

# Counts the entries waiting under "<redis_key>-jobs", picking the Redis
# command by the pool's order ('fifo' when order is unset): LLEN for
# fifo/lifo, SCARD for random, ZCARD for priority.
#
# @param r [Object] Redis-like client; defaults to +redis+
# @return [Object, nil] the client's reply, or nil for an unrecognized order
def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  strategy = (self.order || 'fifo').to_sym
  command = { fifo: :llen, lifo: :llen, random: :scard, priority: :zcard }[strategy]
  r.public_send(command, jobs_key) if command
end