Class: Resque::Plugins::Stages::StagedGroup

Inherits:
Object
  • Object
show all
Includes:
Comparable, RedisAccess
Defined in:
lib/resque/plugins/stages/staged_group.rb

Overview

This class represents the top-level grouping of a set of staged jobs.

The group defines individual numbered stages (starting with 0) and initiates each subsequent stage as the current stage completes.

There are methods on the group to initiate groups, add jobs to individual stages, and get or create stages.

Constant Summary

Constants included from RedisAccess

RedisAccess::NAME_SPACE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisAccess

#redis

Constructor Details

#initialize(group_id, description: "") ⇒ StagedGroup

Returns a new instance of StagedGroup.



19
20
21
22
23
24
25
26
# File 'lib/resque/plugins/stages/staged_group.rb', line 19

# Wraps the group identified by +group_id+ and records it in Redis.
#
# The group is registered with the StagedGroupList, a "created_at" field is
# written with HSETNX (so an already stored value is kept), and the
# description is stored when a non-blank one is supplied.
#
# @param group_id [Object] identifier of the group
# @param description [String, nil] optional label; blank values are ignored
def initialize(group_id, description: "")
  @group_id = group_id

  Resque::Plugins::Stages::StagedGroupList.new.add_group(self)

  # Serialize the timestamp explicitly: recent redis-rb releases raise on
  # arguments that are not strings/symbols/numbers instead of calling #to_s,
  # and older releases stored exactly this string anyway.
  redis.hsetnx(group_info_key, "created_at", Time.now.to_s)
  self.description = description if description.present?
end

Instance Attribute Details

#group_id ⇒ Object (readonly)

Returns the value of attribute group_id.



17
18
19
# File 'lib/resque/plugins/stages/staged_group.rb', line 17

# Reader for the identifier this group was constructed with.
#
# @return [Object] the value stored in @group_id
def group_id
  @group_id
end

Class Method Details

.within_a_grouping(description = nil) {|group| ... } ⇒ Object

Yields:

  • (group)


35
36
37
38
39
40
41
# File 'lib/resque/plugins/stages/staged_group.rb', line 35

# Creates a new group with a random UUID as its id, yields it to the caller's
# block, and then initiates the group.
#
# @param description [String, nil] optional label passed on to the new group
# @yield [group] the freshly created StagedGroup
# @return [Object] whatever the group's #initiate returns
def within_a_grouping(description = nil)
  new_group_id = SecureRandom.uuid
  staged_group = Resque::Plugins::Stages::StagedGroup.new(new_group_id, description: description)

  yield staged_group

  staged_group.initiate
end

Instance Method Details

#<=>(other) ⇒ Object



28
29
30
31
32
# File 'lib/resque/plugins/stages/staged_group.rb', line 28

# Orders groups by their group_id.
#
# @param other [Object] the object to compare against
# @return [Integer, nil] result of comparing the two group ids, or nil when
#   +other+ is not a StagedGroup
def <=>(other)
  if other.is_a?(Resque::Plugins::Stages::StagedGroup)
    group_id <=> other.group_id
  end
end

#add_stage(staged_group_stage) ⇒ Object



78
79
80
# File 'lib/resque/plugins/stages/staged_group.rb', line 78

# Appends the stage's id to the end of this group's Redis list (RPUSH).
#
# @param staged_group_stage [#group_stage_id] the stage to register
# @return [Object] the value returned by the Redis client's rpush
def add_stage(staged_group_stage)
  stage_id = staged_group_stage.group_stage_id

  redis.rpush(group_key, stage_id)
end

#blank? ⇒ Boolean

Returns:

  • (Boolean)


131
132
133
# File 'lib/resque/plugins/stages/staged_group.rb', line 131

# Reports whether nothing is stored in Redis for this group, i.e. neither the
# group's stage list key nor its info key exists.
#
# @return [Boolean] true when both keys are absent
def blank?
  [group_key, group_info_key].none? do |key|
    found = redis.exists(key)

    # Depending on the client version, EXISTS comes back as a Boolean or as
    # the number of existing keys. 0 is truthy in Ruby, so an Integer result
    # has to be compared explicitly.
    found.is_a?(Integer) ? found.positive? : found
  end
end

#created_at ⇒ Object



58
59
60
# File 'lib/resque/plugins/stages/staged_group.rb', line 58

# Creation time of the group, memoized in @created_at.
#
# Reads the "created_at" field of the group's info hash and converts it with
# #to_time; falls back to the current time when the field is missing, blank,
# or converts to nil.
#
# @return [Time]
def created_at
  # Safe navigation is required: #presence yields nil for a missing field and
  # nil has no #to_time, which previously raised before the fallback could run.
  @created_at ||= redis.hget(group_info_key, "created_at").presence&.to_time || Time.now
end

#current_stage ⇒ Object



67
68
69
# File 'lib/resque/plugins/stages/staged_group.rb', line 67

# Returns the result of next_stage when it is truthy; otherwise returns the
# result of new_stage.
#
# @return [Object] the stage obtained from next_stage or new_stage
def current_stage
  upcoming = next_stage
  return upcoming if upcoming

  new_stage
end

#delete ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/resque/plugins/stages/staged_group.rb', line 118

# Removes the group: deletes every stage, removes the group from the
# StagedGroupList, then deletes the group's list key and info key from Redis.
#
# @return [Object] the value returned by the final Redis del call
def delete
  stages.each_value { |group_stage| group_stage.delete }

  group_list = Resque::Plugins::Stages::StagedGroupList.new
  group_list.remove_group(self)

  redis.del(group_key)
  redis.del(group_info_key)
end

#description ⇒ Object



54
55
56
# File 'lib/resque/plugins/stages/staged_group.rb', line 54

# Label of the group, memoized in @description.
#
# Uses the "description" field of the group's info hash when it is present
# (non-blank); otherwise falls back to the group_id.
#
# @return [Object] the stored description or the group id
def description
  return @description if @description

  stored = redis.hget(group_info_key, "description")
  @description = stored.presence || group_id
end

#description=(value) ⇒ Object



62
63
64
65
# File 'lib/resque/plugins/stages/staged_group.rb', line 62

# Assigns the group's label and writes it to the "description" field of the
# group's info hash.
#
# A blank +value+ clears the memoized label first, so the value written to
# Redis is whatever #description then resolves to.
#
# @param value [String, nil] the new label
# @return [Object] the value returned by the Redis client's hset
def description=(value)
  @description = value.presence

  store = redis
  store.hset(group_info_key, "description", description)
end

#initiate ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/resque/plugins/stages/staged_group.rb', line 44

# Starts the stage returned by next_stage; when there is none, the group is
# deleted instead.
#
# @return [Object] the result of the stage's #initiate, or of #delete
def initiate
  upcoming = next_stage
  return delete unless upcoming

  upcoming.initiate
end

#last_stage ⇒ Object



71
72
73
74
75
76
# File 'lib/resque/plugins/stages/staged_group.rb', line 71

# Returns the stage stored under the highest stage number.
#
# @return [Object, nil] the stage with the largest key in #stages, or nil
#   when the group has no stages
def last_stage
  _highest_number, final_stage = stages.max_by { |number, _stage| number }

  final_stage
end

#num_stages ⇒ Object



86
87
88
# File 'lib/resque/plugins/stages/staged_group.rb', line 86

# Number of stage ids held in this group's Redis list (LLEN).
#
# @return [Object] the value returned by the Redis client's llen
def num_stages
  redis.llen(group_key)
end

#paged_stages(page_num = 1, page_size = nil) ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/resque/plugins/stages/staged_group.rb', line 102

# Returns one page of this group's stages, in the order of #stages.
#
# Both arguments are coerced with #to_i so that String values (for example
# taken from request parameters) and nil are accepted. A page size below 1
# falls back to 20. A page that starts before the first stage or at/after the
# last one falls back to the first page.
#
# @param page_num [Integer, String, nil] 1-based page number
# @param page_size [Integer, String, nil] stages per page (default 20)
# @return [Array] the stages on the requested page
def paged_stages(page_num = 1, page_size = nil)
  page_size = page_size.to_i
  page_size = 20 if page_size < 1

  start = (page_num.to_i - 1) * page_size
  start = 0 if start.negative? || start >= num_stages

  stages.values[start, page_size]
end

#remove_stage(staged_group_stage) ⇒ Object



82
83
84
# File 'lib/resque/plugins/stages/staged_group.rb', line 82

# Removes every occurrence of the stage's id from this group's Redis list
# (LREM with a count of 0).
#
# @param staged_group_stage [#group_stage_id] the stage to unregister
# @return [Object] the value returned by the Redis client's lrem
def remove_stage(staged_group_stage)
  stage_id = staged_group_stage.group_stage_id

  redis.lrem(group_key, 0, stage_id)
end

#stage(stage_number) ⇒ Object



112
113
114
115
116
# File 'lib/resque/plugins/stages/staged_group.rb', line 112

# Looks up the stage stored under +stage_number+, creating it through
# create_stage when no such stage exists yet.
#
# @param stage_number [Object] key to look up in #stages
# @return [Object] the existing or newly created stage
def stage(stage_number)
  existing = stages[stage_number]
  return existing if existing

  create_stage(stage_number)
end

#stage_completed ⇒ Object



127
128
129
# File 'lib/resque/plugins/stages/staged_group.rb', line 127

# Hook for when a stage has finished: delegates to #initiate.
#
# @return [Object] whatever #initiate returns
def stage_completed
  initiate
end

#stages ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/resque/plugins/stages/staged_group.rb', line 90

# Builds a Hash of this group's stages keyed by stage number.
#
# A StagedGroupStage is created for every id in the group's Redis list, in
# list order. When a stage reports a number that is already taken, it is moved
# to the next free number and that number is written back to the stage.
#
# @return [Hash] stage number => StagedGroupStage
def stages
  stage_ids    = redis.lrange(group_key, 0, -1)
  group_stages = stage_ids.map { |id| Resque::Plugins::Stages::StagedGroupStage.new(id) }
  by_number    = {}

  group_stages.each do |group_stage|
    slot = group_stage.number

    # Walk forward until a number that no earlier stage has claimed is found.
    while by_number.key?(slot)
      slot += 1
    end

    by_number[slot] = group_stage
    group_stage.number = slot unless group_stage.number == slot
  end

  by_number
end

#verify_stage(stage) ⇒ Object



135
136
137
138
139
140
141
# File 'lib/resque/plugins/stages/staged_group.rb', line 135

# Makes sure the stage's id is present in this group's Redis list, pushing it
# onto the head of the list (LPUSH) when it is missing.
#
# @param stage [#group_stage_id] the stage to check
# @return [Object, nil] nil when the id was already listed, otherwise the
#   value returned by the Redis client's lpush
def verify_stage(stage)
  known_ids = redis.lrange(group_key, 0, -1)
  stage_id  = stage.group_stage_id

  redis.lpush(group_key, stage_id) unless known_ids.include?(stage_id)
end