Module: ActiveJob::QueueAdapters::ResqueExt

Includes:
MissionControl::Jobs::Adapter
Defined in:
lib/active_job/queue_adapters/resque_ext.rb

Defined Under Namespace

Classes: ResqueJobs

Instance Method Summary

Methods included from MissionControl::Jobs::Adapter

#dispatch_job, #exposes_workers?, #find_recurring_task, #find_worker, #recurring_tasks, #supported_job_statuses, #supports_job_filter?, #supports_job_status?, #supports_recurring_tasks?, #workers

Instance Method Details

#activating(&block) ⇒ Object



9
10
11
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 9

# Hands the given block to +Resque.with_per_thread_redis_override+, together
# with the value returned by +redis+.
#
# NOTE(review): +redis+ is presumably a reader for the +@redis+ assigned in
# +initialize+ — its definition is not visible here; confirm.
#
# @yield the block forwarded to Resque
# @return [Object] whatever +Resque.with_per_thread_redis_override+ returns
def activating(&block)
  Resque.with_per_thread_redis_override(redis, &block)
end

#clear_queue(queue_name) ⇒ Object



34
35
36
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 34

# Delegates to +Resque.remove_queue+ for the given queue name.
#
# @param queue_name name of the queue, forwarded to Resque unchanged
# @return [Object] whatever +Resque.remove_queue+ returns
def clear_queue(queue_name)
  Resque.remove_queue(queue_name)
end

#discard_all_jobs(jobs_relation) ⇒ Object



72
73
74
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 72

# Discards every job covered by the relation.
#
# The relation is first wrapped via +resque_jobs_for+; the wrapper's
# +discard_all+ does the actual work.
#
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever +discard_all+ returns
def discard_all_jobs(jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.discard_all
end

#discard_job(job, jobs_relation) ⇒ Object



76
77
78
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 76

# Discards a single job within the scope of the relation.
#
# The relation is wrapped via +resque_jobs_for+ and the job is handed to the
# wrapper's +discard+.
#
# @param job the job to discard
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever +discard+ returns
def discard_job(job, jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.discard(job)
end

#fetch_jobs(jobs_relation) ⇒ Object



60
61
62
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 60

# Fetches the jobs covered by the relation.
#
# The relation is wrapped via +resque_jobs_for+ and the wrapper's +all+ is
# returned.
#
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever +all+ returns
def fetch_jobs(jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.all
end

#find_job(job_id, jobs_relation) ⇒ Object



80
81
82
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 80

# Looks up one job by id within the scope of the relation.
#
# The relation is wrapped via +resque_jobs_for+ and the id is handed to the
# wrapper's +find_job+.
#
# @param job_id identifier of the job to look up
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever the wrapper's +find_job+ returns
def find_job(job_id, jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.find_job(job_id)
end

#initialize(redis = Resque.redis) ⇒ Object



4
5
6
7
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 4

# Runs the parent initializer and then stores the given Redis object in
# +@redis+.
#
# @param redis object to keep in +@redis+; defaults to +Resque.redis+
def initialize(redis = Resque.redis)
  super() # explicit parentheses: call the parent initializer with no arguments
  @redis = redis
end

#jobs_count(jobs_relation) ⇒ Object



56
57
58
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 56

# Counts the jobs covered by the relation.
#
# The relation is wrapped via +resque_jobs_for+ and the wrapper's +count+ is
# returned.
#
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever +count+ returns
def jobs_count(jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.count
end

#pause_queue(queue_name) ⇒ Object



38
39
40
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 38

# Delegates to +ResquePauseHelper.pause+ for the given queue name.
#
# NOTE(review): +ResquePauseHelper+ is referenced unconditionally here;
# +supports_queue_pausing?+ checks whether the constant is defined —
# presumably callers consult it first; confirm.
#
# @param queue_name name of the queue, forwarded unchanged
# @return [Object] whatever +ResquePauseHelper.pause+ returns
def pause_queue(queue_name)
  ResquePauseHelper.pause(queue_name)
end

#queue_paused?(queue_name) ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 46

# Delegates to +ResquePauseHelper.paused?+ for the given queue name.
#
# NOTE(review): +ResquePauseHelper+ is referenced unconditionally here;
# +supports_queue_pausing?+ checks whether the constant is defined —
# presumably callers consult it first; confirm.
#
# @param queue_name name of the queue, forwarded unchanged
# @return [Object] whatever +ResquePauseHelper.paused?+ returns
def queue_paused?(queue_name)
  ResquePauseHelper.paused?(queue_name)
end

#queue_size(queue_name) ⇒ Object



30
31
32
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 30

# Delegates to +Resque.size+ for the given queue name.
#
# @param queue_name name of the queue, forwarded to Resque unchanged
# @return [Object] whatever +Resque.size+ returns
def queue_size(queue_name)
  Resque.size(queue_name)
end

#queues ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 13

# Builds a summary hash for every name returned by +queue_names+.
#
# All Redis reads are issued inside a single +redis.multi+ block. The objects
# returned by the calls made in that block are collected and only read (via
# +#value+) once the block has finished.
#
# @return [Array<Hash>] one hash per queue with
#   +:name+   — the queue name,
#   +:active+ — true when neither "pause:queue:<name>" nor "pause:all" holds a value,
#   +:size+   — the LLEN of "queue:<name>"
def queues
  names = queue_names
  pause_lookups = []
  size_lookups = []

  redis.multi do |transaction|
    names.each do |name|
      pause_lookups << transaction.mget("pause:queue:#{name}", "pause:all")
      size_lookups << transaction.llen("queue:#{name}")
    end
  end

  names.zip(pause_lookups, size_lookups).map do |name, pause_lookup, size_lookup|
    { name: name, active: pause_lookup.value.compact.empty?, size: size_lookup.value }
  end
end

#resume_queue(queue_name) ⇒ Object



42
43
44
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 42

# Delegates to +ResquePauseHelper.unpause+ for the given queue name.
#
# NOTE(review): +ResquePauseHelper+ is referenced unconditionally here;
# +supports_queue_pausing?+ checks whether the constant is defined —
# presumably callers consult it first; confirm.
#
# @param queue_name name of the queue, forwarded unchanged
# @return [Object] whatever +ResquePauseHelper.unpause+ returns
def resume_queue(queue_name)
  ResquePauseHelper.unpause(queue_name)
end

#retry_all_jobs(jobs_relation) ⇒ Object



64
65
66
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 64

# Retries every job covered by the relation.
#
# The relation is wrapped via +resque_jobs_for+; the wrapper's +retry_all+
# does the actual work.
#
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever +retry_all+ returns
def retry_all_jobs(jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.retry_all
end

#retry_job(job, jobs_relation) ⇒ Object



68
69
70
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 68

# Retries a single job within the scope of the relation.
#
# The relation is wrapped via +resque_jobs_for+ and the job is handed to the
# wrapper's +retry_job+.
#
# @param job the job to retry
# @param jobs_relation relation passed to +resque_jobs_for+
# @return [Object] whatever the wrapper's +retry_job+ returns
def retry_job(job, jobs_relation)
  matching_jobs = resque_jobs_for(jobs_relation)
  matching_jobs.retry_job(job)
end

#supported_job_filters(jobs_relation) ⇒ Object



50
51
52
53
54
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 50

# Lists the job filters available for the given relation.
#
# @param jobs_relation [#pending?] relation whose +pending?+ answer is checked
# @return [Array<Symbol>] +[:queue_name]+ when the relation is pending,
#   otherwise an empty array
def supported_job_filters(jobs_relation)
  return [ :queue_name ] if jobs_relation.pending?

  []
end

#supports_queue_pausing? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/active_job/queue_adapters/resque_ext.rb', line 84

# Reports whether queue pausing is available, i.e. whether the
# +ResquePauseHelper+ constant is defined.
#
# +defined?+ evaluates to the String "constant" or to +nil+; the result is
# converted explicitly so this predicate returns a real boolean.
#
# @return [Boolean] +true+ when +ResquePauseHelper+ is defined, +false+ otherwise
def supports_queue_pausing?
  defined?(ResquePauseHelper) ? true : false
end