Module: ActiveWorker::Behavior::ExecuteConcurrently

Included in:
Controller
Defined in:
lib/active_worker/behavior/execute_concurrently.rb

Instance Method Summary

Instance Method Details

#after_fork(param) ⇒ Object



118
119
120
121
122
123
124
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 118

# Prepares the current process for work after it has been forked.
#
# The steps run in a fixed order: the role is switched to FORKED, the pid and
# thread lists are replaced with empty ones, the process is renamed, and then
# reset_mongoid and reset_resque are called.
#
# @param param [Object] passed to set_process_name, which interpolates it
#   into the process title
# @return [Object] whatever reset_resque returns
def after_fork(param)
  self.role = FORKED
  cleanup_after_children
  set_process_name(param)
  reset_mongoid
  reset_resque
end

#cleanup_after_children ⇒ Object



75
76
77
78
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 75

# Forgets every tracked child by replacing the pid list and the thread list
# with fresh empty arrays. Nothing is signalled or joined here; arrays that
# were previously handed out are left untouched.
#
# @return [Array] the new, empty thread list
def cleanup_after_children
  @pids = []
  @threads = []
end

#execute_concurrently(params) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 53

# Starts one unit of concurrent work per entry in +params+, choosing between
# a thread and a forked process according to local_worker_mode.
#
# Every result is appended to +threads+ so wait_for_children can join it
# later. When local_worker_mode matches neither THREADED_MODE nor
# FORKING_MODE the entry for that param is nil.
#
# @param params [#map] collection of work items
# @return [Array] the results of execute_thread / execute_fork (or nil),
#   one per param, in the same order
def execute_concurrently(params)
  spawned = params.map do |param|
    case local_worker_mode
    when THREADED_MODE then execute_thread(param)
    when FORKING_MODE then execute_fork(param)
    end
  end

  threads.concat(spawned)
  spawned
end

#execute_fork(param) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 96

# Forks a child process that runs in_fork(param), records the child's pid in
# +pids+, and detaches it.
#
# @param param [Object] work item handed to in_fork inside the child
# @return [Thread] the waiter thread returned by Process.detach; joining it
#   blocks until the child exits
def execute_fork(param)
  child_pid = fork { in_fork(param) }
  pids.push(child_pid)
  Process.detach(child_pid)
end

#execute_thread(param) ⇒ Object



90
91
92
93
94
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 90

# Runs in_thread(param) on a new thread.
#
# @param param [Object] work item handed to in_thread
# @return [Thread] the started thread
def execute_thread(param)
  Thread.new(param) { |work_item| in_thread(work_item) }
end

#forked? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 41

# Reports whether the current role is FORKED (the role assigned by
# after_fork).
#
# @return [Boolean]
def forked?
  role == FORKED
end

#forking? ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 25

# Reports whether local_worker_mode is FORKING_MODE.
#
# @return [Boolean]
def forking?
  local_worker_mode == FORKING_MODE
end

#in_fork(param) ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 108

# Body of a forked child: runs after_fork, then execute, and reports any
# failure through the handle_* callbacks.
#
# A SignalException is reported via handle_termination and then the process
# exits. Every other exception -- including non-StandardError ones, since
# Exception is rescued -- is passed to handle_error, after which the method
# returns normally. The SignalException clause has to stay first because it
# is a subclass of Exception.
#
# @param param [#to_param] work item; its to_param value identifies it in
#   the callbacks
# @return [Object] the result of execute, or of handle_error on failure
def in_fork(param)
  after_fork(param)
  execute(param)
rescue SignalException
  handle_termination([param.to_param])
  exit
rescue Exception => error
  handle_error(error, :in_fork, [param.to_param])
end

#in_thread(param) ⇒ Object



104
105
106
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 104

# Body of a worker thread: simply delegates to execute. Unlike in_fork, no
# exceptions are rescued here.
#
# @param param [Object] work item handed to execute
# @return [Object] whatever execute returns
def in_thread(param)
  execute(param)
end

#kill_children ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 80

# Sends TERM to every tracked child pid.
#
# nil/false entries are skipped. A pid that no longer exists (Errno::ESRCH)
# is reported on standard output and otherwise ignored; any other error from
# Process.kill propagates.
#
# @return [Object] the result of pids.each
def kill_children
  pids.each do |child_pid|
    next unless child_pid

    begin
      Process.kill("TERM", child_pid)
    rescue Errno::ESRCH
      puts "PID: #{child_pid} did not exist when we went to kill it"
    end
  end
end

#local_worker_mode ⇒ Object



17
18
19
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 17

# Returns the concurrency mode, defaulting to FORKING_MODE the first time it
# is read while unset (or set to nil/false).
#
# The value lives in a class variable, so it is not per-instance state.
#
# @return [Object] the stored mode, or FORKING_MODE after defaulting
def local_worker_mode
  return @@local_worker_mode if defined?(@@local_worker_mode) && @@local_worker_mode

  @@local_worker_mode = FORKING_MODE
end

#local_worker_mode=(mode) ⇒ Object



13
14
15
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 13

# Sets the concurrency mode read by local_worker_mode.
#
# The value is stored in a class variable, so it is not per-instance state.
# Assigning nil or false makes the next local_worker_mode call fall back to
# its default.
#
# @param mode [Object] the mode to store; execute_concurrently matches it
#   against THREADED_MODE and FORKING_MODE
# @return [Object] the assigned mode
def local_worker_mode=(mode)
  @@local_worker_mode = mode
end

#parent? ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 37

# Reports whether the current role is PARENT (the default role).
#
# @return [Boolean]
def parent?
  role == PARENT
end

#pids ⇒ Object



45
46
47
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 45

# Returns the list of tracked child pids, creating an empty list on first
# use.
#
# The same array object is returned on every call until it is replaced, so
# callers may append to it directly.
#
# @return [Array] tracked pids
def pids
  @pids = [] unless @pids
  @pids
end

#reset_mongoid ⇒ Object



130
131
132
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 130

# Clears Mongoid's sessions by calling Mongoid::Sessions.clear.
#
# Called from after_fork. NOTE(review): presumably this is so the child
# opens its own database connections instead of sharing the parent's --
# confirm against the Mongoid version in use.
#
# @return [Object] whatever Mongoid::Sessions.clear returns
def reset_mongoid
  Mongoid::Sessions.clear
end

#reset_resque ⇒ Object



134
135
136
137
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 134

# Reconnects Resque's Redis client and restores Ruby's default handling of
# the TERM signal.
#
# Called from after_fork. NOTE(review): the trap reset presumably undoes a
# TERM handler installed by the parent -- confirm.
#
# @return [Object] the previous TERM handler, as returned by Signal.trap
def reset_resque
  redis_client = Resque.redis.client
  redis_client.reconnect
  Signal.trap("TERM", "DEFAULT")
end

#role ⇒ Object



29
30
31
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 29

# Returns the current role, defaulting to PARENT the first time it is read
# while unset (or set to nil/false).
#
# The value lives in a class variable, so it is not per-instance state.
#
# @return [Object] the stored role, or PARENT after defaulting
def role
  return @@role if defined?(@@role) && @@role

  @@role = PARENT
end

#role=(role) ⇒ Object



33
34
35
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 33

# Sets the role read by +role+, parent? and forked?.
#
# The value is stored in a class variable, so it is not per-instance state.
#
# @param role [Object] the new role; after_fork assigns FORKED
# @return [Object] the assigned role
def role=(role)
  @@role = role
end

#set_process_name(param) ⇒ Object



126
127
128
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 126

# Renames the current process so it is identifiable as a forked worker.
#
# The title embeds the parent's pid and the string form of +param+.
#
# @param param [#to_s] work item shown in the process title
# @return [String] the new process title
def set_process_name(param)
  parent_pid = Process.ppid
  title = "ActiveWorker Forked from #{parent_pid} for #{param}"
  # $PROGRAM_NAME is Ruby's built-in alias for $0.
  $PROGRAM_NAME = title
end

#threaded? ⇒ Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 21

# Reports whether local_worker_mode is THREADED_MODE.
#
# @return [Boolean]
def threaded?
  local_worker_mode == THREADED_MODE
end

#threads ⇒ Object



49
50
51
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 49

# Returns the list of tracked worker threads, creating an empty list on
# first use.
#
# execute_concurrently appends to this list (entries may be nil) and
# wait_for_children joins what is in it.
#
# @return [Array] tracked threads
def threads
  @threads = [] unless @threads
  @threads
end

#wait_for_children ⇒ Object



67
68
69
70
71
72
73
# File 'lib/active_worker/behavior/execute_concurrently.rb', line 67

# Blocks until every tracked thread has finished, then resets the child
# bookkeeping.
#
# nil/false entries in +threads+ are skipped. If a join raises, the
# exception propagates and the bookkeeping is left as it was.
#
# @return [Object] whatever cleanup_after_children returns
def wait_for_children
  threads.each do |worker|
    next unless worker

    worker.join
  end

  cleanup_after_children
end