Class: Kongnomal::BeanstalkServer

Inherits:
Object
  • Object
show all
Defined in:
lib/kongnomal/beanstalk_server.rb

Instance Method Summary collapse

Constructor Details

#initialize(&blk) ⇒ BeanstalkServer

Returns a new instance of BeanstalkServer.



3
4
5
6
7
# File 'lib/kongnomal/beanstalk_server.rb', line 3

def initialize(&blk)
  @on_startup = block_given? ? blk : Proc.new { puts "beanstalk server is up and running..." }
  at_exit {stop_beanstalkd}
  self
end

Instance Method Details

#connect_to(ip, port_number) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/kongnomal/beanstalk_server.rb', line 14

def connect_to(ip, port_number)
  job_event_queue = job_event_queue("#{ip}:#{port_number}", Kongnomal::JobQueue::JOB_EVENT_TUBE)
  @on_startup.call

  loop do
    job = begin
      job_event_queue.reserve
    rescue Exception => e
      puts "Failed to retrieve job event from #{ip}:#{port_number}[#{Kongnomal::JobQueue::JOB_EVENT_TUBE}]"
      raise e
    end

    job_evented = JSON.parse(job.body)
    begin
      Kongnomal::Job.send(job_evented["state"]).call(job_evented)
    rescue Exception => e
      job.release(nil, 60)
      puts "Failed to process job event: #{job_evented.inspect}, retrying 60 sec later..."
      puts e.message
      puts e.backtrace
      next
    end
  
    begin
      job.delete
    rescue Exception => e
      puts "Failed to delete job event: #{job_evented.inspect} from #{ip}:#{port_number}[#{Kongnomal::JobQueue::JOB_EVENT_TUBE}], ignoring for now to be reprocessed after natural beanstalk job timeout"
      puts e.message
      puts e.backtrace
    end
  end
end

#start(ip, port_number) ⇒ Object



9
10
11
12
# File 'lib/kongnomal/beanstalk_server.rb', line 9

def start(ip, port_number)
  start_beanstalkd(ip, port_number)
  connect_to(ip, port_number)
end