Class: Edamame::Queue::BeanstalkQueue
- Inherits: Object
  - Object
  - Edamame::Queue::BeanstalkQueue
- Defined in: lib/edamame/queue/beanstalk.rb
Overview
Persistent job queue for periodic requests.
Jobs are reserved, run, and, if successful, put back with an updated delay parameter.
This is useful for mass scraping of timelines (RSS feeds, Twitter search results, etc.). See github.com/mrflip/wuclan.
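The reserve / run / release cycle described above can be sketched as follows. This is only an illustration, not Edamame's own worker loop: `FakeJob` and `FakeQueue` are hypothetical in-memory stand-ins that mimic the `reserve` and `release` signatures documented on this page, so the example runs without a beanstalkd server. The doubling of the delay is an assumed policy chosen for the example.

```ruby
# Hypothetical in-memory stand-ins; they only mimic the documented
# reserve/release signatures so the sketch runs without beanstalkd.
FakeJob = Struct.new(:url, :priority, :delay)

class FakeQueue
  def initialize(jobs)
    @jobs = jobs
  end

  # Hands out the next job, or nil when nothing is available.
  def reserve(timeout = 10)
    @jobs.shift
  end

  # Puts the job back, falling back to the job's own priority and delay.
  def release(job, priority = nil, delay = nil)
    job.priority = priority || job.priority
    job.delay    = delay    || job.delay
    @jobs.push(job)
  end
end

queue = FakeQueue.new([FakeJob.new('http://example.com/feed.rss', 65536, 60)])

job = queue.reserve(5)
if job
  # ... fetch job.url here ...
  new_delay = job.delay * 2          # assumed policy: back off after each run
  queue.release(job, nil, new_delay) # job goes back with the updated delay
end
```

Passing `nil` for the priority keeps the job's existing priority, which matches the `priority || job.priority` fallback used by `#release`.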
Constant Summary
- DEFAULT_OPTIONS =

    {
      :priority     => 65536,               # default job queue priority
      :time_to_run  => 60*5,                # 5 minutes to complete a job or assume dead
      :uris         => ['localhost:11300'],
      :default_tube => 'default',
    }
Instance Attribute Summary
- #options ⇒ Object
  Returns the value of attribute options.
Instance Method Summary
- #beanstalk ⇒ Object
  The beanstalk pool which acts as job queue.
- #bury ⇒ Object
  Shelves the job.
- #close ⇒ Object
  Close the job queue.
- #delete(job) ⇒ Object
  Remove the job from the queue.
- #empty(tube = nil, &block) ⇒ Object
- #empty_all(&block) ⇒ Object
- #initialize(_options = {}) ⇒ BeanstalkQueue
  Constructor. Leave :uris unset (or nil) to use the default single-node ['localhost:11300'] pool.
- #put(job, priority = nil, delay = nil) ⇒ Object
  Add a new Job to the queue.
- #release(job, priority = nil, delay = nil) ⇒ Object
  Returns the job to the queue, to be re-run later.
- #reserve(timeout = 10) ⇒ Object
  Take the next (highest priority, delay met) job.
- #stats ⇒ Object
  Stats on job count across the pool.
- #total_jobs ⇒ Object
  Total jobs in the queue, whether reserved, ready, buried or delayed.
- #tube=(_tube) ⇒ Object
  Uses and watches the given beanstalk tube.
Constructor Details
#initialize(_options = {}) ⇒ BeanstalkQueue
Merges _options over DEFAULT_OPTIONS. Leave :uris unset (or nil) to use the default single-node ['localhost:11300'] pool.

    # File 'lib/edamame/queue/beanstalk.rb', line 23
    def initialize _options={}
      # nil values are dropped by compact, so the defaults apply for them
      self.options = DEFAULT_OPTIONS.deep_merge(_options.compact)
      options[:default_tube] = options[:default_tube].to_s
    end
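To make the option handling concrete, here is a small standalone sketch of how caller-supplied options combine with the defaults. It is not the class itself: `resolve_options` is a hypothetical helper, and plain `Hash#merge` stands in for `deep_merge` (an assumption that holds here because the defaults are a flat hash). `Hash#compact` requires Ruby 2.4 or later.

```ruby
# Copy of the defaults listed in the Constant Summary.
DEFAULT_OPTIONS = {
  :priority     => 65536,
  :time_to_run  => 60 * 5,
  :uris         => ['localhost:11300'],
  :default_tube => 'default',
}

# Hypothetical helper mirroring the constructor's steps.
# Assumption: Hash#merge stands in for deep_merge on this flat hash.
def resolve_options(_options = {})
  options = DEFAULT_OPTIONS.merge(_options.compact)      # nil entries are ignored
  options[:default_tube] = options[:default_tube].to_s   # tube names become strings
  options
end

opts = resolve_options(:default_tube => :scrapes, :uris => nil)
```

With these inputs, the symbol tube name is stored as a string, and the `nil` passed for `:uris` falls back to the default single-node pool.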
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.

    # File 'lib/edamame/queue/beanstalk.rb', line 18
    def options
      @options
    end
Instance Method Details
#beanstalk ⇒ Object
The beanstalk pool which acts as job queue. The pool is created on first use and then cached.

    # File 'lib/edamame/queue/beanstalk.rb', line 73
    def beanstalk
      return @beanstalk if @beanstalk
      @beanstalk = Beanstalk::Pool.new(options[:uris], options[:default_tube])
      self.tube = options[:default_tube]
      @beanstalk
    end
#bury ⇒ Object
Shelves the job.
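
    # File 'lib/edamame/queue/beanstalk.rb', line 68
    def bury
      # NOTE: `job` is not a parameter of this method as written,
      # so this call relies on a `job` method being defined elsewhere.
      job.bury job.priority
    end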
#close ⇒ Object
Close the job queue.

    # File 'lib/edamame/queue/beanstalk.rb', line 80
    def close
      @beanstalk.close if @beanstalk
      @beanstalk = nil
    end
#delete(job) ⇒ Object
Remove the job from the queue.

    # File 'lib/edamame/queue/beanstalk.rb', line 39
    def delete(job)
      job.delete
    end
#empty(tube = nil, &block) ⇒ Object

    # File 'lib/edamame/queue/beanstalk.rb', line 104
    def empty tube=nil, &block
      tube = tube.to_s if tube
      # remember the current tube settings so they can be restored afterwards
      curr_tube    = beanstalk.list_tube_used.values.first
      curr_watches = beanstalk.list_tubes_watched.values.first
      beanstalk.use   tube if tube
      beanstalk.watch tube if tube
      p ["emptying", tube, beanstalk_total_jobs]
      loop do
        # kick buried jobs back to ready so they can be reserved and removed
        kicked = beanstalk.open_connections.map{|conxn| conxn.kick(20) }
        break if (beanstalk_total_jobs == 0) || (!beanstalk.peek_ready)
        qjob = reserve(5) or break
        yield qjob
        qjob.delete
      end
      beanstalk.use curr_tube
      beanstalk.ignore tube if (! curr_watches.include?(tube))
    end
#empty_all(&block) ⇒ Object

    # File 'lib/edamame/queue/beanstalk.rb', line 122
    def empty_all &block
      tubes = beanstalk.list_tubes.values.flatten.uniq
      tubes.each do |tube|
        empty tube, &block
      end
    end
#put(job, priority = nil, delay = nil) ⇒ Object
Add a new Job to the queue. The priority and delay default to the job's own values.

    # File 'lib/edamame/queue/beanstalk.rb', line 31
    def put job, priority=nil, delay=nil
      beanstalk.yput(job.to_hash(false), (priority || job.priority), (delay || job.delay), job.ttr)
    end
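The fallback behaviour of the `priority` and `delay` arguments can be seen in isolation with the sketch below. `RecordingPool`, `SketchJob` and `put_job` are hypothetical names introduced for the illustration: the pool only records the arguments it receives instead of talking to beanstalkd, and the job carries just the fields that `#put` reads.

```ruby
# Hypothetical stand-in: records what would be sent to beanstalkd.
class RecordingPool
  attr_reader :calls

  def initialize
    @calls = []
  end

  def yput(body, priority, delay, ttr)
    @calls << { :body => body, :priority => priority, :delay => delay, :ttr => ttr }
  end
end

# Hypothetical job carrying only the fields that #put reads.
class SketchJob
  attr_reader :url, :priority, :delay, :ttr

  def initialize(url, priority, delay, ttr)
    @url, @priority, @delay, @ttr = url, priority, delay, ttr
  end

  # #put passes a flag to to_hash; its meaning is not shown here, so it is ignored.
  def to_hash(_flag = true)
    { :url => url }
  end
end

# Same argument handling as #put, with the pool passed in explicitly.
def put_job(pool, job, priority = nil, delay = nil)
  pool.yput(job.to_hash(false), (priority || job.priority), (delay || job.delay), job.ttr)
end

pool = RecordingPool.new
job  = SketchJob.new('http://example.com/feed.rss', 65536, 30, 300)

put_job(pool, job)           # uses the job's own priority and delay
put_job(pool, job, 100, 0)   # explicit priority and delay take precedence
```

Because `0` is truthy in Ruby, an explicit delay of `0` is honoured rather than replaced by the job's own delay.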
#release(job, priority = nil, delay = nil) ⇒ Object
Returns the job to the queue, to be re-run later.
Releasing a job acknowledges that it was completed, successfully or not.

    # File 'lib/edamame/queue/beanstalk.rb', line 48
    def release job, priority=nil, delay=nil
      job.release( (priority || job.priority), (delay || job.delay) )
    end
#reserve(timeout = 10) ⇒ Object
Take the next (highest priority, delay met) job. The timeout is given in seconds (default: 10). Returns nil on error or timeout; an Interrupt passes through.

    # File 'lib/edamame/queue/beanstalk.rb', line 57
    def reserve timeout=10
      begin
        job = beanstalk.reserve(timeout) or return
      rescue Beanstalk::TimedOut => e ; warn e.to_s ; sleep 0.4 ; return ;
      rescue StandardError => e       ; warn e.to_s ; sleep 1   ; return ;
      end
      job
    end
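Since `#reserve` returns nil on timeout or error rather than raising, callers need to check the result before using it. The following sketch shows one way a polling loop might do that. `ScriptedQueue` is a hypothetical stand-in that returns a fixed sequence of results, and the "stop after three empty polls" rule is an assumed policy for the example, not something the library prescribes.

```ruby
# Hypothetical stand-in whose #reserve returns nil on timeout, as documented.
class ScriptedQueue
  def initialize(results)
    @results = results
  end

  def reserve(timeout = 10)
    @results.shift
  end
end

queue = ScriptedQueue.new([:job_a, nil, :job_b, nil, nil, nil])

handled    = []
idle_polls = 0
max_idle   = 3   # assumed policy: give up after three empty polls in a row

while idle_polls < max_idle
  job = queue.reserve(5)
  if job.nil?
    idle_polls += 1   # timeout or error: nothing to do this round
    next
  end
  idle_polls = 0      # a job arrived, so reset the idle counter
  handled << job
end
```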
#stats ⇒ Object
Stats on job count across the pool.

    # File 'lib/edamame/queue/beanstalk.rb', line 93
    def stats
      beanstalk.stats.select{|k,v| k =~ /jobs/}
    end
#total_jobs ⇒ Object
Total jobs in the queue, whether reserved, ready, buried or delayed.

    # File 'lib/edamame/queue/beanstalk.rb', line 97
    def total_jobs
      [:reserved, :ready, :buried, :delayed].inject(0){|sum,type| sum += stats["current-jobs-#{type}"]}
    end
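The arithmetic behind `#stats` and `#total_jobs` can be checked against a sample stats hash. The numbers below are made up for illustration; the key names follow the shape of beanstalkd's `stats` output.

```ruby
# Made-up numbers in the shape of beanstalkd's stats output.
raw_stats = {
  'current-jobs-urgent'   => 0,
  'current-jobs-ready'    => 12,
  'current-jobs-reserved' => 3,
  'current-jobs-delayed'  => 40,
  'current-jobs-buried'   => 1,
  'total-jobs'            => 950,
  'uptime'                => 86400,
}

# Same filter as #stats: keep only keys that mention "jobs".
job_stats = raw_stats.select { |k, _v| k =~ /jobs/ }

# Same sum as #total_jobs: jobs currently in any of the four states.
total = [:reserved, :ready, :buried, :delayed].inject(0) do |sum, type|
  sum + job_stats["current-jobs-#{type}"]
end
```

Note that this sum counts jobs currently held in the queue; it is distinct from the `total-jobs` entry in the stats output, which beanstalkd reports as a cumulative count.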
#tube=(_tube) ⇒ Object
Uses and watches the given beanstalk tube.

    # File 'lib/edamame/queue/beanstalk.rb', line 86
    def tube= _tube
      puts "#{self.class} setting tube to #{_tube}, was #{@tube}"
      @beanstalk.use _tube
      @beanstalk.watch _tube
    end