Class: Edamame::Queue::BeanstalkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/edamame/queue/beanstalk.rb

Overview

Persistent job queue for periodic requests.

Jobs are reserved, run, and if successful put back with an updated delay parameter.

This is useful for mass scraping of timelines (RSS feeds, Twitter search results, etc.). See github.com/mrflip/wuclan for an example.

Constant Summary collapse

DEFAULT_OPTIONS = {
  # Priority given to jobs that do not specify one.
  :priority     => 65_536,
  # Seconds (5 minutes) a reserved job may run before it is assumed dead.
  :time_to_run  => 5 * 60,
  # host:port addresses of the beanstalkd servers forming the pool.
  :uris         => ['localhost:11300'],
  # Tube used and watched when the pool is first opened.
  :default_tube => 'default',
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_options = {}) ⇒ BeanstalkQueue

_options – overrides for DEFAULT_OPTIONS. Nil-valued entries are dropped before merging, so omitting :uris (or passing nil) uses the default single-node ['localhost:11300'] pool.



23
24
25
26
# File 'lib/edamame/queue/beanstalk.rb', line 23

# Builds the queue's option hash.
#
# _options - caller overrides. Nil-valued entries are dropped (compact) before
#            being deep-merged over DEFAULT_OPTIONS, so a nil falls back to
#            the default value.
#
# The tube name is normalized to a String so symbols can be passed in.
# NOTE(review): assumes +options=+ is a plain attribute writer storing the
# same hash object -- confirm if it is ever customized.
def initialize _options={}
  merged = DEFAULT_OPTIONS.deep_merge(_options.compact)
  merged[:default_tube] = merged[:default_tube].to_s
  self.options = merged
end

Instance Attribute Details

#options ⇒ Object

Returns the value of attribute options.



18
19
20
# File 'lib/edamame/queue/beanstalk.rb', line 18

# Reader for the merged option hash built in the constructor.
def options; @options; end

Instance Method Details

#beanstalk ⇒ Object

The beanstalk pool which acts as job queue



73
74
75
76
77
78
# File 'lib/edamame/queue/beanstalk.rb', line 73

# Lazily opens (and memoizes) the Beanstalk::Pool used as the job queue.
# On first use the pool is created from options[:uris] and the default tube
# is selected for both use and watch.
def beanstalk
  unless @beanstalk
    @beanstalk = Beanstalk::Pool.new(options[:uris], options[:default_tube])
    # @beanstalk must be assigned before this call: tube= talks to the pool.
    self.tube = options[:default_tube]
  end
  @beanstalk
end

#buryObject

Shelves the job.



68
69
70
# File 'lib/edamame/queue/beanstalk.rb', line 68

# Shelves (buries) a reserved job so it is no longer handed out by reserve.
#
# job      - the reserved job to bury.
# priority - optional priority to bury it with; defaults to the job's own
#            priority (mirrors the signature of +release+).
#
# The original zero-argument version referenced an undefined +job+ and
# could only raise NameError; the job is now passed in explicitly, like
# +delete(job)+ and +release(job, ...)+.
def bury job, priority=nil
  job.bury(priority || job.priority)
end

#close ⇒ Object

Close the job queue



80
81
82
83
# File 'lib/edamame/queue/beanstalk.rb', line 80

# Closes the beanstalk pool, if one was opened, and forgets it so the next
# call to +beanstalk+ opens a fresh pool. Safe to call repeatedly.
def close
  pool = @beanstalk
  pool.close if pool
  @beanstalk = nil
end

#delete(job) ⇒ Object

Remove the job from the queue.



39
40
41
# File 'lib/edamame/queue/beanstalk.rb', line 39

# Removes +job+ from the queue for good; returns whatever job.delete returns.
def delete(job); job.delete; end

#empty(tube = nil, &block) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/edamame/queue/beanstalk.rb', line 104

# Drains a tube. It repeatedly kicks, reserves, yields and deletes jobs until
# total_jobs reaches zero, nothing is ready, or reserve times out.
#
# tube  - name of the tube to drain (converted with to_s). When nil, the
#         currently used/watched tubes are left as they are.
# block - optional; each reserved job is yielded before it is deleted.
#
# The previously used tube is restored afterwards, even if the block raises.
# +tube+ is ignored again unless it was already being watched.
def empty tube=nil, &block
  tube = tube.to_s if tube
  curr_tube    = beanstalk.list_tube_used.values.first
  # Array() guards against a pool that reports no watch list at all.
  curr_watches = Array(beanstalk.list_tubes_watched.values.first)
  if tube
    beanstalk.use   tube
    beanstalk.watch tube
  end
  p ["emptying", tube, total_jobs]
  begin
    loop do
      # Kick up to 20 jobs per connection so they become reservable
      # (presumably buried/delayed jobs -- confirm against beanstalkd docs).
      beanstalk.open_connections.each { |conxn| conxn.kick(20) }
      break if (total_jobs == 0) || (!beanstalk.peek_ready)
      # NOTE(review): reserve pulls from every watched tube, not only +tube+.
      qjob = reserve(5)
      break unless qjob
      yield qjob if block_given?
      qjob.delete
    end
  ensure
    beanstalk.use curr_tube
    # Only stop watching +tube+ if we started watching it; never ignore(nil).
    beanstalk.ignore tube if tube && !curr_watches.include?(tube)
  end
end

#empty_all(&block) ⇒ Object



122
123
124
125
126
127
# File 'lib/edamame/queue/beanstalk.rb', line 122

# Drains every tube known to any server in the pool, passing the optional
# block through to +empty+ for each one. Returns the list of tube names.
def empty_all &block
  beanstalk.list_tubes.values.flatten.uniq.each { |tube_name| empty(tube_name, &block) }
end

#put(job, priority = nil, delay = nil) ⇒ Object

Add a new Job to the queue



31
32
33
34
# File 'lib/edamame/queue/beanstalk.rb', line 31

# Enqueues +job+ (serialized with to_hash(false)) via yput.
# An explicit priority/delay overrides the job's own value; the job's ttr
# is always used as the time-to-run.
def put job, priority=nil, delay=nil
  payload    = job.to_hash(false)
  enqueue_at = [priority || job.priority, delay || job.delay]
  beanstalk.yput(payload, *enqueue_at, job.ttr)
end

#release(job, priority = nil, delay = nil) ⇒ Object

Returns the job to the queue, to be re-run later.

Releasing a job acknowledges that it was processed, whether successfully or not.



48
49
50
# File 'lib/edamame/queue/beanstalk.rb', line 48

# Hands a reserved job back to the queue to be run again later.
# Explicit priority/delay override the job's own values.
def release job, priority=nil, delay=nil
  new_priority = priority || job.priority
  new_delay    = delay    || job.delay
  job.release(new_priority, new_delay)
end

#reserve(timeout = 10) ⇒ Object

Takes the next ready job (highest priority, delay elapsed). The timeout is in seconds (default 10). Returns nil on error or timeout; Interrupt is not rescued and passes through.



57
58
59
60
61
62
63
# File 'lib/edamame/queue/beanstalk.rb', line 57

# Reserves the next ready job, waiting up to +timeout+ seconds.
# Returns the job, or nil when nothing was reserved or a StandardError
# occurred (after a short back-off sleep). Non-StandardError exceptions
# such as Interrupt are not rescued.
def reserve timeout=10
  beanstalk.reserve(timeout) || nil
rescue Beanstalk::TimedOut => e
  warn e.to_s
  sleep 0.4
  nil
rescue StandardError => e
  warn e.to_s
  sleep 1
  nil
end

#stats ⇒ Object

Stats on job count across the pool



93
94
95
# File 'lib/edamame/queue/beanstalk.rb', line 93

# Returns only the job-count entries (keys containing "jobs") from the
# pool's stats hash.
def stats
  all_stats = beanstalk.stats
  all_stats.select { |stat_name, _value| stat_name =~ /jobs/ }
end

#total_jobs ⇒ Object

Total jobs in the queue, whether reserved, ready, buried or delayed.



97
98
99
# File 'lib/edamame/queue/beanstalk.rb', line 97

# Total jobs in the queue, whether reserved, ready, buried or delayed.
#
# The stats are fetched once and all four counters are read from that single
# snapshot. The previous version called +stats+ (a server round-trip) once per
# counter, which cost four requests and could mix numbers from different moments.
def total_jobs
  counts = stats
  [:reserved, :ready, :buried, :delayed].inject(0) do |sum, type|
    sum + counts["current-jobs-#{type}"]
  end
end

#tube=(_tube) ⇒ Object

Uses and watches the given beanstalk tube.



86
87
88
89
90
# File 'lib/edamame/queue/beanstalk.rb', line 86

# Uses and watches the given beanstalk tube, and remembers it in @tube.
#
# Goes through the +beanstalk+ accessor rather than @beanstalk, so the
# setter also works before the pool has been opened. No recursion occurs:
# +beanstalk+ assigns @beanstalk before it calls this setter.
def tube= _tube
  puts "#{self.class} setting tube to #{_tube}, was #{@tube}"
  pool = beanstalk
  pool.use   _tube
  pool.watch _tube
  # Recorded so the log line above can report the previous tube next time.
  @tube = _tube
end