Class: Resque::Plugins::Stages::StagedGroupStage

Inherits:
Object
  • Object
show all
Includes:
Comparable, RedisAccess
Defined in:
lib/resque/plugins/stages/staged_group_stage.rb

Overview

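Represents a single stage of a staged group. The ids of the stage's jobs are kept in a Redis list (under `stage_key`), and the stage's attributes (`status`, `number`, `staged_group_id`) are kept in a Redis hash (under `staged_group_key`).

Note: the only comment attached to the class in the source is the linter directive `rubocop:disable Metrics/ClassLength`, which is not a description of the class.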

Constant Summary

Constants included from RedisAccess

RedisAccess::NAME_SPACE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RedisAccess

#redis

Constructor Details

#initialize(group_stage_id) ⇒ StagedGroupStage

Returns a new instance of StagedGroupStage.



20
21
22
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 20

# Builds a stage handle for the given identifier.
#
# Only stores the id; nothing else happens in the constructor.
#
# @param group_stage_id [Object] identifier of this stage
def initialize(group_stage_id)
  @group_stage_id = group_stage_id
end

Instance Attribute Details

#group_stage_id ⇒ Object (readonly)

Returns the value of attribute group_stage_id.



18
19
20
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 18

# Identifier this stage was constructed with.
#
# @return [Object] the value passed to the constructor
def group_stage_id
  @group_stage_id
end

Instance Method Details

#<=>(other) ⇒ Object



24
25
26
27
28
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 24

# Orders stages by their +group_stage_id+.
#
# @param other [Object] the object to compare against
# @return [Integer, nil] the result of comparing the two ids, or nil when
#   +other+ is not a StagedGroupStage
def <=>(other)
  if other.is_a?(Resque::Plugins::Stages::StagedGroupStage)
    group_stage_id <=> other.group_stage_id
  else
    nil
  end
end

#add_job(staged_group_job) ⇒ Object



177
178
179
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 177

# Records a job as belonging to this stage by pushing its id onto the tail
# (RPUSH) of the Redis list stored under +stage_key+.
#
# @param staged_group_job [#job_id] the job to register
# @return [Object] whatever the Redis client returns from +rpush+
def add_job(staged_group_job)
  redis.rpush(stage_key, staged_group_job.job_id)
end

#blank? ⇒ Boolean

Returns:

  • (Boolean)


211
212
213
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 211

# Reports whether this stage has no data in Redis, i.e. neither the
# +stage_key+ nor the +staged_group_key+ exists.
#
# Depending on the client version, +exists+ answers with a boolean or with
# an Integer count of matching keys. An Integer must be compared against
# zero explicitly: in Ruby +0+ is truthy, so negating it directly would make
# this method report +false+ even when both keys are missing.
#
# @return [Boolean] true when neither key exists
def blank?
  [stage_key, staged_group_key].none? do |key|
    found = redis.exists(key)

    found.is_a?(Integer) ? found.positive? : found
  end
end

#delete ⇒ Object



185
186
187
188
189
190
191
192
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 185

# Removes this stage and everything it owns.
#
# Steps, in order: delete every job of the stage, detach the stage from its
# staged group (when it has one), then delete both Redis keys that hold the
# stage's data.
#
# @return [Object] the result of the final +redis.del+ call
def delete
  jobs.each { |staged_job| staged_job.delete }

  owning_group = staged_group
  owning_group.remove_stage(self) unless owning_group.nil?

  redis.del(stage_key)
  redis.del(staged_group_key)
end

#enqueue(klass, *args) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 30

# Adds a job to this stage and, unless the stage is still pending, hands it
# to Resque right away.
#
# The job is built by +create_enqueue_job+ (defined elsewhere in the class).
# While the stage status is +:pending+ the job is only returned. Otherwise
# the stage is marked +:running+ (if it is not already), the job is marked
# +:queued+ and its +enqueue_args+ are passed to +Resque.enqueue+.
#
# @param klass [Object] job class, forwarded to +create_enqueue_job+
# @param args [Array] job arguments, forwarded to +create_enqueue_job+
# @return [Object] the job returned by +create_enqueue_job+
def enqueue(klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue(*staged_job.enqueue_args)
  end

  staged_job
end

#enqueue_at(timestamp, klass, *args) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 56

# Adds a job to this stage and, unless the stage is still pending, schedules
# it through +Resque.enqueue_at+.
#
# The job is built by +create_enqueue_job+ (defined elsewhere in the class).
# While the stage status is +:pending+ the job is only returned. Otherwise
# the stage is marked +:running+ (if it is not already), the job is marked
# +:queued+ and +Resque.enqueue_at+ receives the timestamp followed by the
# job's +enqueue_args+.
#
# @param timestamp [Object] forwarded unchanged to +Resque.enqueue_at+
# @param klass [Object] job class, forwarded to +create_enqueue_job+
# @param args [Array] job arguments, forwarded to +create_enqueue_job+
# @return [Object] the job returned by +create_enqueue_job+
def enqueue_at(timestamp, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_at(timestamp, *staged_job.enqueue_args)
  end

  staged_job
end

#enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 69

# Adds a job to this stage and, unless the stage is still pending, schedules
# it through +Resque.enqueue_at_with_queue+.
#
# The job is built by +create_enqueue_job+ (defined elsewhere in the class).
# While the stage status is +:pending+ the job is only returned. Otherwise
# the stage is marked +:running+ (if it is not already), the job is marked
# +:queued+ and +Resque.enqueue_at_with_queue+ receives the queue, the
# timestamp and then the job's +enqueue_args+.
#
# @param queue [Object] forwarded unchanged to Resque
# @param timestamp [Object] forwarded unchanged to Resque
# @param klass [Object] job class, forwarded to +create_enqueue_job+
# @param args [Array] job arguments, forwarded to +create_enqueue_job+
# @return [Object] the job returned by +create_enqueue_job+
def enqueue_at_with_queue(queue, timestamp, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_at_with_queue(queue, timestamp, *staged_job.enqueue_args)
  end

  staged_job
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 82

# Adds a job to this stage and, unless the stage is still pending, schedules
# it through +Resque.enqueue_in+.
#
# The job is built by +create_enqueue_job+ (defined elsewhere in the class).
# While the stage status is +:pending+ the job is only returned. Otherwise
# the stage is marked +:running+ (if it is not already), the job is marked
# +:queued+ and +Resque.enqueue_in+ receives the delay followed by the job's
# +enqueue_args+.
#
# @param number_of_seconds_from_now [Object] forwarded unchanged to Resque
# @param klass [Object] job class, forwarded to +create_enqueue_job+
# @param args [Array] job arguments, forwarded to +create_enqueue_job+
# @return [Object] the job returned by +create_enqueue_job+
def enqueue_in(number_of_seconds_from_now, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_in(number_of_seconds_from_now, *staged_job.enqueue_args)
  end

  staged_job
end

#enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 95

# Adds a job to this stage and, unless the stage is still pending, schedules
# it through +Resque.enqueue_in_with_queue+.
#
# The job is built by +create_enqueue_job+ (defined elsewhere in the class).
# While the stage status is +:pending+ the job is only returned. Otherwise
# the stage is marked +:running+ (if it is not already), the job is marked
# +:queued+ and +Resque.enqueue_in_with_queue+ receives the queue, the delay
# and then the job's +enqueue_args+.
#
# @param queue [Object] forwarded unchanged to Resque
# @param number_of_seconds_from_now [Object] forwarded unchanged to Resque
# @param klass [Object] job class, forwarded to +create_enqueue_job+
# @param args [Array] job arguments, forwarded to +create_enqueue_job+
# @return [Object] the job returned by +create_enqueue_job+
def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_in_with_queue(queue, number_of_seconds_from_now, *staged_job.enqueue_args)
  end

  staged_job
end

#enqueue_to(queue, klass, *args) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 43

# Adds a job to this stage and, unless the stage is still pending, enqueues
# it on a specific queue through +Resque.enqueue_to+.
#
# The job is built by +create_enqueue_job+ (defined elsewhere in the class).
# While the stage status is +:pending+ the job is only returned. Otherwise
# the stage is marked +:running+ (if it is not already), the job is marked
# +:queued+ and +Resque.enqueue_to+ receives the queue followed by the job's
# +enqueue_args+.
#
# @param queue [Object] forwarded unchanged to Resque
# @param klass [Object] job class, forwarded to +create_enqueue_job+
# @param args [Array] job arguments, forwarded to +create_enqueue_job+
# @return [Object] the job returned by +create_enqueue_job+
def enqueue_to(queue, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running

    staged_job.status = :queued
    Resque.enqueue_to(queue, *staged_job.enqueue_args)
  end

  staged_job
end

#initiate ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 194

# Starts the stage: marks it +:running+, enqueues every job that is neither
# completed nor already queued, then re-evaluates completion.
#
# The trailing +job_completed+ call lets a stage whose jobs are all already
# completed finish immediately.
#
# @return [Object] the result of +job_completed+
def initiate
  self.status = :running

  jobs.each do |staged_job|
    staged_job.enqueue_job unless staged_job.completed? || staged_job.queued?
  end

  job_completed
end

#job_completed ⇒ Object



207
208
209
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 207

# Marks the stage +:complete+ once every one of its jobs reports completed.
#
# A stage with no jobs satisfies the check and is therefore marked complete.
#
# @return [Symbol, nil] +:complete+ when the status was updated, nil otherwise
def job_completed
  return unless jobs.all? { |staged_job| staged_job.completed? }

  self.status = :complete
end

#jobs(start = 0, stop = -1) ⇒ Object



140
141
142
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 140

# Loads the jobs of this stage.
#
# Reads a range of job ids from the Redis list under +stage_key+ and wraps
# each id in a StagedJob. The default range (0..-1) covers the whole list.
#
# @param start [Integer] first list index, passed to +lrange+
# @param stop [Integer] last list index, passed to +lrange+
# @return [Array<Resque::Plugins::Stages::StagedJob>] one job per id read
def jobs(start = 0, stop = -1)
  job_ids = redis.lrange(stage_key, start, stop)

  job_ids.map do |job_id|
    Resque::Plugins::Stages::StagedJob.new(job_id)
  end
end

#jobs_by_status(status) ⇒ Object



144
145
146
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 144

# Selects the jobs of this stage whose status equals the given one.
#
# @param status [Object] the status to match (compared with +==+)
# @return [Array] the matching jobs, in the order +jobs+ returned them
def jobs_by_status(status)
  wanted = status

  jobs.select do |staged_job|
    staged_job.status == wanted
  end
end

#num_jobs ⇒ Object



173
174
175
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 173

# Number of job ids recorded for this stage, taken from the length (LLEN)
# of the Redis list under +stage_key+ without loading the jobs themselves.
#
# @return [Object] whatever the Redis client returns from +llen+
def num_jobs
  redis.llen(stage_key)
end

#number ⇒ Object



118
119
120
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 118

# The stage number stored in the "number" field of the stage's Redis hash.
#
# @return [Integer] the stored value converted with +to_i+, or 1 when the
#   field is not set
def number
  stored = redis.hget(staged_group_key, "number")

  stored.nil? ? 1 : stored.to_i
end

#number=(value) ⇒ Object



122
123
124
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 122

# Stores the stage number in the "number" field of the stage's Redis hash.
#
# @param value [Object] the number to store; passed to Redis unchanged
def number=(value)
  redis.hset(staged_group_key, "number", value)
end

#order_param(sort_option, current_sort, current_order) ⇒ Object



163
164
165
166
167
168
169
170
171
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 163

# Computes the sort direction to use when a column is selected for sorting.
#
# Selecting the column that is already the current sort toggles between
# "asc" and "desc" (a missing current order counts as "asc"); selecting any
# other column starts at "asc".
#
# @param sort_option [Object] the column being selected
# @param current_sort [Object] the column currently sorted by
# @param current_order [String, nil] the current direction
# @return [String] "asc" or "desc"
def order_param(sort_option, current_sort, current_order)
  return "asc" unless sort_option == current_sort

  (current_order || "asc") == "asc" ? "desc" : "asc"
end

#paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 148

# Returns one page of this stage's jobs.
#
# Jobs come from +sorted_jobs+ (defined elsewhere in the class) and are
# reversed when +sort_order+ is "desc". Both paging arguments are coerced
# with +to_i+, so values such as "2" or nil are accepted. A page size below
# 1 falls back to 20, and a page start that is negative or beyond the end of
# the list falls back to the first page.
#
# @param sort_key [Object] forwarded to +sorted_jobs+
# @param sort_order [String] "desc" for descending; anything else ascending
# @param page_num [#to_i] 1-based page number
# @param queue_page_size [#to_i] number of jobs per page
# @return [Array] the jobs on the requested page
def paginated_jobs(sort_key = :class_name,
                   sort_order = "asc",
                   page_num = 1,
                   queue_page_size = 20)
  queue_page_size = queue_page_size.to_i
  queue_page_size = 20 if queue_page_size < 1

  # Coerce like the page size; without this a String page number raises on
  # the subtraction below.
  page_num = page_num.to_i

  job_list = sorted_jobs(sort_key)

  page_start = (page_num - 1) * queue_page_size
  page_start = 0 if page_start > job_list.length || page_start.negative?

  ordered_jobs = sort_order == "desc" ? job_list.reverse : job_list
  ordered_jobs[page_start, queue_page_size]
end

#remove_job(staged_group_job) ⇒ Object



181
182
183
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 181

# Removes a job's id from the Redis list under +stage_key+.
#
# The count argument of 0 asks LREM to remove every element equal to the id,
# not just the first one.
#
# @param staged_group_job [#job_id] the job to remove
# @return [Object] whatever the Redis client returns from +lrem+
def remove_job(staged_group_job)
  redis.lrem(stage_key, 0, staged_group_job.job_id)
end

#staged_group ⇒ Object



126
127
128
129
130
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 126

# The staged group this stage belongs to.
#
# The group object is built on first access from +staged_group_id+ and
# memoized in +@staged_group+. The blank check runs on every call, so nil is
# returned whenever the id is blank, even if a group was memoized earlier.
#
# @return [Resque::Plugins::Stages::StagedGroup, nil] the group, or nil when
#   +staged_group_id+ is blank
def staged_group
  if staged_group_id.blank?
    nil
  else
    @staged_group ||= Resque::Plugins::Stages::StagedGroup.new(staged_group_id)
  end
end

#staged_group=(value) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 132

# Attaches this stage to a staged group.
#
# Caches the group and its id on the instance, registers the stage with the
# group via +add_stage+, and stores the group id in the "staged_group_id"
# field of the stage's Redis hash.
#
# @param value [#group_id, #add_stage] the group to attach to
def staged_group=(value)
  @staged_group, @staged_group_id = value, value.group_id

  value.add_stage(self)

  redis.hset(staged_group_key, "staged_group_id", value.group_id)
end

#status ⇒ Object



108
109
110
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 108

# The stage status stored in the "status" field of the stage's Redis hash.
#
# @return [Symbol] the stored value as a symbol, or +:pending+ when the field
#   is not set
def status
  stored = redis.hget(staged_group_key, "status")

  stored.nil? ? :pending : stored.to_sym
end

#status=(value) ⇒ Object



112
113
114
115
116
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 112

# Stores the stage status and notifies the staged group on completion.
#
# The value is written as a string to the "status" field of the stage's
# Redis hash. The status is then read back, and when it is +:complete+ the
# staged group (if any) is told via +stage_completed+.
#
# @param value [#to_s] the new status
def status=(value)
  redis.hset(staged_group_key, "status", value.to_s)

  return unless status == :complete

  staged_group&.stage_completed
end

#verify ⇒ Object



215
216
217
218
219
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 215

# Checks this stage's link to its staged group.
#
# With no (blank) staged group, +build_new_structure+ (defined elsewhere in
# the class) is called; otherwise the group is asked to verify this stage.
#
# @return [Object] the result of whichever call was made
def verify
  if staged_group.blank?
    build_new_structure
  else
    staged_group.verify_stage(self)
  end
end

#verify_job(job) ⇒ Object



221
222
223
224
225
226
227
# File 'lib/resque/plugins/stages/staged_group_stage.rb', line 221

# Makes sure a job's id is recorded in this stage's Redis list.
#
# Reads the whole list under +stage_key+; if the id is missing it is pushed
# onto the head of the list (LPUSH), otherwise nothing is written.
#
# @param job [#job_id] the job to check
# @return [Object, nil] the +lpush+ result when the id was added, nil when it
#   was already present
def verify_job(job)
  redis.lpush(stage_key, job.job_id) unless redis.lrange(stage_key, 0, -1).include?(job.job_id)
end