Module: Sidekiq::ResqueStatus

Included in:
Middleware::Client::Stats::ResqueLike, Middleware::Server::Stats::ResqueLike, SidekiqWorkerWithStatus
Defined in:
lib/sidekiq-resque_status.rb,
lib/sidekiq-resque_status/version.rb

Constant Summary collapse

VERSION =
"0.0.3"

Instance Method Summary collapse

Instance Method Details

#enqueue_job(worker, msg, queue) ⇒ Object

> This method will be called by the client middleware before enqueuing a job.

> It stores information on the current job.

> This information is displayed by the resque web UI whenever a user browses:

- the statuses page (provided by resque-status)
- the job_stats page (provided by resque-job-stats)


19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sidekiq-resque_status.rb', line 19

# Records a job that is about to be enqueued (called by the client middleware).
#
# Three writes are performed, in this order:
# 1. the "enqueued" counter for the worker is bumped through +increment_stat+;
# 2. a "queued" status entry is stored through +update_status+;
# 3. the job id is added to the "_statuses" sorted set, scored by timestamp.
#
# @param worker [#name] object whose +name+ labels the stat key and status entry
# @param msg [Hash] job payload; only 'jid' and 'args' are read
# @param queue [Object] not used by this method
# @return [Object] whatever +redis.zadd+ returns
def enqueue_job(worker, msg, queue)
  # The timestamp is deliberately taken one second past the current UTC time.
  enqueued_at = Time.now.utc + 1
  job_id      = msg['jid']
  job_args    = msg['args']
  worker_name = worker.name

  # Counter equivalent to the one kept by resque-job-stats.
  increment_stat("stats:jobs:#{worker_name}:enqueued", enqueued_at)

  # Status entry, stored in the "queued" state.
  epoch = enqueued_at.to_i
  queued_status = {
    :time   => epoch,
    :class  => worker_name,
    :retry  => false,
    :name   => "#{worker_name}(#{job_args})",
    :status => "queued",
    :uuid   => job_id,
    :args   => job_args
  }
  update_status("status:#{job_id}", "queued", queued_status)

  # Index the job id under the "_statuses" key.
  redis.zadd("_statuses", epoch, job_id)
end

#job_completed(worker, msg, queue, duration = 0) ⇒ Object

> This method will be called by the server middleware after processing a job.

> It adds a description of the processed jobs.

> It updates information on the current job.

> This information is displayed by the resque web UI whenever a user browses:

- the statuses page (provided by resque-status)
- the job_stats page (provided by resque-job-stats)
- the queues page (provided by resque-web)


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/sidekiq-resque_status.rb', line 59

# Marks a job as completed (called by the server middleware after processing).
#
# Stores a "completed" status, bumps the "performed" counters, appends the
# duration to the duration lists and pops one entry from the "queue:<queue>"
# list.
#
# @param worker [Object] must respond to +to_process+, +processed+ and
#   +description+; its class name labels the per-worker stat keys
# @param msg [Hash] job payload; only 'jid' is read here
# @param queue [#to_s] queue name, interpolated into "queue:<queue>"
# @param duration [Numeric] value pushed onto the duration lists (default 0)
# @return [Object] whatever +redis.lpop+ returns
def job_completed(worker, msg, queue, duration = 0)
  jid = msg['jid']
  worker_name = worker.class.name

  status_hash = complete_options(worker.to_process || 1, worker.processed || 1, duration, worker.description)
  status_hash = set_missing_values(worker, msg, queue, status_hash)

  # Status set to completed
  stored = update_status("status:#{jid}", "completed", status_hash) || {}
  recorded_at = stored["time"] || stored["run_at"]

  # Time.at builds a local-zone Time, whereas the fallback is UTC. Normalise
  # to UTC so the stats helper always receives the same zone, whichever
  # branch is taken.
  now = recorded_at ? Time.at(recorded_at.to_i).utc : Time.now.utc

  # Resque job Stats equivalent
  increment_stat("stats:jobs:#{jid}:timeseries:performed", now)
  increment_stat("stats:jobs:#{worker_name}:performed", now)

  # Set duration
  redis.rpush("stats:jobs:#{worker_name}:duration", duration)
  redis.rpush("stats:jobs:#{jid}:duration", duration)

  # remove job from the queue tab
  redis.lpop("queue:#{queue}")
end

#job_failed(worker, msg, queue, error) ⇒ Object

> This method will be called by the server middleware each time a job fails.

> It adds a complete description of the failure.

> It updates information on the current job.

> It makes sure the job can be replayed.

> This information is displayed by the resque web UI whenever a user browses:

- the failed page (provided by resque-web)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/sidekiq-resque_status.rb', line 88

# Records a job failure (called by the server middleware each time a job fails).
#
# Stores a "failed" status carrying the error message, pushes a failure
# description onto the 'failed' list so the job can be replayed, and bumps
# the failure counters.
#
# @param worker [Object] failed worker; its class name labels the entries
# @param msg [Hash] job payload; 'jid' and 'args' are read, never mutated
# @param queue [Object] stored under both :worker and :queue in the failure entry
# @param error [Exception] supplies the class name, message and backtrace
# @return [Object] whatever +increment_expire_key+ returns
def job_failed(worker, msg, queue, error)
  jid = msg['jid']
  worker_name = worker.class.name

  hash = merge_value("status:#{jid}", {"status" => "failed", "message" => error.message})
  update_status("status:#{jid}", "failed", hash)

  # Pass the jid into the args hash to replay the job. Only the leading hash
  # is rewritten; any arguments after it are kept so a replay receives the
  # same argument list.
  args = msg['args']
  if args.is_a?(Array) && args.first.is_a?(Hash)
    args = [args.first.merge('jid' => jid)] + args.drop(1)
  end

  failed_at = Time.now
  failed_message = {
    :failed_at => failed_at.rfc2822,
    :payload   => {"class" => worker_name, "args" => args},
    :class     => worker_name,
    :exception => error.class.name,
    :error     => error.message,
    # backtrace is nil for an exception that was never raised
    :backtrace => Array(error.backtrace),
    :worker    => queue,
    :queue     => queue,
    :args      => args,
    :jid       => jid
  }
  # Push the failed information into redis
  redis.rpush('failed', MultiJson.dump(failed_message))

  # Increment failed statistics for job Stats. The timestamp is handed over
  # in UTC, like the timestamps the other hooks pass to increment_stat.
  increment_stat("stats:jobs:#{worker_name}:failed", failed_at.getutc)
  increment_expire_key("stat:failed")
end

#job_in_progress(worker, msg, queue) ⇒ Object

> This method will be called by the server middleware before processing a job.

> It makes sure we are not losing any information at the beginning of the process.

> It updates information on the current job.

> This information is displayed by the resque web UI whenever a user browses:

- the statuses page (provided by resque-status)
- the job_stats page (provided by resque-job-stats)


41
42
43
44
45
46
47
48
# File 'lib/sidekiq-resque_status.rb', line 41

# Marks a job as "working" (called by the server middleware before processing).
#
# @param worker [Object] must respond to +jid+ and +jid=+
# @param msg [Hash] job payload; 'jid' and 'args' are read, 'jid' may be filled in
# @param queue [Object] forwarded to +set_missing_values+
# @return [Object] whatever +update_status+ returns
def job_in_progress(worker, msg, queue)
  # When resque web re-enqueues a job, make sure worker.jid and msg['jid']
  # are both defined. The fallback jid is read from the first argument, and
  # only when that argument is a Hash: an empty argument list or a non-hash
  # first argument must not raise.
  args = msg['args']
  replayed_jid = args.first['jid'] if args.is_a?(Array) && args.first.is_a?(Hash)
  msg['jid'] ||= replayed_jid if replayed_jid
  worker.jid ||= msg['jid']

  # Set status to working
  status_hash = set_missing_values(worker, msg, queue)
  update_status("status:#{msg['jid']}", "working", status_hash)
end