Class: When::Do

Inherits:
Object
  • Object
show all
Defined in:
lib/when-do/do.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Do

Returns a new instance of Do.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/when-do/do.rb', line 10

def initialize(opts={})
  @opts   = opts
  @logger = init_logger(opts[:log_path], opts[:log_level])

  Process.daemon(true) if opts.has_key?(:daemonize)

  @pid_file_path = opts[:pid_file_path]
  if pid_file_path
    File.open(pid_file_path, 'w') { |f| f.write(Process.pid) }
  end

  redis_opts         = opts[:redis_opts]        || {}
  @redis             = Redis.new(redis_opts)

  @schedule_key      = opts[:schedule_key]      || 'when:schedules'
  @worker_queue_key  = opts[:worker_queue_key]  || 'when:queue:default'
  @delayed_queue_key = opts[:delayed_queue_key] || 'when:delayed'
end

Instance Attribute Details

#delayed_queue_keyObject (readonly)

Returns the value of attribute delayed_queue_key.



8
9
10
# File 'lib/when-do/do.rb', line 8

def delayed_queue_key
  @delayed_queue_key
end

#loggerObject (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/when-do/do.rb', line 8

def logger
  @logger
end

#optsObject (readonly)

Returns the value of attribute opts.



8
9
10
# File 'lib/when-do/do.rb', line 8

def opts
  @opts
end

#pid_file_pathObject (readonly)

Returns the value of attribute pid_file_path.



8
9
10
# File 'lib/when-do/do.rb', line 8

def pid_file_path
  @pid_file_path
end

#redisObject (readonly)

Returns the value of attribute redis.



8
9
10
# File 'lib/when-do/do.rb', line 8

def redis
  @redis
end

#schedule_keyObject (readonly)

Returns the value of attribute schedule_key.



8
9
10
# File 'lib/when-do/do.rb', line 8

def schedule_key
  @schedule_key
end

#worker_queue_keyObject (readonly)

Returns the value of attribute worker_queue_key.



8
9
10
# File 'lib/when-do/do.rb', line 8

def worker_queue_key
  @worker_queue_key
end

Instance Method Details

#analyze(started_at) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/when-do/do.rb', line 70

def analyze(started_at)
  if running?(started_at)
    logger.info('Another process is already analyzing.')
  else
    analyze_dst(started_at) if dst_forward?(started_at)
    logger.debug { "Analyzing #{started_at}." }
    queue_scheduled(started_at)
    queue_delayed(started_at)
  end
end

#analyze_dst(started_at) ⇒ Object



99
100
101
102
103
104
105
# File 'lib/when-do/do.rb', line 99

def analyze_dst(started_at)
  logger.info { "DST forward shift detected. Triggering analysis for #{started_at.hour - 1}:00 through #{started_at.hour - 1}:59"}
  skipped_time = Time.new(started_at.year, started_at.month, started_at.day, started_at.hour - 1, 0, 0, started_at.utc_offset - 3600)
  (0..59).each do |min|
    analyze(skipped_time + min * 60)
  end
end

#analyze_in_child_processObject



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/when-do/do.rb', line 55

def analyze_in_child_process
  if pid = fork
    Thread.new {
      pid, status = Process.wait2(pid)
      if status.exitstatus != 0
        raise "Child (pid: #{pid} exited with non-zero status. Check logs."
      end
    }.abort_on_exception = true
  else
    ['HUP', 'INT', 'TERM', 'QUIT'].each { |sig| Signal.trap(sig) { }}
    analyze(Time.now)
    exit
  end
end

#build_day_key(started_at) ⇒ Object



107
108
109
# File 'lib/when-do/do.rb', line 107

def build_day_key(started_at)
  "#{schedule_key}:#{started_at.to_s.split(' ')[0]}"
end

#build_min_key(started_at) ⇒ Object



111
112
113
# File 'lib/when-do/do.rb', line 111

def build_min_key(started_at)
  "#{started_at.hour}:#{started_at.min}"
end

#dst_forward?(started_at) ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/when-do/do.rb', line 95

def dst_forward?(started_at)
  started_at.hour - (started_at - 60).hour == 2
end

#enqueue(jobs) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/when-do/do.rb', line 144

def enqueue(jobs)
  jobs.each do |job|
    logger.info("Queueing: #{job}")
    queue = job['queue'] || worker_queue_key
    success = redis.lpush(queue, job.to_json)
    unless  success > 0
      raise "Failed to queue #{job}. Redis returned #{success}."
    end
  end
end

#queue_delayed(started_at) ⇒ Object



133
134
135
136
137
138
139
140
141
142
# File 'lib/when-do/do.rb', line 133

def queue_delayed(started_at)
  logger.info("Checking for delayed jobs.")
  raw_delayed_jobs = redis.multi do
    redis.zrevrangebyscore(delayed_queue_key, started_at.to_i, '-inf')
    redis.zremrangebyscore(delayed_queue_key, '-inf', started_at.to_i)
  end[0]
  delayed_jobs = raw_delayed_jobs.map { |j| JSON.parse(j) }
  logger.debug { "Found #{delayed_jobs.count} delayed jobs." }
  enqueue(delayed_jobs) if delayed_jobs.any?
end

#queue_scheduled(started_at) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/when-do/do.rb', line 115

def queue_scheduled(started_at)
  schedules = redis.hvals(schedule_key)
  logger.info("Analyzing #{schedules.count} schedules.")
  scheduled_jobs = schedules.inject([]) do |jobs, s|
    schedule = JSON.parse(s)
    if (cron = When::Cron.valid(schedule['cron'])) && cron == started_at
      job = schedule.merge('jid' => SecureRandom.uuid)
      job.delete('cron')
      jobs << job
    else
      logger.error { "Could not interpret cron for #{schedule.inspect}" }
    end
    jobs
  end
  logger.debug { "Found #{scheduled_jobs.count} schedules due to be queued." }
  enqueue(scheduled_jobs) if scheduled_jobs.any?
end

#running?(started_at) ⇒ Boolean

Returns:

  • (Boolean)


81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/when-do/do.rb', line 81

def running?(started_at)
  day_key = build_day_key(started_at)
  min_key = build_min_key(started_at)

  logger.debug { "Checking Redis using day_key: '#{day_key}' and min_key: '#{min_key}'"}
  check_and_set_analyzed = redis.multi do
    redis.hget(day_key, min_key)
    redis.hset(day_key, min_key, 't')
    redis.expire(day_key, 60 * 60 * 24)
  end

  check_and_set_analyzed[0]
end

#start_loopObject



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/when-do/do.rb', line 29

def start_loop
  logger.info("Starting...")
  logger.info { "Schedule key: '#{schedule_key}', worker queue key: '#{worker_queue_key}', delayed queue key: '#{delayed_queue_key}'" }
  logger.info { "PID file: #{pid_file_path}" } if pid_file_path

  Signal.trap('USR1') { @logger = init_logger(opts[:log_path], opts[:log_level]) }

  loop do
    sleep_until_next_minute
    logger.debug { "Using #{`ps -o rss -p #{Process.pid}`.chomp.split("\n").last.to_i} kb of memory." }
    analyze_in_child_process
  end

rescue SystemExit => e
  raise e

rescue SignalException => e
  logger.info(e.inspect)
  File.delete(pid_file_path) if pid_file_path
  raise e

rescue Exception => e
  ([e.inspect] + e.backtrace).each { |line| logger.fatal(line) }
  raise e
end