Module: Fasten::Support::ForkWorker

Defined in:
lib/fasten/support/fork_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#pid ⇒ Object

Returns the value of attribute pid.



4
5
6
# File 'lib/fasten/support/fork_worker.rb', line 4

# Reader for the +@pid+ instance variable.
#
# Within this module, +start+ stores the value returned by +Process.fork+
# through +self.pid=+ (presumably the matching writer for this variable).
#
# @return [Object] the current value of +@pid+ (+nil+ if it was never set)
def pid
  @pid
end

Instance Method Details

#close_child_pipes ⇒ Object



72
73
74
75
# File 'lib/fasten/support/fork_worker.rb', line 72

# Closes the child-side pipe ends (+child_read+ and +child_write+).
#
# An end that is +nil+ (pipes not created yet) or already closed is skipped,
# so this is safe to call repeatedly and from cleanup paths such as the
# +ensure+ clause of +kill+ on a worker whose pipes were never created.
#
# @return [nil]
def close_child_pipes
  child_read.close if child_read && !child_read.closed?
  child_write.close if child_write && !child_write.closed?
end

#close_parent_pipes ⇒ Object



67
68
69
70
# File 'lib/fasten/support/fork_worker.rb', line 67

# Closes the parent-side pipe ends (+parent_read+ and +parent_write+).
#
# An end that is +nil+ (pipes not created yet) or already closed is skipped,
# so this is safe to call repeatedly and from cleanup paths such as the
# +ensure+ clause of +kill+ on a worker whose pipes were never created.
#
# @return [nil]
def close_parent_pipes
  parent_read.close if parent_read && !parent_read.closed?
  parent_write.close if parent_write && !parent_write.closed?
end

#create_pipes ⇒ Object



59
60
61
62
63
64
65
# File 'lib/fasten/support/fork_worker.rb', line 59

# Opens the two binary-mode pipes used between parent and child.
#
# The first pipe carries data from +parent_write+ to +child_read+; the second
# carries data from +child_write+ to +parent_read+. Both write ends are then
# explicitly switched to the 'binary' encoding.
#
# @return [IO] the +parent_write+ end (the result of the last +set_encoding+)
def create_pipes
  request_reader, request_writer = IO.pipe(binmode: true)
  self.child_read = request_reader
  self.parent_write = request_writer

  response_reader, response_writer = IO.pipe(binmode: true)
  self.parent_read = response_reader
  self.child_write = response_writer

  child_write.set_encoding('binary')
  parent_write.set_encoding('binary')
end

#kill ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/fasten/support/fork_worker.rb', line 18

# Sends SIGKILL to the process identified by +pid+ and closes every pipe end.
#
# Any StandardError raised while logging or signalling is logged as a warning
# and swallowed. The parent-side and then the child-side pipe ends are closed
# regardless of the outcome.
#
# @return [Object] the result of +Process.kill+, or of +log_warn+ when an
#   error was rescued
def kill
  log_info('Removing worker')
  Process.kill(:KILL, pid)
rescue StandardError => error
  log_warn("Ignoring error killing worker #{self}, error: #{error}")
ensure
  # Cleanup runs on both the success and the failure path.
  close_parent_pipes
  close_child_pipes
end

#receive_request_from_parent ⇒ Object



48
49
50
# File 'lib/fasten/support/fork_worker.rb', line 48

# Reads the next marshaled object from the +child_read+ pipe and returns it.
#
# +child_read+ is paired in +create_pipes+ with +parent_write+, the end that
# +send_request_to_child+ dumps the task into.
#
# @return [Object] the object reconstructed by +Marshal.load+
def receive_request_from_parent
  Marshal.load(child_read) # rubocop:disable Security/MarshalLoad because pipe is a secure channel
end

#receive_response_from_child ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/fasten/support/fork_worker.rb', line 37

# Reads the task copy sent back over +parent_read+ and merges its results
# into the task this worker was running.
#
# The +state+, +ini+, +fin+, +dif+, +response+ and +error+ attributes are
# copied from the unmarshaled copy onto +running_task+. Afterwards both
# +state+ and +running_task+ of the worker are reset to +nil+.
#
# @return [Object] the task that was running, now carrying the copied values
def receive_response_from_child
  child_copy = Marshal.load(parent_read) # rubocop:disable Security/MarshalLoad because pipe is a secure channel

  finished = running_task
  %i[state ini fin dif response error].each do |attribute|
    finished.send("#{attribute}=", child_copy.send(attribute))
  end

  # Mark the worker as idle again.
  self.state = nil
  self.running_task = nil

  finished
end

#redirect_std(path) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fasten/support/fork_worker.rb', line 77

# Redirects +$stdout+, +$stderr+ and the logger into the file at +path+.
#
# Clones of the current +$stdout+ and +$stderr+ are remembered the first time
# this runs, so that +restore_std+ can reopen onto them later. The parent
# directory of +path+ is created if needed and the file is opened in append
# mode with sync enabled.
#
# @param path [String] location of the log file to redirect into
# @return [Object] whatever +logger.reopen+ returns
def redirect_std(path)
  @saved_stdout_constant ||= $stdout.clone
  @saved_stderr_constant ||= $stderr.clone

  log_dir = File.dirname(path)
  FileUtils.mkdir_p(log_dir)

  @redirect_log = File.new(path, 'a')
  @redirect_log.sync = true

  [$stdout, $stderr].each { |stream| stream.reopen(@redirect_log) }

  logger.reopen(@redirect_log)
end

#restore_std ⇒ Object



91
92
93
94
95
96
# File 'lib/fasten/support/fork_worker.rb', line 91

# Undoes +redirect_std+: closes the redirect log file and reopens +$stdout+
# and +$stderr+ onto the handles that +redirect_std+ saved.
#
# Every step is skipped when its handle is missing, and the log is skipped if
# already closed. This keeps the method safe in cleanup paths when
# +redirect_std+ never ran or failed before opening the log file.
#
# @return [IO, nil] +$stderr+ after it was reopened, or +nil+ when there was
#   no saved stderr handle to restore
def restore_std
  @redirect_log.close if @redirect_log && !@redirect_log.closed?

  $stdout.reopen(@saved_stdout_constant) if @saved_stdout_constant
  $stderr.reopen(@saved_stderr_constant) if @saved_stderr_constant
end

#send_request_to_child(task) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/fasten/support/fork_worker.rb', line 28

# Hands +task+ to the child process.
#
# The task is flagged as +:RUNNING+ and linked to this worker, the worker
# records it as its +running_task+ and switches its own state to +:RUNNING+.
# Finally the task is marshaled into the +parent_write+ pipe.
#
# @param task [Object] the task to run; must respond to +state=+ and +worker=+
# @return [Object] the result of +Marshal.dump(task, parent_write)+
def send_request_to_child(task)
  # Task-side bookkeeping.
  task.state = :RUNNING
  task.worker = self

  # Worker-side bookkeeping.
  self.running_task = task
  self.state = :RUNNING

  request_pipe = parent_write
  Marshal.dump(task, request_pipe)
end

#send_response_to_parent(task) ⇒ Object



52
53
54
55
56
57
# File 'lib/fasten/support/fork_worker.rb', line 52

# Logs that a response is being sent, then writes the marshaled +task+ to the
# +child_write+ pipe.
#
# @param task [Object] the task to serialize with +Marshal.dump+
# @return [Object] the result of +child_write.write+
def send_response_to_parent(task)
  log_info("Sending task response back to runner #{task}")

  child_write.write(Marshal.dump(task))
end

#start ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/fasten/support/fork_worker.rb', line 6

# Creates the pipes and forks the worker process.
#
# In the forked child the parent-side pipe ends are closed and
# +process_incoming_requests+ is run. In the parent the child's process id is
# stored via +pid=+ and the child-side pipe ends are closed.
#
# @return [Object] the result of +close_child_pipes+
def start
  create_pipes

  child_pid = Process.fork do
    # Child process: only the child-side ends are needed from here on.
    close_parent_pipes

    process_incoming_requests
  end
  self.pid = child_pid

  # Parent process: the child-side ends belong to the child only.
  close_child_pipes
end