Class: Belated::Queue
- Inherits:
-
Object
show all
- Includes:
- Logging
- Defined in:
- lib/belated/queue.rb
Overview
Job queues that Belated uses. `queue` holds the jobs that are currently waiting for a worker to start working on them. `future_jobs` is a SortedSet of jobs that will be added to `queue` at some point in the future.
Constant Summary
collapse
- FILE_NAME =
'belated_dump'
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
#error, #log, #logger, #logger=, #warn
Constructor Details
#initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new) ⇒ Queue
Returns a new instance of Queue.
19
20
21
22
23
24
|
# File 'lib/belated/queue.rb', line 19
# Sets up the in-memory queue, the lock used when scheduling jobs, the
# collection of future jobs and the PStore that persists them.
#
# @param queue [Thread::Queue] holds jobs that are ready to be worked on
# @param future_jobs [SortedSet] jobs scheduled to run later
def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new)
  @mutex = Mutex.new
  @queue = queue
  self.future_jobs = future_jobs
  # The store file name is derived from Belated.env; the second argument
  # turns on PStore's thread-safe mode.
  self.future_jobs_db = PStore.new("future_jobs_#{Belated.env}.pstore", true)
end
|
Instance Attribute Details
#future_jobs ⇒ Object
Returns the value of attribute future_jobs.
15
16
17
|
# File 'lib/belated/queue.rb', line 15
# Reader for the collection of jobs scheduled to run in the future.
#
# @return [Object] the current value of @future_jobs
def future_jobs
  @future_jobs
end
|
#future_jobs_db ⇒ Object
Returns the value of attribute future_jobs_db.
15
16
17
|
# File 'lib/belated/queue.rb', line 15
# Reader for the persistent store backing the future jobs.
#
# @return [Object] the current value of @future_jobs_db
def future_jobs_db
  @future_jobs_db
end
|
Instance Method Details
#clear ⇒ Object
59
60
61
62
|
# File 'lib/belated/queue.rb', line 59
# Empties the queue of waiting jobs and resets the future jobs.
#
# The future jobs are replaced with a fresh, empty collection of the same
# class as the current one, so a SortedSet stays a SortedSet instead of
# silently turning into a plain Array (which would allow duplicates).
# The persistent future_jobs_db is not touched here.
#
# @return [Object] the new, empty future jobs collection
def clear
  @queue.clear
  self.future_jobs = future_jobs.class.new
end
|
#connected? ⇒ Boolean
97
98
99
|
# File 'lib/belated/queue.rb', line 97
# Connectivity probe; this implementation always answers true.
#
# @return [Boolean] always true
def connected?
  true
end
|
#delete_job(job) ⇒ Object
110
111
112
113
114
115
|
# File 'lib/belated/queue.rb', line 110
# Removes a job from the in-memory future jobs and from the persistent store.
#
# @param job [Object] the job to remove; must respond to #id
# @return [Object] whatever the store transaction returns
def delete_job(job)
  removed = future_jobs.delete(job)
  log "Deleting #{removed} from future jobs"
  # The persisted copy is keyed by the job's id.
  future_jobs_db.transaction { future_jobs_db.delete(job.id) }
end
|
#empty? ⇒ Boolean
64
65
66
|
# File 'lib/belated/queue.rb', line 64
# Tells whether the queue of waiting jobs is empty. Future jobs are not
# considered.
#
# @return [Boolean] result of @queue.empty?
def empty?
  @queue.empty?
end
|
#enqueue_future_jobs ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/belated/queue.rb', line 26
# Loops forever, moving future jobs onto the queue once they are due.
#
# Each iteration looks at the earliest future job (future_jobs.min):
# * no job at all     -> sleep for Belated.heartbeat and retry;
# * job not yet due   -> sleep until it is due, but never longer than one
#                        heartbeat so an earlier job pushed meanwhile is
#                        noticed (previously this case looped without
#                        sleeping and busy-spun the CPU);
# * job due           -> remove it from the future jobs and push it.
#
# A DRb::DRbConnError raised during an iteration is logged and the loop
# carries on with the next iteration.
#
# @return [void] does not return under normal operation
def enqueue_future_jobs
  loop do
    job = future_jobs.min
    if job.nil?
      sleep Belated.heartbeat
      next
    end

    remaining = job.at - Time.now.to_f
    if remaining.positive?
      # Wait instead of spinning; capping at the heartbeat keeps newly
      # scheduled, earlier jobs from being delayed by a long sleep.
      sleep([remaining, Belated.heartbeat].min)
      next
    end

    delete_job(job)
    push(job)
  rescue DRb::DRbConnError
    error 'DRb connection error!!!!!!'
    # NOTE(review): `stats` is not defined in the visible part of this
    # class — presumably provided elsewhere; verify.
    log stats
  end
end
|
#find(job_id) ⇒ Object
101
102
103
104
105
106
107
108
|
# File 'lib/belated/queue.rb', line 101
# Looks a job up by id, first in the persistent store (read-only
# transaction) and, if nothing is stored under that id, among the
# in-memory future jobs.
#
# @param job_id [Object] id to search for
# @return [Object, nil] the matching job, or nil when neither source has it
def find(job_id)
  persisted = nil
  future_jobs_db.transaction(true) { persisted = future_jobs_db[job_id] }
  return persisted unless persisted.nil?

  future_jobs.find { |candidate| candidate.id == job_id }
end
|
#length ⇒ Object
68
69
70
|
# File 'lib/belated/queue.rb', line 68
# Number of jobs currently waiting in the queue. Future jobs are not counted.
#
# @return [Integer] result of @queue.length
def length
  @queue.length
end
|
#load_jobs ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/belated/queue.rb', line 72
# Restores state saved earlier:
# 1. every entry in the persistent store is appended to future_jobs;
# 2. if the dump file FILE_NAME exists, its YAML content is pushed job by
#    job onto the queue and the file is then deleted.
#
# @return [Integer, nil] nil when there is no dump file, otherwise the
#   result of File.delete
def load_jobs
  future_jobs_db.transaction(true) do
    future_jobs_db.roots.each { |id| future_jobs << future_jobs_db[id] }
  end
  return unless File.exist?(FILE_NAME)

  # NOTE(review): YAML.load behaves differently across Psych versions
  # (arbitrary object loading vs. safe-by-default) — confirm which is
  # intended for dumped jobs.
  YAML.load(File.binread(FILE_NAME)).each { |job| @queue.push(job) }
  File.delete(FILE_NAME)
end
|
#pop ⇒ Object
55
56
57
|
# File 'lib/belated/queue.rb', line 55
# Takes the next job off the queue by delegating to @queue.pop.
#
# @return [Object] the job that was popped
def pop
  @queue.pop
end
|
#push(job) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/belated/queue.rb', line 43
# Adds a job. Symbols, jobs with no scheduled time and jobs whose time has
# already passed go straight onto the queue. Anything else is stored as a
# future job while holding the mutex, and is also written to the future
# jobs store unless the job has a truthy proc_klass.
#
# @param job [Symbol, Object] a Symbol, or a job responding to #at and #proc_klass
# @return [Object] the queue when pushed immediately, otherwise the result
#   of the synchronized block
def push(job)
  run_now = job.is_a?(Symbol) || job.at.nil? || job.at <= Time.now.to_f
  return @queue.push(job) if run_now

  @mutex.synchronize do
    @future_jobs << job
    insert_into_future_jobs_db(job) unless job.proc_klass
  end
end
|
#save_jobs ⇒ Object
87
88
89
90
91
92
93
94
95
|
# File 'lib/belated/queue.rb', line 87
# Drains the queue and dumps the remaining jobs to FILE_NAME as YAML.
#
# The queue is popped as many times as it had entries when the method
# started; entries for which proc_or_shutdown? is truthy are dropped. The
# value returned by the file write is pretty-printed to stdout.
#
# @return [Object] the value passed to pp (the result of the file write)
def save_jobs
  pending = []
  @queue.length.times do
    item = @queue.pop
    pending << item unless proc_or_shutdown?(item)
  end
  pp File.open(FILE_NAME, 'wb') { |f| f.write(YAML.dump(pending)) }
end
|