Class: Cyclop::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/cyclop/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ Worker

Returns a new instance of Worker.

Raises:

  • (ArgumentError)


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/cyclop/worker.rb', line 18

# Builds a worker from a configuration hash and opens the MongoDB
# connection that Cyclop will use.
#
# Keys read from +config+ (all String keys):
# * "mongo"         - Hash, required. "database" is mandatory. When "hosts"
#   is present a replica-set connection is opened using "rs_name" and
#   "read_secondary"; otherwise a single-server connection is opened on
#   "host"/"port" (defaults "127.0.0.1" and 27017). A truthy "log" hands
#   the worker's logger to the driver.
# * "queues"         - queues to process (default: []).
# * "log_file"       - log destination (default: $stdout).
# * "sleep_interval" - value passed to sleep between polls (default: 1).
# * "actions"        - path to the actions directory (default: "./actions").
# * "die_after"      - stored as-is; nil when absent.
# * "limit_to_host"  - stored as job_opts[:host]; the literal "localhost"
#   is replaced by Cyclop.host.
#
# @param config [Hash] worker configuration
# @raise [ArgumentError] if config["mongo"]["database"] is missing
def initialize(config={})
  mongo = config["mongo"]
  unless mongo && mongo["database"]
    raise ArgumentError, 'mongo["database"] is required'
  end

  self.queues = config["queues"] || []
  self.logger = Logger.new(config["log_file"] || $stdout)
  self.sleep_interval = config["sleep_interval"] || 1
  self.actions = config["actions"] || "./actions"
  self.processed_jobs = 0
  self.die_after = config["die_after"]

  @job_opts = {}
  host_limit = config["limit_to_host"]
  if host_limit
    # "localhost" is a placeholder for whatever Cyclop reports as this host.
    @job_opts[:host] = host_limit == "localhost" ? Cyclop.host : host_limit
  end

  # The driver only receives a logger when mongo logging was requested.
  driver_logger = mongo["log"] ? logger : nil
  connection =
    if mongo["hosts"]
      Mongo::ReplSetConnection.new(
        *mongo["hosts"],
        rs_name: mongo["rs_name"],
        read_secondary: (mongo["read_secondary"] ? true : false),
        logger: driver_logger,
      )
    else
      Mongo::Connection.new(
        mongo["host"] || "127.0.0.1",
        mongo["port"] || 27017,
        logger: driver_logger,
      )
    end
  Cyclop.db = connection.db(mongo["database"])
end

Instance Attribute Details

#actions ⇒ Object

Path to actions directory



10
11
12
# File 'lib/cyclop/worker.rb', line 10

# Path to the actions directory.
# #initialize sets it from config["actions"], defaulting to "./actions".
#
# @return [Object] the current value of @actions
def actions
  @actions
end

#die_after ⇒ Object

Number of jobs to process before exiting



12
13
14
# File 'lib/cyclop/worker.rb', line 12

# Number of jobs to process before exiting.
# #initialize copies it verbatim from config["die_after"], so it is nil when
# the key is absent. NOTE(review): the code that enforces the limit is not
# shown on this page - confirm where it is checked.
#
# @return [Object] the current value of @die_after
def die_after
  @die_after
end

#job_opts ⇒ Object

Options passed to Cyclop.next to get next job



16
17
18
# File 'lib/cyclop/worker.rb', line 16

# Options passed to Cyclop.next to get the next job.
# #initialize starts with an empty Hash and adds :host when
# config["limit_to_host"] is set.
#
# @return [Object] the current value of @job_opts
def job_opts
  @job_opts
end

#logger ⇒ Object

Logger for master



6
7
8
# File 'lib/cyclop/worker.rb', line 6

# Logger the worker writes to.
# #initialize builds it with Logger.new on config["log_file"], falling back
# to $stdout.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#processed_jobs ⇒ Object

Number of jobs processed by this worker



14
15
16
# File 'lib/cyclop/worker.rb', line 14

# Number of jobs processed by this worker.
# Starts at 0 in #initialize; #run adds 1 each time a forked child has been
# waited for, whatever its exit status.
#
# @return [Object] the current value of @processed_jobs
def processed_jobs
  @processed_jobs
end

#queues ⇒ Object

Queues to process



4
5
6
# File 'lib/cyclop/worker.rb', line 4

# Queues to process.
# #initialize sets it from config["queues"], defaulting to an empty Array.
#
# @return [Object] the current value of @queues
def queues
  @queues
end

#sleep_interval ⇒ Object

How long to sleep between polls



8
9
10
# File 'lib/cyclop/worker.rb', line 8

# How long to sleep between polls.
# #run passes this value to Kernel#sleep whenever no job is available;
# #initialize defaults it to 1 when config["sleep_interval"] is not given.
#
# @return [Object] the current value of @sleep_interval
def sleep_interval
  @sleep_interval
end

Instance Method Details

#perform(job) ⇒ Object

Called inside forked process

Parameters:

  • (Cyclop::Job) job - the job to process



89
90
91
92
93
94
95
96
97
# File 'lib/cyclop/worker.rb', line 89

# Runs a single job; called inside the forked child process, which uses the
# return value as its exit status (see the `exit! perform job` call in #run).
#
# Loads the actions, looks up the action registered for the job's queue and
# invokes it with the job's parameters.
#
# Exception (not just StandardError) is rescued, so any failure raised
# while loading or running the action is logged, handed to job.release!
# and turned into a non-zero status instead of propagating.
#
# @param job [Cyclop::Job] the job to process
# @return [Integer] 0 on success, 1 when anything was raised
def perform(job)
  begin
    load_actions
    action = Cyclop::Action.find_by_queue(job.queue)
    action.perform(*job.job_params)
    0
  rescue Exception => error
    log error.to_s
    job.release! error
    1
  end
end

#run ⇒ Object

Start processing jobs



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/cyclop/worker.rb', line 50

# Main loop: start processing jobs.
#
# After registering the signal handlers, loops until stop? is true. Each
# iteration asks next_job for work:
# * no job   - logs once on entering the idle state, then sleeps for
#   sleep_interval before polling again;
# * a job    - forks. The child runs #perform and exits with its return
#   value. The parent waits for the child, bumps processed_jobs and marks
#   the job complete on exit status 0, otherwise releases it.
def run
  register_signal_handlers
  loop do
    if stop?
      log "Shutting down..."
      break
    end

    job = next_job
    unless job
      # @sleeping keeps the idle message from being logged on every poll.
      log "No more job to process, start sleeping..." unless @sleeping
      @sleeping = true
      sleep sleep_interval
      next
    end

    @sleeping = false
    @pid = fork
    unless @pid
      # Child process: fork returned nil. exit! never returns.
      procline "Processing #{job.queue}-#{job._id} (started at #{Time.now.utc})"
      exit! perform job
    end

    # Parent process: @pid holds the child's pid.
    announcement = "Forked process #{@pid} to work on job #{job.queue}-#{job._id}..."
    log announcement
    procline announcement
    Process.wait
    log "Child process #{@pid} ended with status: #{$?}"
    self.processed_jobs += 1
    # exitstatus is nil for a signalled child, which also counts as failure.
    $?.exitstatus == 0 ? job.complete! : job.release!
  end
end

#stop ⇒ Object

Graceful shutdown



100
101
102
# File 'lib/cyclop/worker.rb', line 100

# Requests a graceful shutdown by setting the @stop flag.
# Nothing is interrupted here. NOTE(review): presumably stop? (not shown on
# this page) reads this flag, so #run exits at the top of its next loop
# iteration - confirm.
#
# @return [true]
def stop
  @stop = true
end

#stop! ⇒ Object

Forced shutdown



105
106
107
108
109
110
111
# File 'lib/cyclop/worker.rb', line 105

# Forced shutdown: terminates the current child (if any) with TERM, waits
# for it, then exits this process immediately via exit! (no at_exit
# handlers run).
#
# #run assigns @pid on every fork but never clears it, so once a child has
# been reaped @pid still holds that old pid. Signalling it raises
# Errno::ESRCH, and Process.wait raises Errno::ECHILD when no child is
# left. Both are rescued here so the forced shutdown always reaches exit!
# instead of aborting with an exception.
#
# Never returns.
def stop!
  if @pid
    begin
      Process.kill "TERM", @pid
      Process.wait
    rescue Errno::ESRCH, Errno::ECHILD
      # The child is already gone; nothing left to terminate or reap.
    end
  end
  exit!
end