Module: ActiveWorker::Behavior::ExecuteConcurrently
- Included in:
- Controller
- Defined in:
- lib/active_worker/behavior/execute_concurrently.rb
Instance Method Summary collapse
- #after_fork(param) ⇒ Object
- #cleanup_after_children ⇒ Object
- #execute_concurrently(params) ⇒ Object
- #execute_fork(param) ⇒ Object
- #execute_thread(param) ⇒ Object
- #forked? ⇒ Boolean
- #forking? ⇒ Boolean
- #in_fork(param) ⇒ Object
- #in_thread(param) ⇒ Object
- #kill_children ⇒ Object
- #local_worker_mode ⇒ Object
- #local_worker_mode=(mode) ⇒ Object
- #parent? ⇒ Boolean
- #pids ⇒ Object
- #reset_mongoid ⇒ Object
- #reset_resque ⇒ Object
- #role ⇒ Object
- #role=(role) ⇒ Object
- #set_process_name(param) ⇒ Object
- #threaded? ⇒ Boolean
- #threads ⇒ Object
- #wait_for_children ⇒ Object
Instance Method Details
#after_fork(param) ⇒ Object
118 119 120 121 122 123 124 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 118 def after_fork(param) self.role = FORKED cleanup_after_children set_process_name(param) reset_mongoid reset_resque end |
#cleanup_after_children ⇒ Object
75 76 77 78 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 75 def cleanup_after_children @pids = [] @threads = [] end |
#execute_concurrently(params) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 53 def execute_concurrently(params) new_threads = params.map do |param| case local_worker_mode when THREADED_MODE execute_thread param when FORKING_MODE execute_fork param end end threads.concat new_threads new_threads end |
#execute_fork(param) ⇒ Object
96 97 98 99 100 101 102 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 96 def execute_fork(param) pid = fork do in_fork(param) end pids << pid Process.detach(pid) end |
#execute_thread(param) ⇒ Object
90 91 92 93 94 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 90 def execute_thread(param) Thread.new do in_thread(param) end end |
#forked? ⇒ Boolean
41 42 43 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 41 def forked? role == FORKED end |
#forking? ⇒ Boolean
25 26 27 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 25 def forking? local_worker_mode == FORKING_MODE end |
#in_fork(param) ⇒ Object
108 109 110 111 112 113 114 115 116 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 108 def in_fork(param) after_fork(param) execute(param) rescue SignalException self.handle_termination([param.to_param]) exit rescue Exception => e self.handle_error(e, :in_fork, [param.to_param]) end |
#in_thread(param) ⇒ Object
104 105 106 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 104 def in_thread(param) execute(param) end |
#kill_children ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 80 def kill_children pids.each do |pid| begin Process.kill("TERM", pid) if pid rescue Errno::ESRCH puts "PID: #{pid} did not exist when we went to kill it" end end end |
#local_worker_mode ⇒ Object
17 18 19 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 17 def local_worker_mode @@local_worker_mode ||= FORKING_MODE end |
#local_worker_mode=(mode) ⇒ Object
13 14 15 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 13 def local_worker_mode=(mode) @@local_worker_mode = mode end |
#parent? ⇒ Boolean
37 38 39 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 37 def parent? role == PARENT end |
#pids ⇒ Object
45 46 47 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 45 def pids @pids ||= [] end |
#reset_mongoid ⇒ Object
130 131 132 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 130 def reset_mongoid Mongoid::Sessions.clear end |
#reset_resque ⇒ Object
134 135 136 137 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 134 def reset_resque Resque.redis.client.reconnect trap("TERM", "DEFAULT") end |
#role ⇒ Object
29 30 31 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 29 def role @@role ||= PARENT end |
#role=(role) ⇒ Object
33 34 35 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 33 def role=(role) @@role = role end |
#set_process_name(param) ⇒ Object
126 127 128 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 126 def set_process_name(param) $0 = "ActiveWorker Forked from #{Process.ppid} for #{param}" end |
#threaded? ⇒ Boolean
21 22 23 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 21 def threaded? local_worker_mode == THREADED_MODE end |
#threads ⇒ Object
49 50 51 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 49 def threads @threads ||= [] end |
#wait_for_children ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 67 def wait_for_children threads.each do |thread| thread.join if thread end cleanup_after_children end |