Class: CanvasSync::JobBatches::Pool
- Inherits:
-
Object
- Object
- CanvasSync::JobBatches::Pool
show all
- Includes:
- RedisModel
- Defined in:
- lib/canvas_sync/job_batches/pool.rb
Constant Summary
collapse
- POOL_REFILL =
RedisScript.new(Pathname.new(__FILE__) + "../pool_refill.lua")
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from RedisModel
#persist_bid_attr, #read_bid_attr
Constructor Details
#initialize(pooolid = nil, **kwargs) ⇒ Pool
Returns a new instance of Pool.
16
17
18
19
20
21
22
23
24
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 16
# Builds a pool handle.
#
# With an id, the instance is flagged as existing and simply adopts that id.
# Without one, a random URL-safe id is generated and the keyword arguments
# are forwarded to #initialize_new.
#
# @param pooolid [String, nil] id of a pool to adopt, or nil to create a new one
# @param kwargs [Hash] options passed through to #initialize_new for new pools
def initialize(pooolid = nil, **kwargs)
  unless pooolid
    @pid = SecureRandom.urlsafe_base64(10)
    initialize_new(**kwargs)
    return
  end

  @existing = true
  @pid = pooolid
end
|
Instance Attribute Details
#pid ⇒ Object
Returns the value of attribute pid.
7
8
9
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 7
# Reader for the pool id held in @pid (assigned in the constructor).
#
# @return [Object] the current value of @pid
def pid
@pid
end
|
Class Method Details
.cleanup_redis_index! ⇒ Object
Administrative/console method to clean up expired pools from the WebUI
148
149
150
151
152
153
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 148
# Walks the "pools" sorted set for entries scored at or before
# (now - Batch::BID_EXPIRE_TTL) and removes each one from the set when
# Batch.cleanup_redis_index_for returns a truthy value for it.
#
# NOTE(review): `r` is not defined inside this method; presumably it is a
# class-level Redis accessor defined elsewhere - confirm it resolves here.
#
# @return [Array] the pool ids that were examined
def self.cleanup_redis_index!
  key_suffixes = ["", "-active", "-jobs"]
  cutoff = Batch::BID_EXPIRE_TTL.seconds.ago.to_i
  candidate_pids = r.zrangebyscore("pools", "0", cutoff)

  candidate_pids.each do |pid|
    next unless Batch.cleanup_redis_index_for("POOLID-#{pid}", key_suffixes)

    r.zrem("pools", pid)
  end
end
|
.from_pid(pid) ⇒ Object
26
27
28
29
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 26
# Wraps a given pool id in a Pool instance.
#
# @param pid [String] id of the pool
# @return [Pool] instance constructed with +pid+
# @raise [RuntimeError] when +pid+ is not present
def self.from_pid(pid)
  return new(pid) if pid.present?

  raise "PID must be given"
end
|
.job_checked_in(status, options) ⇒ Object
142
143
144
145
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 142
# Class-level entry point: looks up the pool named by options['pool_id']
# and forwards the check-in to that instance.
#
# @param status [Object] passed through to the instance method
# @param options [Hash] must carry the pool id under the 'pool_id' key
# @return [Object] whatever the instance-level #job_checked_in returns
def self.job_checked_in(status, options)
  from_pid(options['pool_id']).job_checked_in(status, options)
end
|
Instance Method Details
#<<(job_desc) ⇒ Object
31
32
33
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 31
# Appends a single job description to the pool.
#
# @param job_desc [Hash] description of the job to add
# @return [Object] the result of #add_jobs
def <<(job_desc)
  add_jobs([job_desc])
end
|
#active_count(r = redis) ⇒ Object
116
117
118
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 116
# Sums the number of fields in the "<redis_key>-active" hash and the integer
# stored in the "_active_count" field of the pool hash.
#
# The counter is read with HINCRBY and an increment of 0, so it comes back
# as an Integer.
#
# @param r [Object] Redis connection to use (defaults to #redis)
# @return [Integer]
def active_count(r = redis)
  active_hash_size = r.hlen("#{redis_key}-active")
  counter_value = r.hincrby(redis_key, "_active_count", 0)
  active_hash_size + counter_value
end
|
#active_jobs(r = redis) ⇒ Object
120
121
122
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 120
# Reads every value of the "<redis_key>-active" hash, parses each as JSON
# and returns element [0] of each parsed value.
#
# @param r [Object] Redis connection to use (defaults to #redis)
# @return [Array] one entry per value in the active hash
def active_jobs(r = redis)
  serialized_entries = r.hvals("#{redis_key}-active")
  serialized_entries.map do |raw|
    parsed = JSON.parse(raw)
    parsed[0]
  end
end
|
#add_job(job_desc) ⇒ Object
35
36
37
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 35
# Adds one job description by delegating to #add_jobs with a one-element
# list (refill is not skipped).
#
# @param job_desc [Hash] description of the job to add
# @return [Object] the result of #add_jobs
def add_job(job_desc)
  add_jobs([job_desc], skip_refill: false)
end
|
#add_jobs(job_descs, skip_refill: false) ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 39
# Pushes each job description into the pool, pairing it with a fresh
# placeholder Batch whose callback reports back to this pool.
#
# For every description:
# * a new Batch is created and described with this pool's id;
# * a callback to "<class>.job_checked_in" is registered on :success when
#   on_failed_job is :wait, otherwise on :complete;
# * the description is copied with symbolized keys, its :job is stringified
#   and the wrapper's bid is stored under :pool_wrapper_batch;
# * the result is handed to #push_job_to_pool.
#
# @param job_descs [Enumerable<Hash>] job descriptions; the inputs are not mutated
# @param skip_refill [Boolean] when true, #refill_allotment is not called afterwards
# @return [Object, nil] result of #refill_allotment, or nil when skipped
def add_jobs(job_descs, skip_refill: false)
  job_descs.each do |raw_desc|
    wrapper_batch = Batch.new
    wrapper_batch.description = "Pool Job Wrapper (PID: #{pid})"

    event = on_failed_job == :wait ? :success : :complete
    wrapper_batch.on(event, "#{self.class}.job_checked_in", pool_id: pid)
    wrapper_batch.placeholder!

    # symbolize_keys returns a copy, so the caller's hash is left untouched.
    desc = raw_desc.symbolize_keys
    desc[:job] = desc[:job].to_s
    desc[:pool_wrapper_batch] = wrapper_batch.bid
    push_job_to_pool(desc)
  end

  return if skip_refill

  refill_allotment
end
|
#cleanup_if_empty ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 97
# Calls #cleanup_redis when the pool has no active or pending work and
# nothing asks for it to be kept.
#
# All inputs are read in a single MULTI: size of the "-active" hash, the
# "_active_count" field, the pending count, the 'clean_when_empty' and
# 'keep_open' fields, and the size of the "-holds" set.
#
# Nothing is cleaned when 'keep_open' is 'true', when 'clean_when_empty' is
# 'false', or when at least one hold exists.
#
# @return [Object, nil] result of #cleanup_redis, or nil when nothing was cleaned
def cleanup_if_empty
  # NOTE(review): the result is discarded. Presumably this loads `order`
  # ahead of the MULTI because #pending_count reads it - confirm.
  self.order

  replies = redis.multi do |r|
    r.hlen("#{redis_key}-active")
    r.hget(redis_key, "_active_count")
    pending_count(r)
    r.hget(redis_key, 'clean_when_empty')
    r.hget(redis_key, 'keep_open')
    r.scard("#{redis_key}-holds")
  end
  activec, pactivec, pendingc, clean_when_empty, keep_open, holds = replies

  return if keep_open == 'true'
  return if clean_when_empty == 'false'
  return if holds && holds > 0

  idle = activec <= 0 && (pactivec.try(:to_i) || 0) <= 0 && pendingc <= 0
  cleanup_redis if idle
end
|
#cleanup_redis ⇒ Object
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 86
# Removes this pool from the "pools" sorted set and unlinks its main key
# and its "-jobs" key. Emits a debug log line first.
#
# @return [Object] whatever the #redis block call returns
def cleanup_redis
  Batch.logger.debug { "Cleaning redis of pool #{pid}" }

  redis do |conn|
    conn.zrem("pools", pid)
    doomed_keys = ["", "-jobs"].map { |suffix| "#{redis_key}#{suffix}" }
    conn.unlink(*doomed_keys)
  end
end
|
#job_checked_in(status, options) ⇒ Object
137
138
139
140
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 137
# Handles a finished wrapper batch: refills the pool's allotment for the
# batch's bid and, when the refill reports no count above zero, attempts
# cleanup.
#
# @param status [#bid] status object of the wrapper batch that checked in
# @param options [Hash] callback options (not read here)
# @return [Object, nil] result of #cleanup_if_empty, or nil when work remains
def job_checked_in(status, options)
  remaining_active = refill_allotment(status.bid)
  return if remaining_active > 0

  cleanup_if_empty
end
|
#keep_open!(token = SecureRandom.urlsafe_base64(10)) ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 58
# Registers a hold token on the pool.
#
# Without a block: adds +token+ to the "<redis_key>-holds" set, sets that
# set's expiry to Batch::BID_EXPIRE_TTL, and returns the token.
#
# With a block: registers the token, yields, and always calls #let_close!
# with the token afterwards, including when the block raises.
#
# @param token [String] hold identifier; a random one is generated by default
# @yield optional work to run while the hold is registered
# @return [String, Object] the token, or the block's value when a block is given
def keep_open!(token = SecureRandom.urlsafe_base64(10))
  unless block_given?
    redis.multi do |r|
      holds_key = "#{redis_key}-holds"
      r.sadd(holds_key, token)
      r.expire(holds_key, Batch::BID_EXPIRE_TTL)
    end
    return token
  end

  begin
    token = keep_open!(token)
    yield
  ensure
    let_close!(token)
  end
end
|
#let_close!(token = :unset) ⇒ Object
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 75
# Releases a hold on the pool and then attempts cleanup.
#
# Called without a token, it deletes the whole "<redis_key>-holds" set and
# writes 'false' to the pool's 'keep_open' field. Called with a token, it
# removes only that token from the holds set.
#
# @param token [String, Symbol] hold token to release; :unset releases everything
# @return [Object, nil] result of #cleanup_if_empty
def let_close!(token = :unset)
  # The condition and the first statement of its body must be on separate
  # lines; fused onto one line they are a syntax error.
  if token == :unset
    redis.del("#{redis_key}-holds")
    redis.hset(redis_key, 'keep_open', 'false')
  else
    redis.srem("#{redis_key}-holds", token)
  end
  cleanup_if_empty
end
|
#pending_count(r = redis) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 124
# Counts the entries under "<redis_key>-jobs" using the cardinality command
# that matches the pool's order: LLEN for fifo/lifo, SCARD for random,
# ZCARD for priority. A missing order is treated as 'fifo'.
#
# @param r [Object] Redis connection to use (defaults to #redis)
# @return [Object, nil] the command's reply, or nil for an unrecognised order
def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  strategy = (self.order || 'fifo').to_sym

  count_command = {
    fifo: :llen,
    lifo: :llen,
    random: :scard,
    priority: :zcard,
  }[strategy]

  r.public_send(count_command, jobs_key) if count_command
end
|