Class: Belated::Queue

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/belated/queue.rb

Overview

Job queues that Belated uses. queue is the jobs that are currenly waiting for a worker to start working on them. future_jobs is a SortedSet of jobs that are going to be added to queue at some point in the future.

Constant Summary collapse

FILE_NAME =
'belated_dump'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#error, #log, #logger, #logger=, #warn

Constructor Details

#initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new) ⇒ Queue

Returns a new instance of Queue.



19
20
21
22
23
24
# File 'lib/belated/queue.rb', line 19

def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new)
  @queue = queue
  @mutex = Mutex.new
  self.future_jobs = future_jobs
  self.future_jobs_db = PStore.new("future_jobs_#{Belated.env}.pstore", true) # pass true for thread safety
end

Instance Attribute Details

#future_jobsObject

Returns the value of attribute future_jobs.



15
16
17
# File 'lib/belated/queue.rb', line 15

def future_jobs
  @future_jobs
end

#future_jobs_dbObject

Returns the value of attribute future_jobs_db.



15
16
17
# File 'lib/belated/queue.rb', line 15

def future_jobs_db
  @future_jobs_db
end

Instance Method Details

#clearObject



59
60
61
62
# File 'lib/belated/queue.rb', line 59

def clear
  @queue.clear
  self.future_jobs = []
end

#connected?Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/belated/queue.rb', line 97

def connected?
  true
end

#delete_job(job) ⇒ Object



110
111
112
113
114
115
# File 'lib/belated/queue.rb', line 110

def delete_job(job)
  log "Deleting #{future_jobs.delete(job)} from future jobs"
  future_jobs_db.transaction do
    future_jobs_db.delete(job.id)
  end
end

#empty?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/belated/queue.rb', line 64

def empty?
  @queue.empty?
end

#enqueue_future_jobsObject



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/belated/queue.rb', line 26

def enqueue_future_jobs
  loop do
    job = future_jobs.min
    if job.nil?
      sleep Belated.heartbeat
      next
    end
    if job.at <= Time.now.to_f
      delete_job(job)
      push(job)
    end
  rescue DRb::DRbConnError
    error 'DRb connection error!!!!!!'
    log stats
  end
end

#find(job_id) ⇒ Object



101
102
103
104
105
106
107
108
# File 'lib/belated/queue.rb', line 101

def find(job_id)
  job = nil
  future_jobs_db.transaction(true) do
    job = future_jobs_db[job_id]
  end
  job = future_jobs.find { |j| j.id == job_id } if job.nil?
  job
end

#lengthObject



68
69
70
# File 'lib/belated/queue.rb', line 68

def length
  @queue.length
end

#load_jobsObject



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/belated/queue.rb', line 72

def load_jobs
  future_jobs_db.transaction(true) do
    future_jobs_db.roots.each do |id|
      future_jobs << future_jobs_db[id]
    end
  end
  return unless File.exist?(FILE_NAME)

  jobs = YAML.load(File.binread(FILE_NAME))
  jobs.each do |job|
    @queue.push(job)
  end
  File.delete(FILE_NAME)
end

#popObject



55
56
57
# File 'lib/belated/queue.rb', line 55

def pop
  @queue.pop
end

#push(job) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/belated/queue.rb', line 43

def push(job)
  if job.is_a?(Symbol) || job.at.nil? ||
     job.at <= Time.now.to_f
    @queue.push(job)
  else
    @mutex.synchronize do
      @future_jobs << job
      insert_into_future_jobs_db(job) unless job.proc_klass
    end
  end
end

#save_jobsObject



87
88
89
90
91
92
93
94
95
# File 'lib/belated/queue.rb', line 87

def save_jobs
  class_array = []
  @queue.length.times do |_i|
    unless proc_or_shutdown?(klass = @queue.pop)
      class_array << klass
    end
  end
  pp File.open(FILE_NAME, 'wb') { |f| f.write(YAML.dump(class_array)) }
end