Class: Resque::Worker
Constant Summary
collapse
- WorkerTerminated =
Class.new(StandardError)
Plugins::MultiJobForks::RssReader::LINUX, Plugins::MultiJobForks::RssReader::PS_CMD, Plugins::MultiJobForks::RssReader::UNITS
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#rss, #rss_linux, #rss_posix
Instance Attribute Details
#jobs_per_fork ⇒ Object
Returns the value of attribute jobs_per_fork.
9
10
11
|
# File 'lib/resque-multi-job-forks.rb', line 9
# Reader for the jobs-per-fork setting.
#
# @return [Object] whatever is stored in @jobs_per_fork (nil when unset)
def jobs_per_fork
  @jobs_per_fork
end
|
#jobs_processed ⇒ Object
Returns the value of attribute jobs_processed.
11
12
13
|
# File 'lib/resque-multi-job-forks.rb', line 11
# Reader for the processed-jobs counter.
#
# @return [Object] whatever is stored in @jobs_processed (nil when unset)
def jobs_processed
  @jobs_processed
end
|
#memory_threshold ⇒ Object
Returns the value of attribute memory_threshold.
10
11
12
|
# File 'lib/resque-multi-job-forks.rb', line 10
# Reader for the memory threshold setting.
#
# @return [Object] whatever is stored in @memory_threshold (nil when unset)
def memory_threshold
  @memory_threshold
end
|
#seconds_per_fork ⇒ Object
Returns the value of attribute seconds_per_fork.
8
9
10
|
# File 'lib/resque-multi-job-forks.rb', line 8
# Reader for the seconds-per-fork setting.
#
# @return [Object] whatever is stored in @seconds_per_fork (nil when unset)
def seconds_per_fork
  @seconds_per_fork
end
|
Class Method Details
.multi_jobs_per_fork? ⇒ Boolean
15
16
17
|
# File 'lib/resque-multi-job-forks.rb', line 15
# Reports whether multi-job forking is enabled.
#
# The feature is on unless the DISABLE_MULTI_JOBS_PER_FORK environment
# variable is set; any value, even an empty string, turns it off.
#
# @return [Boolean] true when the variable is absent from the environment
def self.multi_jobs_per_fork?
  !ENV.key?("DISABLE_MULTI_JOBS_PER_FORK")
end
|
Instance Method Details
#fork(&block) ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/resque-multi-job-forks.rb', line 21
# Forks the process. The parent gets the child's pid back; the child sets up
# its signal handlers (when term_child is truthy) and then raises instead of
# returning.
#
# @param block [Proc] accepted as part of the signature; never called here
# @return [Integer] the child's pid (parent process only)
# @raise [NotImplementedError] always, in the child process
def fork(&block)
  child_pid = Kernel.fork
  return child_pid if child_pid

  # Everything below runs only in the forked child.
  if term_child
    unregister_signal_handlers

    trap('TERM') do
      # Swap in a no-op handler so any further TERM signal is ignored.
      trap('TERM') {}

      # A job in flight is interrupted by raising; an idle child shuts down.
      raise TermException.new("SIGTERM") if @performing_job

      shutdown
    end

    trap('QUIT') { shutdown }
  end

  raise NotImplementedError, "Pretending to not have forked"
end
|
#fork_hijacked? ⇒ Boolean
137
138
139
|
# File 'lib/resque-multi-job-forks.rb', line 137
# Tells whether the fork is currently hijacked, i.e. a release limit is set.
#
# hijack_fork assigns @release_fork_limit and release_fork resets it to nil.
#
# @return [Object, nil] the raw @release_fork_limit value, used for its
#   truthiness (not coerced to true/false)
def fork_hijacked?
  @release_fork_limit
end
|
#fork_job_limit ⇒ Object
159
160
161
|
# File 'lib/resque-multi-job-forks.rb', line 159
# Computes the limit after which a hijacked fork is released.
#
# @return [Object] jobs_per_fork when it is set (a job-count limit);
#   otherwise the current epoch time plus seconds_per_fork (a deadline)
def fork_job_limit
  return jobs_per_fork unless jobs_per_fork.nil?

  Time.now.to_f + seconds_per_fork
end
|
#fork_job_limit_reached? ⇒ Boolean
163
164
165
|
# File 'lib/resque-multi-job-forks.rb', line 163
# Decides whether the hijacked fork should stop taking more jobs.
#
# The memory check only runs while there is still limit remaining.
#
# @return [Boolean] true when nothing remains of the limit; otherwise the
#   result of fork_job_over_memory_threshold?
def fork_job_limit_reached?
  return true if fork_job_limit_remaining <= 0

  fork_job_over_memory_threshold?
end
|
#fork_job_limit_remaining ⇒ Object
167
168
169
|
# File 'lib/resque-multi-job-forks.rb', line 167
# Computes how much of the fork limit is left.
#
# @return [Numeric] with jobs_per_fork set, the number of jobs still allowed
#   (jobs_per_fork minus @jobs_processed); otherwise the seconds left until
#   the @release_fork_limit deadline
def fork_job_limit_remaining
  if jobs_per_fork.nil?
    @release_fork_limit - Time.now.to_f
  else
    jobs_per_fork - @jobs_processed
  end
end
|
#fork_job_over_memory_threshold? ⇒ Boolean
183
184
185
|
# File 'lib/resque-multi-job-forks.rb', line 183
# Checks whether the process has outgrown the configured memory threshold.
#
# rss is only read when a threshold is configured. The comparison assumes
# rss and memory_threshold use the same unit -- TODO confirm against the
# RssReader mixin.
#
# @return [Boolean] false when no threshold is set; otherwise whether rss
#   exceeds memory_threshold
def fork_job_over_memory_threshold?
  !!(memory_threshold && rss > memory_threshold)
end
|
#hijack_fork ⇒ Object
141
142
143
144
145
146
147
148
|
# File 'lib/resque-multi-job-forks.rb', line 141
# Starts a hijack: stashes and clears Resque's fork hooks, records the
# release limit and resets the per-fork counters.
#
# @return [false] the value assigned to @fork_per_job
def hijack_fork
  log_with_severity :debug, 'hijack fork.'

  # Keep the configured hooks so release_fork can restore them later.
  saved_after_fork = Resque.after_fork
  saved_before_fork = Resque.before_fork
  @suppressed_fork_hooks = [saved_after_fork, saved_before_fork]

  Resque.before_fork = nil
  Resque.after_fork = nil

  @release_fork_limit = fork_job_limit
  @jobs_processed = 0
  @fork_per_job = false
end
|
#is_parent_process? ⇒ Boolean
128
129
130
|
# File 'lib/resque-multi-job-forks.rb', line 128
# Tells whether the current process should be treated as the parent worker.
#
# @return [Object] @child when it is truthy; otherwise true/false depending
#   on whether @pid equals the current process id
def is_parent_process?
  return @child if @child

  @pid == Process.pid
end
|
#minutes_per_fork ⇒ Object
175
176
177
|
# File 'lib/resque-multi-job-forks.rb', line 175
# Reads the MINUTES_PER_FORK environment variable.
#
# @return [Integer] the variable converted with to_i (so non-numeric text
#   yields 0), or 1 when the variable is not set
def minutes_per_fork
  ENV.fetch('MINUTES_PER_FORK', 1).to_i
end
|
#pause_processing_with_multi_job_forks ⇒ Object
Also known as:
pause_processing
90
91
92
93
|
# File 'lib/resque-multi-job-forks.rb', line 90
# Pauses processing, signalling the child first.
#
# Calls shutdown_child, then delegates to the original pause_processing.
#
# @return [Object] whatever pause_processing_without_multi_job_forks returns
def pause_processing_with_multi_job_forks
  shutdown_child

  pause_processing_without_multi_job_forks
end
|
#perform_with_multi_job_forks(job = nil) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/resque-multi-job-forks.rb', line 56
# Performs a job and hijacks the fork on the first job it runs, so later
# jobs are counted against the same fork.
#
# If a shutdown was requested before the job started, the job (when given)
# is reported as failed with WorkerTerminated and nothing is performed.
#
# @param job [Object, nil] the job to perform
# @return [Integer, nil] the updated @jobs_processed count, or nil when the
#   job was skipped because of a pending shutdown
def perform_with_multi_job_forks(job = nil)
  @fork_per_job = true unless fork_hijacked?

  if shutdown?
    report_failed_job(job, WorkerTerminated.new("shutdown before job start")) if job
    return
  end

  # fork's TERM handler reads this flag to decide between raising and a
  # clean shutdown.
  @performing_job = true
  perform_without_multi_job_forks(job)
  hijack_fork unless fork_hijacked?
  @jobs_processed += 1
ensure
  @performing_job = false
end
|
#reconnect_with_multi_job_forks ⇒ Object
Also known as:
reconnect
105
106
107
108
109
110
|
# File 'lib/resque-multi-job-forks.rb', line 105
# Reconnects at most once: the first call delegates to the original
# reconnect and sets @reconnected; later calls do nothing.
#
# @return [true, nil] true on the call that reconnects, nil afterwards
def reconnect_with_multi_job_forks
  return if @reconnected

  reconnect_without_multi_job_forks
  @reconnected = true
end
|
#release_and_exit! ⇒ Object
132
133
134
135
|
# File 'lib/resque-multi-job-forks.rb', line 132
# Releases the fork if it is hijacked, then terminates the process.
#
# Uses exit when run_at_exit_hooks is truthy and exit!(true) otherwise.
#
# @return [void] never returns
def release_and_exit!
  release_fork if fork_hijacked?

  if run_at_exit_hooks
    exit
  else
    exit!(true)
  end
end
|
#release_fork ⇒ Object
150
151
152
153
154
155
156
157
|
# File 'lib/resque-multi-job-forks.rb', line 150
# Ends a hijack: logs the child's job count and rss, runs the
# before_child_exit hook, restores the fork hooks saved by hijack_fork,
# clears the per-fork state and flags the worker for shutdown.
#
# @return [true] the value assigned to @shutdown
def release_fork
  # Must be logged before @jobs_processed is reset below.
  log_with_severity :info, "jobs processed by child: #{jobs_processed}; rss: #{rss}"
  run_hook :before_child_exit, self
  Resque.after_fork, Resque.before_fork = *@suppressed_fork_hooks
  @release_fork_limit = @jobs_processed = nil
  log_with_severity :debug, 'hijack over, counter terrorists win.'
  @shutdown = true
end
|
#shutdown_child ⇒ Object
Tells the child to shut down, since it may be looping through multiple jobs per fork. The QUIT signal normally triggers a graceful shutdown and is re-registered in children (term_child normally unregisters it).
118
119
120
121
122
123
124
125
126
|
# File 'lib/resque-multi-job-forks.rb', line 118
# Sends QUIT to the child process, if there is one.
#
# @return [Object, nil] the result of Process.kill, or nil when there is no
#   child or the child process no longer exists
def shutdown_child
  return unless @child

  log_with_severity :debug, "multi_jobs_per_fork: Sending QUIT signal to #{@child}"
  Process.kill('QUIT', @child)
rescue Errno::ESRCH
  # The child is already gone, so there is nothing left to signal.
  nil
end
|
#shutdown_with_multi_job_forks ⇒ Object
Also known as:
shutdown
83
84
85
86
|
# File 'lib/resque-multi-job-forks.rb', line 83
# Shuts the worker down, signalling the child first.
#
# Calls shutdown_child, then delegates to the original shutdown.
#
# @return [Object] whatever shutdown_without_multi_job_forks returns
def shutdown_with_multi_job_forks
  shutdown_child

  shutdown_without_multi_job_forks
end
|
#shutdown_with_multi_job_forks? ⇒ Boolean
Also known as:
shutdown?
76
77
78
79
|
# File 'lib/resque-multi-job-forks.rb', line 76
# Shutdown check that also ends a hijack once it is due.
#
# A hijacked fork is released when its limit is reached or @shutdown is set;
# the answer itself always comes from the original shutdown?.
#
# @return [Object] whatever shutdown_without_multi_job_forks? returns
def shutdown_with_multi_job_forks?
  if fork_hijacked?
    release_due = fork_job_limit_reached? || @shutdown
    release_fork if release_due
  end

  shutdown_without_multi_job_forks?
end
|
#work_with_multi_job_forks(*args) ⇒ Object
Also known as:
work
48
49
50
51
52
|
# File 'lib/resque-multi-job-forks.rb', line 48
# Runs the original work loop and, when it returns in a process that is not
# the parent, releases the fork and exits.
#
# @param args [Array] forwarded unchanged to work_without_multi_job_forks
# @return [nil] in the parent process; a non-parent process exits instead
def work_with_multi_job_forks(*args)
  # Call pid on its own before any fork happens; presumably this memoizes
  # @pid, which is_parent_process? compares to Process.pid -- TODO confirm.
  pid
  work_without_multi_job_forks(*args)
  release_and_exit! unless is_parent_process?
end
|
#working_on_with_worker_registration(job) ⇒ Object
Also known as:
working_on
97
98
99
100
|
# File 'lib/resque-multi-job-forks.rb', line 97
# Registers the worker, then records the job as being worked on via the
# original working_on.
#
# @param job [Object] the job handed to working_on_without_worker_registration
# @return [Object] whatever working_on_without_worker_registration returns
def working_on_with_worker_registration(job)
  register_worker

  working_on_without_worker_registration(job)
end
|