Class: RabbitJobs::Worker

Inherits:
Object
  • Object
show all
Includes:
AmqpHelpers, Logger
Defined in:
lib/rabbit_jobs/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#log, #log!

Methods included from AmqpHelpers

#amqp_with_exchange, #amqp_with_queue, #make_queue

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.



21
22
23
24
25
26
# File 'lib/rabbit_jobs/worker.rb', line 21

# Builds a worker bound to the given queue names.
#
# Names may be given as separate arguments or as (nested) arrays. Each name
# is converted to a String, stripped of surrounding whitespace and
# de-duplicated, keeping first-seen order. When no names are given, or the
# only name is "*", the list falls back to RabbitJobs.config.routing_keys.
#
# @param queues [Array<#to_s, Array>] queue names, or arrays of queue names
def initialize(*queues)
  # Flatten BEFORE stringifying: calling to_s on a nested array would turn
  # the whole array into one bogus name such as '["a", "b"]'.
  @queues = queues.flatten.map { |queue| queue.to_s.strip }.uniq
  @queues = RabbitJobs.config.routing_keys if @queues.empty? || @queues == ['*']
end

Instance Attribute Details

#background ⇒ Object

Returns the value of attribute background.



8
9
10
# File 'lib/rabbit_jobs/worker.rb', line 8

# Reader for the +background+ attribute.
#
# @return [Object, nil] whatever is stored in @background (nil when unset)
def background
  instance_variable_get(:@background)
end

#pidfile ⇒ Object

Returns the value of attribute pidfile.



8
9
10
# File 'lib/rabbit_jobs/worker.rb', line 8

# Reader for the +pidfile+ attribute.
#
# @return [Object, nil] whatever is stored in @pidfile (nil when unset)
def pidfile
  instance_variable_get(:@pidfile)
end

Instance Method Details

#kill_child ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
# File 'lib/rabbit_jobs/worker.rb', line 107

# Forcefully terminates the child process of the current job, if there is one.
#
# Does nothing when there is no current job or the job has no usable child
# pid. The signal is sent directly instead of first shelling out to `ps`:
# the existence check only gated the kill, spawned an extra process and
# printed `ps` output on the worker's stdout.
#
# @return [Integer, nil] the result of Process.kill when a signal was sent,
#   nil otherwise
def kill_child
  pid = @job && @job.child_pid
  # Process.kill treats 0 and negative pids as process groups; never signal
  # those. The old `ps -p` gate rejected such values as well.
  return unless pid.is_a?(Integer) && pid > 0

  Process.kill('KILL', pid)
rescue SystemCallError, RangeError
  # Best effort: the child is already gone (ESRCH), is not ours to signal
  # (EPERM), or the pid is out of range.
  nil
end

#queues ⇒ Object



28
29
30
# File 'lib/rabbit_jobs/worker.rb', line 28

# Queue names this worker consumes from.
#
# @return [Array] the value of @queues, or ['default'] when @queues is
#   nil or false
def queues
  return @queues if @queues

  ['default']
end

#shutdown ⇒ Object



79
80
81
# File 'lib/rabbit_jobs/worker.rb', line 79

# Raises the shutdown flag. #work checks this flag after each job and on a
# periodic timer, and stops the worker once it is set.
#
# @return [true]
def shutdown
  instance_variable_set(:@shutdown, true)
end

#shutdown! ⇒ Object



102
103
104
105
# File 'lib/rabbit_jobs/worker.rb', line 102

# Immediate shutdown: raises the shutdown flag via #shutdown, then kills the
# current job's child process via #kill_child.
#
# @return [Object] whatever #kill_child returns
def shutdown!
  # Order matters: flag first, then kill; the last step's result is returned.
  [:shutdown, :kill_child].map { |step| send(step) }.last
end

#startup ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rabbit_jobs/worker.rb', line 83

# Prepares the process for the work loop: optionally daemonizes, records the
# pid, makes stdout unbuffered, clears the shutdown flag and installs the
# signal handlers.
#
# @return [Object] the value returned by the final trap call
def startup
  # (pruning of dead workers is not implemented here)

  # Daemonize first so that the pid written below is the daemon's pid.
  Process.daemon(true) if background

  pid_path = pidfile
  File.open(pid_path, 'w') { |io| io.write(Process.pid.to_s) } if pid_path

  # Unbuffered stdout, so output (including the child's) shows up right away
  # when the worker's stdout is redirected to a log file.
  $stdout.sync = true

  @shutdown = false

  # TERM asks for a graceful stop; INT additionally kills the running child.
  trap('TERM') do
    shutdown
  end
  trap('INT') do
    shutdown!
  end
end

#work(time = 0) ⇒ Object

Subscribes to the channel and works on jobs.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rabbit_jobs/worker.rb', line 33

# Subscribes to every queue returned by #queues and processes incoming jobs.
#
# Calls #startup, then, inside amqp_with_exchange, subscribes to each routing
# key with explicit acknowledgements. Every payload is parsed into a
# RabbitJobs::Job, performed unless it has expired, and then acknowledged.
# The shutdown flag is checked after each job and once per second; when it is
# set, the connection is closed, the pidfile (if any) is deleted and the
# EventMachine loop is stopped with a hard exit.
#
# @param time [Numeric] when greater than 0, request shutdown after this many
#   seconds (debugging aid); the default 0 installs no such timer
def work(time = 0)
  startup

  processed_count = 0
  amqp_with_exchange do |connection, exchange|
    # Ask the broker for at most one unacknowledged message at a time.
    exchange.channel.prefetch(1)

    check_shutdown = lambda do
      if @shutdown
        log "Processed jobs: #{processed_count}"
        log "Stopping worker..."

        connection.close {
          File.delete(self.pidfile) if self.pidfile
          EM.stop { exit! }
        }
      end
    end

    queues.each do |routing_key|
      queue = make_queue(exchange, routing_key)

      log "Worker ##{Process.pid} <= #{exchange.name}##{routing_key}"

      # The first block argument is the delivery metadata; it is needed to
      # acknowledge the message once the job has been handled.
      queue.subscribe(ack: true) do |metadata, payload|
        @job = RabbitJobs::Job.parse(payload)
        @job.run_perform unless @job.expired?
        # Expired jobs are acknowledged too, so they leave the queue.
        metadata.ack
        processed_count += 1
        check_shutdown.call
      end
    end

    if time > 0
      # for debugging
      EM.add_timer(time) do
        self.shutdown
      end
    end

    # Also poll the flag while idle, so a signal stops a worker that is not
    # currently receiving jobs.
    EM.add_periodic_timer(1) do
      check_shutdown.call
    end
  end
end