Class: Resque::Worker

Inherits:
Object
  • Object
show all
Includes:
Plugins::MultiJobForks::RssReader
Defined in:
lib/resque-multi-job-forks.rb

Constant Summary collapse

WorkerTerminated =
Class.new(StandardError)

Constants included from Plugins::MultiJobForks::RssReader

Plugins::MultiJobForks::RssReader::LINUX, Plugins::MultiJobForks::RssReader::PS_CMD, Plugins::MultiJobForks::RssReader::UNITS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Plugins::MultiJobForks::RssReader

#rss, #rss_linux, #rss_posix

Instance Attribute Details

#jobs_per_forkObject

Returns the value of attribute jobs_per_fork.



9
10
11
# File 'lib/resque-multi-job-forks.rb', line 9

def jobs_per_fork
  @jobs_per_fork
end

#jobs_processedObject (readonly)

Returns the value of attribute jobs_processed.



11
12
13
# File 'lib/resque-multi-job-forks.rb', line 11

def jobs_processed
  @jobs_processed
end

#memory_thresholdObject

Returns the value of attribute memory_threshold.



10
11
12
# File 'lib/resque-multi-job-forks.rb', line 10

def memory_threshold
  @memory_threshold
end

#seconds_per_forkObject

Returns the value of attribute seconds_per_fork.



8
9
10
# File 'lib/resque-multi-job-forks.rb', line 8

def seconds_per_fork
  @seconds_per_fork
end

Class Method Details

.multi_jobs_per_fork?Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/resque-multi-job-forks.rb', line 15

def self.multi_jobs_per_fork?
  ENV["DISABLE_MULTI_JOBS_PER_FORK"].nil?
end

Instance Method Details

#fork(&block) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/resque-multi-job-forks.rb', line 21

def fork(&block)
  if child = Kernel.fork
    return child
  else
    if term_child
      unregister_signal_handlers
      trap('TERM') do
        trap('TERM') do
          # Ignore subsequent term signals
        end

        if @performing_job
          # If a job is in progress, stop it immediately.
          raise TermException.new("SIGTERM")
        else
          # If we're not currently running a job, shut down cleanly.
          # This allows us to push unworked jobs back on the queue.
          shutdown
        end
      end
      trap('QUIT') { shutdown }
    end
    raise NotImplementedError, "Pretending to not have forked"
    # perform_with_fork will run the job and continue working...
  end
end

#fork_hijacked?Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/resque-multi-job-forks.rb', line 137

def fork_hijacked?
  @release_fork_limit
end

#fork_job_limitObject



159
160
161
# File 'lib/resque-multi-job-forks.rb', line 159

def fork_job_limit
  jobs_per_fork.nil? ? Time.now.to_f + seconds_per_fork : jobs_per_fork
end

#fork_job_limit_reached?Boolean

Returns:

  • (Boolean)


163
164
165
# File 'lib/resque-multi-job-forks.rb', line 163

def fork_job_limit_reached?
  fork_job_limit_remaining <= 0 || fork_job_over_memory_threshold?
end

#fork_job_limit_remainingObject



167
168
169
# File 'lib/resque-multi-job-forks.rb', line 167

def fork_job_limit_remaining
  jobs_per_fork.nil? ? @release_fork_limit - Time.now.to_f : jobs_per_fork - @jobs_processed
end

#fork_job_over_memory_threshold?Boolean

Returns:

  • (Boolean)


183
184
185
# File 'lib/resque-multi-job-forks.rb', line 183

def fork_job_over_memory_threshold?
  !!(memory_threshold && rss > memory_threshold)
end

#hijack_forkObject



141
142
143
144
145
146
147
148
# File 'lib/resque-multi-job-forks.rb', line 141

def hijack_fork
  log_with_severity :debug, 'hijack fork.'
  @suppressed_fork_hooks = [Resque.after_fork, Resque.before_fork]
  Resque.after_fork = Resque.before_fork = nil
  @release_fork_limit = fork_job_limit
  @jobs_processed = 0
  @fork_per_job = false
end

#is_parent_process?Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/resque-multi-job-forks.rb', line 128

def is_parent_process?
  @child || @pid == Process.pid
end

#minutes_per_forkObject



175
176
177
# File 'lib/resque-multi-job-forks.rb', line 175

def minutes_per_fork
  ENV['MINUTES_PER_FORK'].nil? ? 1 : ENV['MINUTES_PER_FORK'].to_i
end

#pause_processing_with_multi_job_forksObject Also known as: pause_processing



90
91
92
93
# File 'lib/resque-multi-job-forks.rb', line 90

def pause_processing_with_multi_job_forks
  shutdown_child
  pause_processing_without_multi_job_forks
end

#perform_with_multi_job_forks(job = nil) ⇒ Object Also known as: perform



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/resque-multi-job-forks.rb', line 56

def perform_with_multi_job_forks(job = nil)
  @fork_per_job = true unless fork_hijacked? # reconnect and after_fork
  if shutdown?
    # We got a request to shut down _after_ grabbing a job but _before_ starting work
    # on it. Immediately report the job as failed and return.
    if job
      report_failed_job(job, WorkerTerminated.new("shutdown before job start"))
    end
    return
  end
  @performing_job = true
  perform_without_multi_job_forks(job)
  hijack_fork unless fork_hijacked?
  @jobs_processed += 1
ensure
  @performing_job = false
end

#reconnect_with_multi_job_forksObject Also known as: reconnect

Reconnect only once



105
106
107
108
109
110
# File 'lib/resque-multi-job-forks.rb', line 105

def reconnect_with_multi_job_forks
  unless @reconnected
    reconnect_without_multi_job_forks
    @reconnected = true
  end
end

#release_and_exit!Object



132
133
134
135
# File 'lib/resque-multi-job-forks.rb', line 132

def release_and_exit!
  release_fork if fork_hijacked?
  run_at_exit_hooks ? exit : exit!(true)
end

#release_forkObject



150
151
152
153
154
155
156
157
# File 'lib/resque-multi-job-forks.rb', line 150

def release_fork
  log_with_severity :info, "jobs processed by child: #{jobs_processed}; rss: #{rss}"
  run_hook :before_child_exit, self
  Resque.after_fork, Resque.before_fork = *@suppressed_fork_hooks
  @release_fork_limit = @jobs_processed = nil
  log_with_severity :debug, 'hijack over, counter terrorists win.'
  @shutdown = true
end

#shutdown_childObject

Need to tell the child to shutdown since it might be looping performing multiple jobs per fork. The QUIT signal normally does a graceful shutdown, and is re-registered in children (term_child normally unregisters it).



118
119
120
121
122
123
124
125
126
# File 'lib/resque-multi-job-forks.rb', line 118

def shutdown_child
  return unless @child
  begin
    log_with_severity :debug, "multi_jobs_per_fork: Sending QUIT signal to #{@child}"
    Process.kill('QUIT', @child)
  rescue Errno::ESRCH
    nil
  end
end

#shutdown_with_multi_job_forksObject Also known as: shutdown



83
84
85
86
# File 'lib/resque-multi-job-forks.rb', line 83

def shutdown_with_multi_job_forks
  shutdown_child
  shutdown_without_multi_job_forks
end

#shutdown_with_multi_job_forks?Boolean Also known as: shutdown?

Returns:

  • (Boolean)


76
77
78
79
# File 'lib/resque-multi-job-forks.rb', line 76

def shutdown_with_multi_job_forks?
  release_fork if fork_hijacked? && (fork_job_limit_reached? || @shutdown)
  shutdown_without_multi_job_forks?
end

#work_with_multi_job_forks(*args) ⇒ Object Also known as: work



48
49
50
51
52
# File 'lib/resque-multi-job-forks.rb', line 48

def work_with_multi_job_forks(*args)
  pid # forces @pid to be set in the parent
  work_without_multi_job_forks(*args)
  release_and_exit! unless is_parent_process?
end

#working_on_with_worker_registration(job) ⇒ Object Also known as: working_on



97
98
99
100
# File 'lib/resque-multi-job-forks.rb', line 97

def working_on_with_worker_registration(job)
  register_worker
  working_on_without_worker_registration(job)
end