Class: Resque::Plugins::Stages::StagedGroupStage
- Inherits:
-
Object
- Object
- Resque::Plugins::Stages::StagedGroupStage
- Includes:
- Comparable, RedisAccess
- Defined in:
- lib/resque/plugins/stages/staged_group_stage.rb
Overview
rubocop:disable Metrics/ClassLength
Constant Summary
Constants included from RedisAccess
Instance Attribute Summary collapse
-
#group_stage_id ⇒ Object
readonly
Returns the value of attribute group_stage_id.
Instance Method Summary collapse
- #<=>(other) ⇒ Object
- #add_job(staged_group_job) ⇒ Object
- #blank? ⇒ Boolean
- #delete ⇒ Object
- #enqueue(klass, *args) ⇒ Object
- #enqueue_at(timestamp, klass, *args) ⇒ Object
- #enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object
- #enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
- #enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object
- #enqueue_to(queue, klass, *args) ⇒ Object
-
#initialize(group_stage_id) ⇒ StagedGroupStage
constructor
A new instance of StagedGroupStage.
- #initiate ⇒ Object
- #job_completed ⇒ Object
- #jobs(start = 0, stop = -1) ⇒ Object
- #jobs_by_status(status) ⇒ Object
- #num_jobs ⇒ Object
- #number ⇒ Object
- #number=(value) ⇒ Object
- #order_param(sort_option, current_sort, current_order) ⇒ Object
- #paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20) ⇒ Object
- #remove_job(staged_group_job) ⇒ Object
- #staged_group ⇒ Object
- #staged_group=(value) ⇒ Object
- #status ⇒ Object
- #status=(value) ⇒ Object
- #verify ⇒ Object
- #verify_job(job) ⇒ Object
Methods included from RedisAccess
Constructor Details
#initialize(group_stage_id) ⇒ StagedGroupStage
Returns a new instance of StagedGroupStage.
20 21 22 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 20
#
# Builds a stage wrapper around an existing identifier.
#
# @param group_stage_id [Object] identifier stored on the instance and
#   exposed through #group_stage_id
def initialize(group_stage_id)
  @group_stage_id = group_stage_id
end
Instance Attribute Details
#group_stage_id ⇒ Object (readonly)
Returns the value of attribute group_stage_id.
18 19 20 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 18
#
# Read-only accessor for the identifier given to the constructor.
#
# @return [Object] the value held in @group_stage_id
def group_stage_id
  @group_stage_id
end
Instance Method Details
#<=>(other) ⇒ Object
24 25 26 27 28 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 24
#
# Orders stages by their group_stage_id (used by Comparable).
#
# @param other [Object] the object to compare against
# @return [Integer, nil] result of comparing the two group_stage_id values,
#   or nil when +other+ is not a StagedGroupStage
def <=>(other)
  if other.is_a?(Resque::Plugins::Stages::StagedGroupStage)
    group_stage_id <=> other.group_stage_id
  end
end
#add_job(staged_group_job) ⇒ Object
177 178 179 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 177
#
# Appends the job's id to the tail of this stage's Redis list.
#
# @param staged_group_job [#job_id] job whose id is pushed
# @return [Object] whatever redis.rpush returns
def add_job(staged_group_job)
  redis.rpush(stage_key, staged_group_job.job_id)
end
#blank? ⇒ Boolean
211 212 213 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 211
#
# Reports whether nothing is stored for this stage: neither the job list
# (stage_key) nor the attribute hash (staged_group_key) exists in Redis.
#
# Depending on the client version/configuration, `exists` may answer with a
# boolean or with an Integer count of matching keys. An Integer 0 is truthy
# in Ruby, so the raw result cannot simply be negated; it is normalized here.
#
# @return [Boolean] true when neither key exists
def blank?
  [stage_key, staged_group_key].none? do |key|
    result = redis.exists(key)
    result.is_a?(Integer) ? result.positive? : result
  end
end
#delete ⇒ Object
185 186 187 188 189 190 191 192 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 185
#
# Removes this stage: deletes every job in it, detaches the stage from its
# group (when one is set), then deletes both Redis keys owned by the stage.
#
# @return [Object] whatever the final redis.del returns
def delete
  jobs.each { |staged_job| staged_job.delete }

  staged_group&.remove_stage(self)

  redis.del(stage_key)
  redis.del(staged_group_key)
end
#enqueue(klass, *args) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 30
#
# Creates a staged job for +klass+ and, unless the stage is still :pending,
# hands it to Resque.enqueue.
#
# @param klass [Object] job class passed to create_enqueue_job
# @param args [Array] job arguments
# @return [Object] the job built by create_enqueue_job
def enqueue(klass, *args)
  create_enqueue_job(klass, args).tap do |staged_job|
    # A pending stage only records the job; it is not queued yet.
    next if status == :pending

    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue(*staged_job.enqueue_args)
  end
end
#enqueue_at(timestamp, klass, *args) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 56
#
# Creates a staged job for +klass+ and, unless the stage is still :pending,
# schedules it through Resque.enqueue_at.
#
# @param timestamp [Object] time value forwarded unchanged to Resque.enqueue_at
# @param klass [Object] job class passed to create_enqueue_job
# @param args [Array] job arguments
# @return [Object] the job built by create_enqueue_job
def enqueue_at(timestamp, klass, *args)
  job = create_enqueue_job(klass, args)

  # A pending stage only records the job; it is not scheduled yet.
  return job if status == :pending

  self.status = :running if status != :running

  job.status = :queued
  Resque.enqueue_at(timestamp, *job.enqueue_args)

  job
end
#enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 69
#
# Creates a staged job for +klass+ and, unless the stage is still :pending,
# schedules it through Resque.enqueue_at_with_queue.
#
# @param queue [Object] queue forwarded unchanged to Resque.enqueue_at_with_queue
# @param timestamp [Object] time value forwarded unchanged to
#   Resque.enqueue_at_with_queue
# @param klass [Object] job class passed to create_enqueue_job
# @param args [Array] job arguments
# @return [Object] the job built by create_enqueue_job
def enqueue_at_with_queue(queue, timestamp, klass, *args)
  job = create_enqueue_job(klass, args)

  # A pending stage only records the job; it is not scheduled yet.
  return job if status == :pending

  self.status = :running if status != :running

  job.status = :queued
  Resque.enqueue_at_with_queue(queue, timestamp, *job.enqueue_args)

  job
end
#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 82
#
# Creates a staged job for +klass+ and, unless the stage is still :pending,
# schedules it through Resque.enqueue_in.
#
# @param number_of_seconds_from_now [Object] delay forwarded unchanged to
#   Resque.enqueue_in
# @param klass [Object] job class passed to create_enqueue_job
# @param args [Array] job arguments
# @return [Object] the job built by create_enqueue_job
def enqueue_in(number_of_seconds_from_now, klass, *args)
  create_enqueue_job(klass, args).tap do |staged_job|
    # A pending stage only records the job; it is not scheduled yet.
    next if status == :pending

    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_in(number_of_seconds_from_now, *staged_job.enqueue_args)
  end
end
#enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 95
#
# Creates a staged job for +klass+ and, unless the stage is still :pending,
# schedules it through Resque.enqueue_in_with_queue.
#
# @param queue [Object] queue forwarded unchanged to Resque.enqueue_in_with_queue
# @param number_of_seconds_from_now [Object] delay forwarded unchanged
# @param klass [Object] job class passed to create_enqueue_job
# @param args [Array] job arguments
# @return [Object] the job built by create_enqueue_job
def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
  create_enqueue_job(klass, args).tap do |staged_job|
    # A pending stage only records the job; it is not scheduled yet.
    next if status == :pending

    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_in_with_queue(queue, number_of_seconds_from_now, *staged_job.enqueue_args)
  end
end
#enqueue_to(queue, klass, *args) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 43
#
# Creates a staged job for +klass+ and, unless the stage is still :pending,
# hands it to Resque.enqueue_to for the given queue.
#
# @param queue [Object] queue forwarded unchanged to Resque.enqueue_to
# @param klass [Object] job class passed to create_enqueue_job
# @param args [Array] job arguments
# @return [Object] the job built by create_enqueue_job
def enqueue_to(queue, klass, *args)
  create_enqueue_job(klass, args).tap do |staged_job|
    # A pending stage only records the job; it is not queued yet.
    next if status == :pending

    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_to(queue, *staged_job.enqueue_args)
  end
end
#initiate ⇒ Object
194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 194
#
# Starts the stage: marks it :running, enqueues every job that is neither
# completed nor already queued, then re-evaluates completion via
# #job_completed.
#
# @return [Object] the result of #job_completed
def initiate
  self.status = :running

  jobs.each do |staged_job|
    # Checks and enqueueing stay interleaved per job, in list order.
    staged_job.enqueue_job unless staged_job.completed? || staged_job.queued?
  end

  job_completed
end
#job_completed ⇒ Object
207 208 209 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 207
#
# Marks the stage :complete once every job in it reports completed?.
# Does nothing while at least one job is not completed.
#
# @return [Symbol, nil] :complete when the status was updated, otherwise nil
def job_completed
  return unless jobs.all?(&:completed?)

  self.status = :complete
end
#jobs(start = 0, stop = -1) ⇒ Object
140 141 142 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 140
#
# Loads the jobs recorded in this stage's Redis list.
#
# @param start [Integer] first list index passed to redis.lrange (default 0)
# @param stop [Integer] last list index passed to redis.lrange (default -1)
# @return [Array<Resque::Plugins::Stages::StagedJob>] one StagedJob per id
#   returned by the range query
def jobs(start = 0, stop = -1)
  job_ids = redis.lrange(stage_key, start, stop)

  job_ids.map do |job_id|
    Resque::Plugins::Stages::StagedJob.new(job_id)
  end
end
#jobs_by_status(status) ⇒ Object
144 145 146 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 144
#
# Filters the stage's jobs down to those whose status equals +status+.
#
# @param status [Object] status value compared with each job's #status
# @return [Array] the matching jobs
def jobs_by_status(status)
  wanted_status = status

  jobs.select do |staged_job|
    staged_job.status == wanted_status
  end
end
#num_jobs ⇒ Object
173 174 175 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 173
#
# Length of this stage's job list, as reported by redis.llen.
#
# @return [Object] the value returned by redis.llen for stage_key
def num_jobs
  redis.llen stage_key
end
#number ⇒ Object
118 119 120 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 118
#
# Reads the "number" field from this stage's Redis hash.
#
# @return [Integer] the stored value converted with to_i, or 1 when the
#   field is absent
def number
  stored_number = redis.hget(staged_group_key, "number")

  stored_number.nil? ? 1 : stored_number.to_i
end
#number=(value) ⇒ Object
122 123 124 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 122
#
# Writes +value+ to the "number" field of this stage's Redis hash.
#
# @param value [Object] value stored as-is through redis.hset
def number=(value)
  redis.hset staged_group_key, "number", value
end
#order_param(sort_option, current_sort, current_order) ⇒ Object
163 164 165 166 167 168 169 170 171 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 163
#
# Picks the sort order for a column: re-selecting the column that is already
# sorted flips its order; any other column starts at "asc".
#
# @param sort_option [Object] the column being requested
# @param current_sort [Object] the column currently sorted on
# @param current_order [String, nil] current order; nil is treated as "asc"
# @return [String] "asc" or "desc"
def order_param(sort_option, current_sort, current_order)
  return "asc" unless sort_option == current_sort

  (current_order || "asc") == "asc" ? "desc" : "asc"
end
#paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 148
#
# Returns one page of this stage's jobs, sorted via sorted_jobs.
#
# Both paging arguments are coerced with to_i, so String values are accepted.
# A page size below 1 falls back to 20. A page that starts at or beyond the
# end of the list (or before its beginning) falls back to the first page.
#
# @param sort_key [Object] key forwarded to sorted_jobs
# @param sort_order [String] "desc" reverses the sorted list; anything else
#   keeps the order returned by sorted_jobs
# @param page_num [Integer, String] 1-based page number
# @param queue_page_size [Integer, String] number of jobs per page
# @return [Array] the jobs on the requested page
def paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20)
  queue_page_size = queue_page_size.to_i
  queue_page_size = 20 if queue_page_size < 1
  page_num = page_num.to_i

  job_list = sorted_jobs(sort_key)
  job_list = job_list.reverse if sort_order == "desc"

  page_start = (page_num - 1) * queue_page_size
  # >= so that a start exactly at the end is treated like any other
  # out-of-range page instead of yielding an empty one.
  page_start = 0 if page_start >= job_list.length || page_start.negative?

  job_list[page_start, queue_page_size]
end
#remove_job(staged_group_job) ⇒ Object
181 182 183 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 181
#
# Removes the job's id from this stage's Redis list. The count argument 0 is
# passed to redis.lrem, which removes all occurrences of the value.
#
# @param staged_group_job [#job_id] job whose id is removed
# @return [Object] whatever redis.lrem returns
def remove_job(staged_group_job)
  redis.lrem stage_key, 0, staged_group_job.job_id
end
#staged_group ⇒ Object
126 127 128 129 130 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 126
#
# The group this stage belongs to, built on first access and memoized.
#
# @return [Resque::Plugins::Stages::StagedGroup, nil] nil when
#   staged_group_id is blank
def staged_group
  unless staged_group_id.blank?
    @staged_group ||= Resque::Plugins::Stages::StagedGroup.new(staged_group_id)
  end
end
#staged_group=(value) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 132
#
# Attaches this stage to +value+: caches the group and its id on the
# instance, registers the stage with the group, and stores the group id in
# the "staged_group_id" field of this stage's Redis hash.
#
# @param value [#group_id, #add_stage] the group to attach to
def staged_group=(value)
  @staged_group, @staged_group_id = value, value.group_id

  value.add_stage self

  redis.hset staged_group_key, "staged_group_id", value.group_id
end
#status ⇒ Object
108 109 110 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 108
#
# Reads the "status" field from this stage's Redis hash.
#
# @return [Symbol] the stored value converted with to_sym, or :pending when
#   the field is absent
def status
  stored_status = redis.hget(staged_group_key, "status")

  stored_status.nil? ? :pending : stored_status.to_sym
end
#status=(value) ⇒ Object
112 113 114 115 116 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 112
#
# Stores +value+ (as a String) in the "status" field of this stage's Redis
# hash. When the status read back afterwards is :complete, the owning group
# (if any) is notified through stage_completed.
#
# @param value [#to_s] the new status
def status=(value)
  redis.hset staged_group_key, "status", value.to_s

  return unless status == :complete

  staged_group&.stage_completed
end
#verify ⇒ Object
215 216 217 218 219 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 215
#
# Checks this stage's link to its group. Without a group,
# build_new_structure is called; otherwise the group is asked to verify
# this stage.
#
# @return [Object] the result of whichever call was made
def verify
  if staged_group.blank?
    build_new_structure
  else
    staged_group.verify_stage(self)
  end
end
#verify_job(job) ⇒ Object
221 222 223 224 225 226 227 |
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 221
#
# Ensures the job's id is present in this stage's Redis list, pushing it
# onto the head of the list when it is missing.
#
# @param job [#job_id] the job to check
# @return [Object, nil] the redis.lpush result, or nil when the id was
#   already listed
def verify_job(job)
  known_ids = redis.lrange(stage_key, 0, -1)

  redis.lpush(stage_key, job.job_id) unless known_ids.include?(job.job_id)
end