Class: CanvasSync::JobBatches::Pool
- Inherits:
-
Object
- Object
- CanvasSync::JobBatches::Pool
show all
- Includes:
- RedisModel
- Defined in:
- lib/canvas_sync/job_batches/pool.rb
Constant Summary
collapse
- POOL_REFILL =
RedisScript.new(Pathname.new(__FILE__) + "../pool_refill.lua")
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from RedisModel
#persist_bid_attr, #read_bid_attr
Constructor Details
#initialize(pooolid = nil, **kwargs) ⇒ Pool
Returns a new instance of Pool.
17
18
19
20
21
22
23
24
25
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 17
# Builds a handle to a job pool.
#
# With an id, the instance is flagged as existing and adopts that id; the
# keyword arguments are not used on that path. Without an id, a random
# URL-safe id is generated and the keyword arguments are forwarded to
# initialize_new.
#
# @param pooolid [String, nil] id of an already-created pool, or nil for a new one
# @param kwargs [Hash] options handed to initialize_new when creating a new pool
def initialize(pooolid = nil, **kwargs)
  unless pooolid
    @pid = SecureRandom.urlsafe_base64(10)
    return initialize_new(**kwargs)
  end

  @existing = true
  @pid = pooolid
end
|
Instance Attribute Details
#pid ⇒ Object
Returns the value of attribute pid.
8
9
10
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 8
# Reader for the pool's identifier held in @pid.
#
# @return [Object] the id assigned by the constructor (either the id that was
#   passed in or the randomly generated one)
def pid
@pid
end
|
Class Method Details
.cleanup_redis_index! ⇒ Object
Administrative/console method to clean up expired pools from the WebUI.
138
139
140
141
142
143
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 138
# Administrative/console helper that prunes stale entries from the "pools"
# index.
#
# Every pool id whose score in "pools" is at or below the timestamp
# Batch::BID_EXPIRE_TTL seconds ago is passed to
# Batch.cleanup_redis_index_for along with the pool's key suffixes. The id is
# removed from the index only when that call returns a truthy value.
#
# The previous body used a bare `r` that was never bound in this method, so
# the connection is now obtained from the `redis` helper in block form, the
# same way #cleanup_redis does.
# NOTE(review): assumes `redis` is also callable at class level -- confirm.
#
# @return [Object] whatever the `redis` block helper returns
def self.cleanup_redis_index!
  suffixes = ["", "-active", "-jobs"]
  redis do |r|
    cutoff = Batch::BID_EXPIRE_TTL.seconds.ago.to_i
    r.zrangebyscore("pools", "0", cutoff).each do |pid|
      r.zrem("pools", pid) if Batch.cleanup_redis_index_for("POOLID-#{pid}", suffixes)
    end
  end
end
|
.from_pid(pid) ⇒ Object
27
28
29
30
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 27
# Returns a pool handle for an id that already exists.
#
# @param pid [String] id of the pool; must not be blank
# @return [Pool] the result of new(pid)
# @raise [RuntimeError] "PID must be given" when pid is blank
def self.from_pid(pid)
  return new(pid) if pid.present?

  raise "PID must be given"
end
|
.job_checked_in(status, options) ⇒ Object
132
133
134
135
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 132
# Class-level callback entry point: resolves the pool named by
# options['pool_id'] and forwards the check-in to that instance.
#
# @param status [Object] passed through unchanged to the instance method
# @param options [Hash] must carry the pool id under the string key 'pool_id'
# @return [Object] whatever the instance-level job_checked_in returns
def self.job_checked_in(status, options)
  pool = from_pid(options['pool_id'])
  pool.job_checked_in(status, options)
end
|
Instance Method Details
#<<(job_desc) ⇒ Object
32
33
34
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 32
# Shovel form for enqueueing a single job description into the pool.
#
# Note that the return value is whatever add_jobs returns, not the pool
# itself, so calls cannot be chained.
#
# @param job_desc [Hash] description of the job to add
def <<(job_desc)
  add_jobs([job_desc])
end
|
#active_count(r = redis) ⇒ Object
106
107
108
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 106
# Total number of jobs the pool currently counts as active.
#
# This is the number of fields in the "<redis_key>-active" hash plus the
# "_active_count" field of the pool hash. The field is fetched through
# HINCRBY with an increment of 0, so the reply comes back as a number.
#
# @param r [Object] connection to issue the commands on (defaults to redis)
# @return [Integer] sum of the two counts
def active_count(r = redis)
  tracked = r.hlen("#{redis_key}-active")
  counter = r.hincrby(redis_key, "_active_count", 0)
  tracked + counter
end
|
#active_jobs(r = redis) ⇒ Object
110
111
112
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 110
# Lists the jobs recorded in the "<redis_key>-active" hash.
#
# Each stored value is parsed as JSON and its element at index 0 is returned.
#
# @param r [Object] connection to read from (defaults to redis)
# @return [Array] one entry per value in the active hash
def active_jobs(r = redis)
  r.hvals("#{redis_key}-active").map do |serialized|
    entry = JSON.parse(serialized)
    entry[0]
  end
end
|
#add_job(job_desc) ⇒ Object
36
37
38
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 36
# Adds a single job description to the pool by wrapping it in a one-element
# list for add_jobs (refill is not skipped).
#
# @param job_desc [Hash] description of the job to add
# @return [Object] whatever add_jobs returns
def add_job(job_desc)
  add_jobs([job_desc], skip_refill: false)
end
|
#add_jobs(job_descs, skip_refill: false) ⇒ Object
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 40
# Adds several job descriptions to the pool.
#
# For each description a fresh Batch is created as a wrapper: it is given a
# description naming this pool, a callback to "<class>.job_checked_in" with
# this pool's id, and an empty jobs block. The callback is registered on
# :success when on_failed_job is :wait and on :complete otherwise. The
# description is then copied with symbol keys, its :job value stringified,
# the wrapper's bid stored under :pool_wrapper_batch, and the result pushed
# to the pool.
#
# @param job_descs [Enumerable<Hash>] job descriptions to enqueue
# @param skip_refill [Boolean] when true, refill_allotment is not called afterwards
# @return [Object, nil] the result of refill_allotment, or nil when skipped
def add_jobs(job_descs, skip_refill: false)
  job_descs.each do |raw_desc|
    batch = Batch.new
    batch.description = "Pool Job Wrapper (PID: #{pid})"
    event = on_failed_job == :wait ? :success : :complete
    batch.on(event, "#{self.class}.job_checked_in", pool_id: pid)
    batch.jobs {}

    # symbolize_keys yields a copy, so the caller's hash is left untouched.
    entry = raw_desc.symbolize_keys
    entry[:job] = entry[:job].to_s
    entry[:pool_wrapper_batch] = batch.bid
    push_job_to_pool(entry)
  end

  return if skip_refill

  refill_allotment
end
|
#cleanup_if_empty ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 88
# Removes the pool's Redis data when nothing is left to run.
#
# Inside a single MULTI it reads the size of the active hash, the
# "_active_count" field, the pending-job count and the 'clean_when_empty' and
# 'keep_open' flags. Cleanup is skipped when keep_open is 'true' or
# clean_when_empty is 'false'; otherwise cleanup_redis runs only if all three
# counts are zero or less.
#
# @return [Object, nil] the result of cleanup_redis, or nil when nothing was cleaned
def cleanup_if_empty
  # Kept from the original: `order` is touched before the transaction.
  # NOTE(review): presumably so pending_count's own `order` lookup is already
  # resolved before commands are queued in MULTI -- confirm.
  self.order

  snapshot = redis.multi do |tx|
    tx.hlen("#{redis_key}-active")
    tx.hget(redis_key, "_active_count")
    pending_count(tx)
    tx.hget(redis_key, 'clean_when_empty')
    tx.hget(redis_key, 'keep_open')
  end
  tracked, counter, pending, clean_flag, open_flag = snapshot

  return if open_flag == 'true'
  return if clean_flag == 'false'

  # A missing "_active_count" field comes back as nil and counts as zero.
  return unless tracked <= 0
  return unless (counter.try(:to_i) || 0) <= 0
  return unless pending <= 0

  cleanup_redis
end
|
#cleanup_redis ⇒ Object
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 77
# Deletes this pool's Redis data.
#
# Logs a debug line, removes the pool id from the "pools" index and unlinks
# the pool's main key together with its "-jobs" key.
#
# @return [Object] whatever the `redis` block helper returns
def cleanup_redis
  Batch.logger.debug { "Cleaning redis of pool #{pid}" }

  redis do |conn|
    conn.zrem("pools", pid)
    doomed = ["", "-jobs"].map { |suffix| "#{redis_key}#{suffix}" }
    conn.unlink(*doomed)
  end
end
|
#job_checked_in(status, options) ⇒ Object
127
128
129
130
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 127
# Instance-level check-in handler for a finished pool job.
#
# Refills the pool's allotment for the batch id carried by +status+ and, when
# the count returned by the refill is not positive, attempts cleanup of the
# pool.
#
# @param status [#bid] status object of the wrapper batch that checked in
# @param options [Hash] callback options (not used here)
# @return [Object, nil] result of cleanup_if_empty, or nil when jobs remain active
def job_checked_in(status, options)
  remaining = refill_allotment(status.bid)
  return if remaining > 0

  cleanup_if_empty
end
|
#keep_open! ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 59
# Marks the pool as "keep open" by storing 'true' in the 'keep_open' field of
# the pool hash.
#
# With a block, the flag is set, the block is run, and let_close! is always
# invoked afterwards (even if the block raises).
#
# @yield optional work to perform while the pool is held open
# @return [Object] the block's value, or the HSET reply when no block is given
def keep_open!
  return redis.hset(redis_key, 'keep_open', 'true') unless block_given?

  begin
    keep_open!
    yield
  ensure
    let_close!
  end
end
|
#let_close! ⇒ Object
72
73
74
75
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 72
# Clears the "keep open" mark by writing 'false' to the 'keep_open' field of
# the pool hash, then immediately attempts cleanup of an empty pool.
#
# @return [Object, nil] whatever cleanup_if_empty returns
def let_close!
  conn = redis
  conn.hset(redis_key, 'keep_open', 'false')
  cleanup_if_empty
end
|
#pending_count(r = redis) ⇒ Object
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/canvas_sync/job_batches/pool.rb', line 114
# Number of jobs waiting in the pool's "<redis_key>-jobs" key.
#
# The command used depends on the pool's order (defaulting to 'fifo' when
# order is nil): LLEN for :fifo/:lifo, SCARD for :random and ZCARD for
# :priority. Any other order yields nil without issuing a command.
#
# @param r [Object] connection to issue the command on (defaults to redis)
# @return [Object, nil] the reply of the size command, or nil for an unknown order
def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  size_command = {
    fifo: :llen,
    lifo: :llen,
    random: :scard,
    priority: :zcard,
  }[(self.order || 'fifo').to_sym]

  r.public_send(size_command, jobs_key) if size_command
end
|