Class: Gitlab::SidekiqDaemon::Monitor

Inherits:
Daemon
  • Object
show all
Extended by:
Utils::Override
Includes:
Utils::StrongMemoize
Defined in:
lib/gitlab/sidekiq_daemon/monitor.rb

Constant Summary collapse

NOTIFICATION_CHANNEL =
'sidekiq:cancel:notifications'
CANCEL_DEADLINE =
24.hours.seconds
RECONNECT_TIME =
3.seconds
CancelledError =

We use an exception derived from `Exception` so that it is treated as a very low-level exception that should not be caught by the application.

Class.new(Exception)

Instance Attribute Summary

Attributes inherited from Daemon

#thread

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::Override

extended, extensions, included, method_added, override, prepended, queue_verification, verify!

Methods inherited from Daemon

#enabled?, initialize_instance, instance, #start, #stop, #thread?

Constructor Details

#initialize ⇒ Monitor

Returns a new instance of Monitor.



18
19
20
21
22
23
# File 'lib/gitlab/sidekiq_daemon/monitor.rb', line 18

# Builds a monitor with an empty job registry.
#
# The parent initializer runs first; afterwards the instance holds:
# * @jobs       - Hash of tracked jobs, keyed by job id (filled by #within_job)
# * @jobs_mutex - Mutex that every read/write of @jobs goes through
def initialize
  super

  # Registry of in-flight jobs; starts out empty.
  @jobs = {}
  # Guards @jobs against concurrent access.
  @jobs_mutex = Mutex.new
end

Class Method Details

.cancel_job(jid) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/gitlab/sidekiq_daemon/monitor.rb', line 53

# Requests cancellation of the job identified by +jid+.
#
# Two things happen inside a single SharedState Redis checkout:
# 1. a marker key (cancel_job_key(jid)) is written with value 1 and a TTL of
#    CANCEL_DEADLINE;
# 2. a JSON message { action: 'cancel', jid: jid } is published on
#    NOTIFICATION_CHANNEL.
#
# @param jid [Object] job id to cancel (serialized into the JSON message)
# @return [Object] whatever the SharedState.with call returns
def self.cancel_job(jid)
  notification = { action: 'cancel', jid: jid }.to_json

  ::Gitlab::Redis::SharedState.with do |redis|
    # Persist the marker first, then broadcast the notification.
    redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
    redis.publish(NOTIFICATION_CHANNEL, notification)
  end
end

Instance Method Details

#jobs ⇒ Object



65
66
67
68
69
# File 'lib/gitlab/sidekiq_daemon/monitor.rb', line 65

# Snapshot of the currently tracked jobs.
#
# The copy is taken while holding @jobs_mutex. It is a shallow copy (Hash#dup):
# adding or removing keys on the result does not touch the registry, but the
# per-job value objects are shared with it.
#
# @return [Hash] duplicate of the internal job registry
def jobs
  @jobs_mutex.synchronize { @jobs.dup }
end

#thread_name ⇒ Object



26
27
28
# File 'lib/gitlab/sidekiq_daemon/monitor.rb', line 26

# Fixed name string for this monitor.
# NOTE(review): presumably consumed by the Daemon base class to label the
# monitor's thread - the caller is not visible here, confirm there.
#
# @return [String] always "job_monitor"
def thread_name
  'job_monitor'
end

#within_job(worker_class, jid, queue) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/gitlab/sidekiq_daemon/monitor.rb', line 30

# Runs the given block while the job is tracked in the registry.
#
# Sequence:
# 1. Under @jobs_mutex, record the job under +jid+ with its worker class, the
#    current thread and a monotonic start time.
# 2. If cancelled?(jid) is truthy, log a warning through Sidekiq.logger and
#    raise CancelledError without yielding.
# 3. Otherwise yield to the block.
# 4. In every case (normal return, cancellation, or an error from the block)
#    remove the +jid+ entry again under @jobs_mutex.
#
# @param worker_class [Object] stored as :worker_class in the registry entry
# @param jid [Object] job id; registry key and argument to cancelled?
# @param queue [Object] only used in the cancellation log entry
# @yield the job's work
# @return [Object] the block's return value
# @raise [CancelledError] when the job is already flagged as cancelled
def within_job(worker_class, jid, queue)
  @jobs_mutex.synchronize do
    @jobs[jid] = {
      worker_class: worker_class,
      thread: Thread.current,
      started_at: Gitlab::Metrics::System.monotonic_time
    }
  end

  # Common path: not cancelled, so just run the job body.
  return yield unless cancelled?(jid)

  Sidekiq.logger.warn(
    class: self.class.to_s,
    action: 'run',
    queue: queue,
    jid: jid,
    canceled: true
  )
  raise CancelledError
ensure
  # Always unregister, whichever way the method is left.
  @jobs_mutex.synchronize { @jobs.delete(jid) }
end