Class: Pitchfork::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/pitchfork/worker.rb

Overview

This class and its members can be considered a stable interface and will not change in a backwards-incompatible fashion between releases of pitchfork. Knowledge of this class is generally not needed for most users of pitchfork.

Some users may want to access it in the after_worker_fork/after_mold_fork hooks. See the Pitchfork::Configurator RDoc for examples.

Constant Summary collapse

EXIT_SIGNALS =

:stopdoc:

[:QUIT, :TERM]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nr, pid: nil, generation: 0) ⇒ Worker

Returns a new instance of Worker.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/pitchfork/worker.rb', line 18

# Sets up the bookkeeping state for one child process.
#
# nr         - worker number; when it is nil (or false) the object is
#              configured as a mold through promoted!(nil) instead.
# pid        - process id, if already known (defaults to nil).
# generation - generation this process belongs to (defaults to 0).
def initialize(nr, pid: nil, generation: 0)
  @nr, @pid, @generation = nr, pid, generation
  @to_io = nil
  @master = nil
  @mold = @exiting = false
  @requests_count = 0

  # No worker number: take the mold path.
  return promoted!(nil) unless nr

  # Regular worker: bind the deadline slot for this number and clear it.
  @deadline_drop = SharedMemory.worker_deadline(nr)
  self.deadline = 0
end

Instance Attribute Details

#generationObject

Returns the value of attribute generation.



15
16
17
# File 'lib/pitchfork/worker.rb', line 15

# Returns the generation number stored in @generation.
def generation
  @generation
end

#masterObject

Returns the value of attribute master.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Returns the object stored in @master (nil right after initialization).
def master
  @master
end

#nrObject

Returns the value of attribute nr.



15
16
17
# File 'lib/pitchfork/worker.rb', line 15

# Returns the worker number stored in @nr (nil once promoted to a mold).
def nr
  @nr
end

#pidObject

Returns the value of attribute pid.



15
16
17
# File 'lib/pitchfork/worker.rb', line 15

# Returns the process id stored in @pid, or nil when none was given.
def pid
  @pid
end

#requests_countObject (readonly)

Returns the value of attribute requests_count.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Returns the current value of the @requests_count counter.
def requests_count
  @requests_count
end

Instance Method Details

#==(other) ⇒ Object

worker objects may be compared to just plain Integers



179
180
181
# File 'lib/pitchfork/worker.rb', line 179

def ==(other) # :nodoc:
  super || (!@nr.nil? && @nr == other)
end

#accept_nonblock(exception: nil) ⇒ Object

This only runs when the Rack app.call is not running. Acts like a listener.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/pitchfork/worker.rb', line 156

# Reads pending messages from the socket in @to_io without blocking.
#
# Returns:
#   false   - nothing is readable right now, or EOF was reached (in which
#             case a QUIT is faked first);
#   Message - the first received message that is not a SoftKill;
#   nil     - the connection was reset (Errno::ECONNRESET).
#
# SoftKill messages are turned into fake signals and reading continues.
# The +exception+ keyword is accepted but not used by this method.
#
# Raises TypeError when recvmsg_nonblock returns anything unexpected.
def accept_nonblock(exception: nil) # :nodoc:
  loop do # keep reading: several signals may be queued
    received = @to_io.recvmsg_nonblock(exception: false)

    case received
    when :wait_readable then return false # nothing to read yet
    when nil
      # EOF: master died, but we are at a safe place to exit
      fake_sig(:QUIT)
      return false
    when Message::SoftKill
      # run the signal handler, then look for further queued messages
      fake_sig(received.signum)
    when Message then return received
    else
      raise TypeError, "Unexpected recvmsg_nonblock returns: #{received.inspect}"
    end
  end
rescue Errno::ECONNRESET
  nil
end

#after_fork_in_childObject



216
217
218
# File 'lib/pitchfork/worker.rb', line 216

# Closes @master when one is set; does nothing when it is nil.
def after_fork_in_child
  @master.close unless @master.nil?
end

#closeObject

called in both the master (reaping worker) and worker (SIGQUIT handler)



206
207
208
209
210
# File 'lib/pitchfork/worker.rb', line 206

# Called in both the master (reaping worker) and worker (SIGQUIT handler).
#
# Resets the deadline to 0, then closes @master and @to_io, in that order,
# skipping whichever of them is not set.
def close # :nodoc:
  self.deadline = 0

  if @master
    @master.close
  end

  if @to_io
    @to_io.close
  end
end

#create_socketpair!Object



212
213
214
# File 'lib/pitchfork/worker.rb', line 212

# Obtains a socket pair from Pitchfork.socketpair, passes it through
# Info.keep_ios, and stores the first element in @to_io and the second
# in @master.
def create_socketpair!
  pair = Info.keep_ios(Pitchfork.socketpair)
  @to_io, @master = pair
end

#deadlineObject

called in the master process



193
194
195
# File 'lib/pitchfork/worker.rb', line 193

# Called in the master process.
# Returns the value currently held by the @deadline_drop slot.
def deadline # :nodoc:
  @deadline_drop.value
end

#deadline=(value) ⇒ Object

called in the worker process



188
189
190
# File 'lib/pitchfork/worker.rb', line 188

# Called in the worker process.
# Stores +value+ into the @deadline_drop slot.
def deadline=(value) # :nodoc:
  @deadline_drop.value = value
end

#exiting?Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/pitchfork/worker.rb', line 42

# Returns the @exiting flag: false after initialization, set to true by
# soft_kill once an exit signal has been sent successfully.
def exiting?
  @exiting
end

#fake_sig(sig) ⇒ Object

Call a signal handler immediately without triggering EINTR. We do not use the more obvious Process.kill(sig, $$) here since that signal delivery may be deferred. We want to avoid signal delivery while the Rack app.call is running because some database drivers (e.g. ruby-pg) may cancel pending requests.



131
132
133
134
135
136
# File 'lib/pitchfork/worker.rb', line 131

# Calls the handler currently installed for +sig+ directly, without
# delivering a real signal.
#
# The handler is swapped for "IGNORE" while it runs and is always
# re-installed afterwards, even if the handler raises.
def fake_sig(sig) # :nodoc:
  begin
    # trap returns the handler that was installed before the swap
    previous_handler = trap(sig, "IGNORE")
    previous_handler.call
  ensure
    trap(sig, previous_handler)
  end
end

#finish_promotion(control_socket) ⇒ Object



81
82
83
84
85
86
# File 'lib/pitchfork/worker.rb', line 81

# Completes a promotion: sends a MoldReady message (number, pid,
# generation) on +control_socket+, publishes this generation as
# SharedMemory.current_generation, and switches @deadline_drop to
# SharedMemory.mold_deadline.
def finish_promotion(control_socket)
  control_socket.sendmsg(Message::MoldReady.new(@nr, @pid, generation))

  SharedMemory.current_generation = @generation
  @deadline_drop = SharedMemory.mold_deadline
end

#hard_kill(sig) ⇒ Object



150
151
152
# File 'lib/pitchfork/worker.rb', line 150

# Sends the real signal +sig+ to this worker's process via Process.kill
# and returns its result.
def hard_kill(sig)
  signal = sig
  target_pid = pid
  Process.kill(signal, target_pid)
end

#increment_requests_count(by = 1) ⇒ Object



201
202
203
# File 'lib/pitchfork/worker.rb', line 201

# Adds +by+ (default 1) to the request counter and returns the new total.
def increment_requests_count(by = 1)
  @requests_count = @requests_count + by
end

#meminfoObject



34
35
36
# File 'lib/pitchfork/worker.rb', line 34

# Returns a memoized MemInfo built from this worker's pid, or nil when
# the worker has no pid.
def meminfo
  return unless pid

  @meminfo ||= MemInfo.new(pid)
end

#mold?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/pitchfork/worker.rb', line 109

# Returns the @mold flag: false after initialization, true once
# promoted! has run.
def mold?
  @mold
end

#outdated?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/pitchfork/worker.rb', line 50

# True when the generation published in SharedMemory is greater than
# this worker's own generation.
def outdated?
  current = SharedMemory.current_generation
  current > @generation
end

#pending?Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/pitchfork/worker.rb', line 46

# True while @master is nil.
def pending?
  @master.nil?
end

#promote(generation) ⇒ Object



88
89
90
# File 'lib/pitchfork/worker.rb', line 88

# Sends a PromoteWorker message carrying +generation+ through
# send_message_nonblock and returns that call's result.
def promote(generation)
  request = Message::PromoteWorker.new(generation)
  send_message_nonblock(request)
end

#promote!(timeout) ⇒ Object



96
97
98
99
# File 'lib/pitchfork/worker.rb', line 96

# Bumps this worker's generation by one, then applies the mold state via
# promoted!(timeout) and returns its result.
def promote!(timeout)
  @generation = @generation + 1
  promoted!(timeout)
end

#promoted!(timeout) ⇒ Object



101
102
103
104
105
106
107
# File 'lib/pitchfork/worker.rb', line 101

# Switches this object into mold state: flags it as a mold, drops the
# worker number, and points @deadline_drop at
# SharedMemory.mold_promotion_deadline. When +timeout+ is given the
# deadline is refreshed through update_deadline.
#
# Returns self.
def promoted!(timeout)
  @nr = nil
  @mold = true
  @deadline_drop = SharedMemory.mold_promotion_deadline

  # The slot must be in place before the deadline is written.
  if timeout
    update_deadline(timeout)
  end

  self
end

#quitObject

master fakes SIGQUIT using this



118
119
120
# File 'lib/pitchfork/worker.rb', line 118

# Master fakes SIGQUIT using this.
#
# Closes @master, if set, and clears the reference. The reference is
# cleared explicitly rather than by assigning the result of #close, so
# the outcome does not depend on what #close returns.
#
# Returns nil.
def quit # :nodoc:
  return unless @master

  @master.close
  @master = nil
end

#refreshObject



38
39
40
# File 'lib/pitchfork/worker.rb', line 38

# Calls #update on the object returned by meminfo; returns nil when
# there is none.
def refresh
  info = meminfo
  info.update unless info.nil?
end

#register_to_master(control_socket) ⇒ Object



67
68
69
70
71
72
# File 'lib/pitchfork/worker.rb', line 67

# Creates a fresh socket pair, announces this worker on +control_socket+
# with a WorkerSpawned message (number, pid, generation and the @master
# end of the pair), then closes the local @master.
def register_to_master(control_socket)
  create_socketpair!
  control_socket.sendmsg(
    Message::WorkerSpawned.new(@nr, @pid, generation, @master)
  )
  @master.close
end

#resetObject



197
198
199
# File 'lib/pitchfork/worker.rb', line 197

# Sets the request counter (@requests_count) back to 0.
def reset
  @requests_count = 0
end

#soft_kill(sig) ⇒ Object

master sends fake signals to children



139
140
141
142
143
144
145
146
147
148
# File 'lib/pitchfork/worker.rb', line 139

# Master sends fake signals to children.
#
# sig - signal name without the "SIG" prefix, as a Symbol or String
#       (e.g. :QUIT or "QUIT").
#
# Looks the signal number up in Signal.list and sends it as a SoftKill
# message through send_message_nonblock. When the message was sent and the
# signal is one of EXIT_SIGNALS, the worker is flagged as exiting. The
# name is normalised to a Symbol for that check, so String and Symbol
# arguments behave the same.
#
# Returns the result of send_message_nonblock.
# Raises ArgumentError when +sig+ is not a known signal name.
def soft_kill(sig) # :nodoc:
  signame = sig.to_s
  signum = Signal.list[signame]
  raise ArgumentError, "BUG: bad signal: #{sig.inspect}" unless signum

  # Do not care in the odd case the buffer is full, here.
  success = send_message_nonblock(Message::SoftKill.new(signum))
  @exiting = true if success && EXIT_SIGNALS.include?(signame.to_sym)
  success
end

#spawn_worker(new_worker) ⇒ Object



92
93
94
# File 'lib/pitchfork/worker.rb', line 92

# Sends a SpawnWorker message carrying +new_worker+'s number through
# send_message_nonblock and returns that call's result.
def spawn_worker(new_worker)
  request = Message::SpawnWorker.new(new_worker.nr)
  send_message_nonblock(request)
end

#start_promotion(control_socket) ⇒ Object



74
75
76
77
78
79
# File 'lib/pitchfork/worker.rb', line 74

# Creates a fresh socket pair, announces the new mold on +control_socket+
# with a MoldSpawned message (number, pid, generation and the @master end
# of the pair), then closes the local @master.
def start_promotion(control_socket)
  create_socketpair!
  control_socket.sendmsg(
    Message::MoldSpawned.new(@nr, @pid, generation, @master)
  )
  @master.close
end

#to_ioObject

IO.select-compatible



113
114
115
# File 'lib/pitchfork/worker.rb', line 113

# Returns the result of calling #to_io on the object held in @to_io, so a
# worker can be handed to IO.select directly.
def to_io # IO.select-compatible
  @to_io.to_io
end

#update(message) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pitchfork/worker.rb', line 54

# Copies every member of +message+ onto this worker by calling the
# matching "<member>=" setter, then re-points @deadline_drop for the two
# mold messages:
#   MoldSpawned -> SharedMemory.mold_promotion_deadline
#   MoldReady   -> SharedMemory.mold_deadline
# Other message types leave @deadline_drop untouched.
def update(message)
  message.class.members.each do |attribute|
    value = message.public_send(attribute)
    send("#{attribute}=", value)
  end

  case message
  when Message::MoldSpawned then @deadline_drop = SharedMemory.mold_promotion_deadline
  when Message::MoldReady then @deadline_drop = SharedMemory.mold_deadline
  end
end

#update_deadline(timeout) ⇒ Object



183
184
185
# File 'lib/pitchfork/worker.rb', line 183

# Sets the deadline to Pitchfork.time_now(true) plus +timeout+ and
# returns the new deadline value.
# NOTE(review): the meaning of the `true` argument to time_now is not
# visible here; confirm against Pitchfork.time_now.
def update_deadline(timeout)
  now = Pitchfork.time_now(true)
  self.deadline = now + timeout
end