Module: Resque::Plugins::RateLimited

Included in:
BaseApiQueue
Defined in:
lib/resque/plugins/rate_limited/rate_limited.rb,
lib/resque/plugins/rate_limited/apis/twitter_queue.rb,
lib/resque/plugins/rate_limited/apis/base_api_queue.rb,
lib/resque/plugins/rate_limited/apis/evernote_queue.rb,
lib/resque/plugins/rate_limited/apis/angellist_queue.rb,
lib/resque/plugins/rate_limited/rate_limited_un_pause.rb

Defined Under Namespace

Classes: AngellistQueue, BaseApiQueue, EvernoteQueue, TwitterQueue, UnPause

Constant Summary collapse

RESQUE_PREFIX =
'queue:'.freeze
MUTEX =
'Resque::Plugins::RateLimited'.freeze

Instance Method Summary collapse

Instance Method Details

#around_perform_with_check_and_requeue(*params) ⇒ Object



7
8
9
10
11
12
13
14
15
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 7

# Wraps job execution: when the queue is paused, the job is re-enqueued onto
# the paused queue and the wrapped block is NOT run; otherwise the block runs.
#
# The pause check and the re-enqueue happen together inside `with_lock`; the
# job body itself runs outside the lock.
#
# @param params [Array] job arguments, forwarded unchanged when re-enqueueing
# @return [Object, nil] the block's value, or nil when the job was re-enqueued
def around_perform_with_check_and_requeue(*params)
  requeued = false
  with_lock do
    if paused?
      Resque.enqueue_to(paused_queue_name, self, *params)
      requeued = true
    end
  end
  yield unless requeued
end

#find_class(klass) ⇒ Object



87
88
89
90
91
92
93
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 87

# Resolves a class reference to the constant it names.
#
# Accepts an actual Class/Module (returned untouched) or a String/Symbol
# holding a possibly namespaced constant name such as "Foo::Bar". A leading
# "::" (absolute constant path) is tolerated.
#
# @param klass [Module, String, Symbol] the class itself or its name
# @return [Object] the constant found by walking the namespace from Object
# @raise [NameError] if the name is empty, malformed, or cannot be resolved
def find_class(klass)
  return klass if klass.is_a?(Module)

  # to_s lets Symbols through; stripping a leading "::" avoids an empty first
  # segment, which const_get would reject.
  name = klass.to_s.sub(/\A::/, '')
  return Object.const_get(name) unless name.include?('::')

  name.split('::').reduce(Object) do |mod, class_name|
    mod.const_get(class_name)
  end
end

#pause ⇒ Object



50
51
52
53
54
55
56
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 50

# Pauses the queue by renaming its Redis list key to the paused-queue key
# using RENAMENX.
#
# The value returned by `renamenx` is not inspected: any call that does not
# raise yields true.
#
# @return [Boolean] true when the rename call completed; false when Redis
#   answered 'ERR no such key' (the live queue key does not exist)
# @raise [Redis::CommandError] for any other Redis command error
def pause
  live_key = prefixed(@queue)
  parked_key = prefixed(paused_queue_name)
  Resque.redis.renamenx(live_key, parked_key)
  true
rescue Redis::CommandError => error
  return false if error.message == 'ERR no such key'

  raise
end

#pause_until(timestamp) ⇒ Object



38
39
40
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 38

# Pauses the queue and, only if `pause` reported success, enqueues an UnPause
# for the given timestamp, passing this object's `name`.
#
# @param timestamp [Object] forwarded as-is to UnPause.enqueue
# @return [Object, nil] the result of UnPause.enqueue, or nil if `pause`
#   returned a falsy value
def pause_until(timestamp)
  return unless pause

  UnPause.enqueue(timestamp, name)
end

#paused?(unknown = false) ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 58

# Reports whether the queue is currently paused.
#
# Decision order:
#   1. Resque.inline is set        -> false (never paused in inline mode)
#   2. the live queue key exists   -> false
#   3. the paused queue key exists -> true
#   4. neither key exists          -> `unknown` (state cannot be determined)
#
# @param unknown [Object] value to return when neither key exists
#   (i.e. the queue is empty and so its state is unknown)
# @return [Boolean, Object] true/false, or `unknown` in case 4
def paused?(unknown = false)
  return false if Resque.inline

  # Depending on the Redis client version, `exists` answers with a boolean or
  # with an Integer count of matching keys. 0 is truthy in Ruby, so an Integer
  # answer must be normalised before being used as a condition.
  key_present = lambda do |key|
    found = Resque.redis.exists(key)
    found.is_a?(Integer) ? found > 0 : found
  end

  if key_present.call(prefixed(@queue))
    false
  elsif key_present.call(prefixed(paused_queue_name))
    true
  else
    unknown
  end
end

#paused_queue_name ⇒ Object



71
72
73
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 71

# Name of the companion queue that holds jobs while this queue is paused:
# the value of @queue with "_paused" appended.
#
# @return [Symbol]
def paused_queue_name
  :"#{@queue}_paused"
end

#prefixed(name) ⇒ Object



75
76
77
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 75

# Builds the Redis key for a queue by prepending RESQUE_PREFIX to its name.
#
# @param name [#to_s] queue name
# @return [String] RESQUE_PREFIX followed by the name
def prefixed(name)
  RESQUE_PREFIX + name.to_s
end

#rate_limited_enqueue(klass, *params) ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 17

# Enqueues a job while holding the lock: onto the paused queue if `paused?`
# is truthy, otherwise onto the live queue (@queue).
#
# @param klass [Object] job class, forwarded to Resque.enqueue_to
# @param params [Array] job arguments, forwarded unchanged
# @return [Object] whatever `with_lock` returns
def rate_limited_enqueue(klass, *params)
  with_lock do
    destination = paused? ? paused_queue_name : @queue
    Resque.enqueue_to(destination, klass, *params)
  end
end

#rate_limited_requeue(klass, *params) ⇒ Object



27
28
29
30
31
32
33
34
35
36
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 27

# Re-enqueues a job while holding the lock.
#
# `paused?` is called with `true` so that an empty queue counts as paused:
# if the queue is empty, this was the last job, so it goes to the paused queue.
#
# @param klass [Object] job class, forwarded to Resque.enqueue_to
# @param params [Array] job arguments, forwarded unchanged
# @return [Object] whatever `with_lock` returns
def rate_limited_requeue(klass, *params)
  with_lock do
    target = @queue
    target = paused_queue_name if paused?(true)
    Resque.enqueue_to(target, klass, *params)
  end
end

#un_pause ⇒ Object



42
43
44
45
46
47
48
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 42

# Resumes the queue by renaming the paused-queue Redis key back to the live
# queue key using RENAMENX.
#
# The value returned by `renamenx` is not inspected: any call that does not
# raise yields true.
#
# @return [Boolean] true when the rename call completed; false when Redis
#   answered 'ERR no such key' (the paused queue key does not exist)
# @raise [Redis::CommandError] for any other Redis command error
def un_pause
  parked_key = prefixed(paused_queue_name)
  live_key = prefixed(@queue)
  Resque.redis.renamenx(parked_key, live_key)
  true
rescue Redis::CommandError => error
  return false if error.message == 'ERR no such key'

  raise
end

#with_lock ⇒ Object



79
80
81
82
83
84
85
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 79

# Runs the given block under the shared MUTEX lock.
#
# When Resque.inline is set the block is simply yielded to with no locking;
# otherwise it runs inside RedisMutex.with_lock with `block: 60, expire: 120`
# (NOTE(review): presumably seconds — confirm against the RedisMutex docs).
#
# @yield the critical section
# @return [Object] the block's value in inline mode; otherwise whatever
#   RedisMutex.with_lock returns
def with_lock
  return yield if Resque.inline

  RedisMutex.with_lock(MUTEX, block: 60, expire: 120) { yield }
end