Class: Qwirk::Batch::ActiveRecord::BatchJob
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Qwirk::Batch::ActiveRecord::BatchJob
show all
- Includes:
- JobStatus
- Defined in:
- lib/qwirk/batch/active_record/batch_job.rb
Constant Summary
Constants included
from JobStatus
JobStatus::ABORTED, JobStatus::CANCELED, JobStatus::FINISHED, JobStatus::INITED, JobStatus::PAUSED, JobStatus::RUNNING, JobStatus::STATUSES
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ BatchJob
Returns a new instance of BatchJob.
41
42
43
44
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 41
# Builds a new BatchJob, forwarding +opts+ unchanged to the superclass
# initializer, and starts with an empty list of outstanding file positions.
#
# NOTE(review): records instantiated by ActiveRecord finders may not run
# this initializer, leaving @outstanding_array unset -- confirm.
#
# @param opts [Hash] options passed through to the superclass initializer
def initialize(opts={})
# Bare super forwards the same arguments this method received.
super
# File positions that have been started but not yet finished or failed.
@outstanding_array = []
end
|
Class Method Details
.acquire(file, worker_name) ⇒ Object
Acquire this file if it hasn’t already been acquired.
20
21
22
23
24
25
26
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 20
# Acquire +file+ for +worker_name+ unless a job for that pair already exists.
#
# @param file [Object] value stored in the job's +file+ attribute
# @param worker_name [Object] value stored in the job's +worker_name+ attribute
# @return [BatchJob, nil] the newly created job, or nil when a job for the
#   pair already exists or when creation raised an ActiveRecord error
def self.acquire(file, worker_name)
  return nil if find_by_file_and_worker_name(file, worker_name)

  create!(:file => file, :worker_name => worker_name)
# The leading :: pins the lookup to the top-level ActiveRecord module; this
# class lives under Qwirk::Batch::ActiveRecord, which could otherwise shadow it.
rescue ::ActiveRecord::ActiveRecordError => e
  # Best effort: any ActiveRecord failure here is treated as another worker
  # having created the same job first.
  Rails.logger.warn("Assuming race condition (duplicate index) for BatchJob file=#{file} worker=#{worker_name}: #{e.message}")
  nil
end
|
.resume_paused_job(worker_name) ⇒ Object
Acquire and resume a paused job, if one is available.
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 29
# Acquire and resume a paused job for +worker_name+, if one is available.
#
# Inside a transaction, the first job matching the worker name with PAUSED
# status is fetched with a row lock. Each of its outstanding records is
# re-registered through +start_record+ and then destroyed, and the job's
# status is set to RUNNING.
#
# @param worker_name [Object] worker name used to look up the paused job
# @return [BatchJob, nil] the resumed job, or nil when no paused job was found
def self.resume_paused_job(worker_name)
  transaction do
    # :status must be a symbol key; a bare `status` would be a method call.
    job = where(:worker_name => worker_name, :status => PAUSED).lock(true).first
    if job
      job.outstanding_records.each do |record|
        job.start_record(record.file_position)
        record.destroy
      end
      # update_attribute takes (name, value), not a hash.
      job.update_attribute(:status, RUNNING)
    end
    # Hand the job (or nil) back as the block value instead of using
    # `return` inside the transaction block.
    job
  end
end
|
Instance Method Details
#abort ⇒ Object
55
56
57
58
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 55
# Saves the outstanding array via +save_outstanding_array+ and then sets the
# job's status to ABORTED.
#
# @return [Object] whatever +update_attribute+ returns
def abort
  save_outstanding_array
  # update_attribute takes (name, value); a single hash is the wrong arity.
  update_attribute(:status, ABORTED)
end
|
#cancel ⇒ Object
60
61
62
63
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 60
# Saves the outstanding array via +save_outstanding_array+ and then sets the
# job's status to CANCELED.
#
# @return [Object] whatever +update_attribute+ returns
def cancel
  save_outstanding_array
  # update_attribute takes (name, value); a single hash is the wrong arity.
  update_attribute(:status, CANCELED)
end
|
#fail_record(file_position, message) ⇒ Object
78
79
80
81
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 78
# Marks the record at +file_position+ as failed: drops it from the in-memory
# outstanding list and creates a failed record carrying +message+.
#
# @param file_position [Object] position of the failed record in the file
# @param message [Object] failure message stored with the failed record
# @return [Object] whatever +failed_records.create!+ returns
def fail_record(file_position, message)
  # Lazily create the list so this also works on an instance whose
  # initializer did not set it (NOTE(review): e.g. finder-loaded records --
  # confirm).
  (@outstanding_array ||= []).delete(file_position)
  failed_records.create!(:file_position => file_position, :message => message)
end
|
#failed_hash ⇒ Object
94
95
96
97
98
99
100
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 94
# Builds a lookup of failure messages keyed by file position from
# +failed_records+. When several failed records share a file position, the
# last one iterated wins.
#
# @return [Hash] file position => failure message
def failed_hash
  failed_records.each_with_object({}) do |record, messages|
    messages[record.file_position] = record.message
  end
end
|
#finish ⇒ Object
65
66
67
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 65
# Sets the job's status to FINISHED.
#
# @return [Object] whatever +update_attribute+ returns
def finish
  # update_attribute takes (name, value); a single hash is the wrong arity.
  update_attribute(:status, FINISHED)
end
|
#finish_record(file_position) ⇒ Object
73
74
75
76
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 73
# Marks the record at +file_position+ as finished: drops it from the
# in-memory outstanding list and stores +finished_count+ incremented by one.
#
# @param file_position [Object] position of the finished record in the file
# @return [Object] whatever +update_attribute+ returns
def finish_record(file_position)
  # Lazily create the list so this also works on an instance whose
  # initializer did not set it (NOTE(review): e.g. finder-loaded records --
  # confirm).
  (@outstanding_array ||= []).delete(file_position)
  # update_attribute takes (name, value); a single hash is the wrong arity.
  update_attribute(:finished_count, finished_count + 1)
end
|
#outstanding_array ⇒ Object
90
91
92
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 90
# The in-memory list of file positions that have been started but not yet
# finished or failed.
#
# @return [Array] the outstanding file positions (empty if none recorded yet)
def outstanding_array
  # Lazily create the list so callers get an Array even when the initializer
  # did not set it (NOTE(review): e.g. finder-loaded records -- confirm).
  @outstanding_array ||= []
end
|
#pause ⇒ Object
50
51
52
53
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 50
# Saves the outstanding array via +save_outstanding_array+ and then sets the
# job's status to PAUSED, the status +resume_paused_job+ searches for.
#
# @return [Object] whatever +update_attribute+ returns
def pause
  save_outstanding_array
  # PAUSED (not STOPPED) is the JobStatus constant; update_attribute takes
  # (name, value) rather than a hash.
  update_attribute(:status, PAUSED)
end
|
#retry_failed_record ⇒ Object
83
84
85
86
87
88
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 83
# Takes the first failed record, destroys it, and hands back its file
# position so the caller can retry it.
#
# @return [Object, nil] the file position of the destroyed failed record, or
#   nil when there are no failed records
def retry_failed_record
  candidate = failed_records.first
  if candidate
    candidate.destroy
    candidate.file_position
  end
end
|
#run(total_count) ⇒ Object
46
47
48
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 46
# Moves the job into the RUNNING status and records how many records it
# will process, in a single +update_attributes+ call.
#
# @param total_count [Object] value stored in the job's +total_count+ attribute
# @return [Object] whatever +update_attributes+ returns
def run(total_count)
  new_values = { :status => RUNNING, :total_count => total_count }
  update_attributes(new_values)
end
|
#start_record(file_position) ⇒ Object
69
70
71
|
# File 'lib/qwirk/batch/active_record/batch_job.rb', line 69
# Registers +file_position+ as started by appending it to the in-memory
# outstanding list.
#
# @param file_position [Object] position of the record in the file
# @return [Array] the outstanding list after the append
def start_record(file_position)
  # Lazily create the list: resume_paused_job calls this on a job fetched
  # through a query (NOTE(review): such records may not have run the
  # initializer -- confirm).
  (@outstanding_array ||= []) << file_position
end
|