Class: Scheduler::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/scheduler/manager.rb

Defined Under Namespace

Classes: Runner

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis = nil, options = nil) ⇒ Manager

Returns a new instance of Manager.



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/scheduler/manager.rb', line 120

def initialize(redis = nil, options=nil)
  @redis = $redis || redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex
end

Instance Attribute Details

#random_ratioObject

Returns the value of attribute random_ratio.



11
12
13
# File 'lib/scheduler/manager.rb', line 11

def random_ratio
  @random_ratio
end

#redisObject

Returns the value of attribute redis.



11
12
13
# File 'lib/scheduler/manager.rb', line 11

def redis
  @redis
end

Class Method Details

.currentObject



132
133
134
# File 'lib/scheduler/manager.rb', line 132

def self.current
  @current
end

.current=(manager) ⇒ Object



136
137
138
# File 'lib/scheduler/manager.rb', line 136

def self.current=(manager)
  @current = manager
end

.discover_schedulesObject



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/scheduler/manager.rb', line 255

def self.discover_schedules
  # hack for developemnt reloader is crazytown
  # multiple classes with same name can be in
  # object space
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
    if schedule.scheduled?
      next if unique.include?(schedule.to_s)
      schedules << schedule
      unique << schedule.to_s
    end
  end
  schedules
end

.lock_keyObject



283
284
285
# File 'lib/scheduler/manager.rb', line 283

def self.lock_key
  "_scheduler_lock_"
end

.queue_key(hostname = nil) ⇒ Object



287
288
289
290
291
292
293
# File 'lib/scheduler/manager.rb', line 287

def self.queue_key(hostname=nil)
  if hostname
    "_scheduler_queue_#{hostname}_"
  else
    "_scheduler_queue_"
  end
end

.schedule_key(klass, hostname = nil) ⇒ Object



295
296
297
298
299
300
301
# File 'lib/scheduler/manager.rb', line 295

def self.schedule_key(klass,hostname=nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end

.seqObject



272
273
274
275
276
277
# File 'lib/scheduler/manager.rb', line 272

def self.seq
  @mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end

.without_runner(redis = nil) ⇒ Object



116
117
118
# File 'lib/scheduler/manager.rb', line 116

def self.without_runner(redis=nil)
  self.new(redis, skip_runner: true)
end

Instance Method Details

#blocking_tickObject



230
231
232
233
# File 'lib/scheduler/manager.rb', line 230

def blocking_tick
  tick
  @runner.wait_till_done
end

#ensure_schedule!(klass) ⇒ Object



152
153
154
155
156
157
# File 'lib/scheduler/manager.rb', line 152

def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end

end

#get_klass(name) ⇒ Object



187
188
189
190
191
# File 'lib/scheduler/manager.rb', line 187

def get_klass(name)
  name.constantize
rescue NameError
  nil
end

#hostnameObject



140
141
142
# File 'lib/scheduler/manager.rb', line 140

def hostname
  @hostname ||= `hostname`.strip
end

#identity_keyObject



279
280
281
# File 'lib/scheduler/manager.rb', line 279

def identity_key
  @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end

#keep_aliveObject



244
245
246
# File 'lib/scheduler/manager.rb', line 244

def keep_alive
  redis.setex identity_key, keep_alive_duration, ""
end

#keep_alive_durationObject



240
241
242
# File 'lib/scheduler/manager.rb', line 240

def keep_alive_duration
  60
end

#lockObject



248
249
250
251
252
# File 'lib/scheduler/manager.rb', line 248

def lock
  DistributedMutex.new(Manager.lock_key).synchronize do
    yield
  end
end

#next_run(klass) ⇒ Object



148
149
150
# File 'lib/scheduler/manager.rb', line 148

def next_run(klass)
  schedule_info(klass).next_run
end

#remove(klass) ⇒ Object



159
160
161
162
163
# File 'lib/scheduler/manager.rb', line 159

def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end

#reschedule_orphans!Object



165
166
167
168
169
170
# File 'lib/scheduler/manager.rb', line 165

def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end

#reschedule_orphans_on!(hostname = nil) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/scheduler/manager.rb', line 172

def reschedule_orphans_on!(hostname=nil)
  redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
    klass = get_klass(key)
    next unless klass
    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
      (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end

#schedule_info(klass) ⇒ Object



144
145
146
# File 'lib/scheduler/manager.rb', line 144

def schedule_info(klass)
  ScheduleInfo.new(klass, self)
end

#schedule_next_job(hostname = nil) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/scheduler/manager.rb', line 200

def schedule_next_job(hostname=nil)
  (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true

  return unless key
  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    unless klass
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(hostname), key
      return
    end

    unless klass.respond_to?(:daily) &&
           klass.respond_to?(:every)
      # job klass exists but no longer extends from the base
      redis.zrem Manager.queue_key(hostname), key
      return
    end

    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end

#stop!Object



235
236
237
238
# File 'lib/scheduler/manager.rb', line 235

def stop!
  @runner.stop!
  self.class.current = nil
end

#tickObject



193
194
195
196
197
198
# File 'lib/scheduler/manager.rb', line 193

def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end