Class: Pitchfork::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/pitchfork/worker.rb

Overview

This class and its members can be considered a stable interface and will not change in a backwards-incompatible fashion between releases of pitchfork. Knowledge of this class is generally not needed for most users of pitchfork.

Some users may want to access it in the after_worker_fork/after_mold_fork hooks. See the Pitchfork::Configurator RDoc for examples.

Direct Known Subclasses

Service

Constant Summary collapse

EXIT_SIGNALS =

:stopdoc:

[:QUIT, :TERM]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(nr, pid: nil, generation: 0) ⇒ Worker

Returns a new instance of Worker.



19
20
21
22
23
24
25
26
27
28
# File 'lib/pitchfork/worker.rb', line 19

# Sets up the initial state of a worker.
#
# nr         - stored in @nr
# pid        - stored in @pid (nil unless given)
# generation - stored in @generation (0 unless given)
#
# A fresh worker is neither a mold nor exiting, has no sockets attached and
# has a request counter of zero. init_deadline is invoked last.
def initialize(nr, pid: nil, generation: 0)
  @nr, @pid, @generation = nr, pid, generation
  @mold = false
  @exiting = false
  @to_io = nil
  @master = nil
  @requests_count = 0
  init_deadline
end

Instance Attribute Details

#generationObject

Returns the value of attribute generation.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Reader: returns the current value of @generation.
def generation
  @generation
end

#masterObject

Returns the value of attribute master.



17
18
19
# File 'lib/pitchfork/worker.rb', line 17

# Reader: returns the current value of @master (nil until one is assigned).
def master
  @master
end

#nrObject

Returns the value of attribute nr.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Reader: returns the current value of @nr.
def nr
  @nr
end

#pidObject

Returns the value of attribute pid.



16
17
18
# File 'lib/pitchfork/worker.rb', line 16

# Reader: returns the current value of @pid.
def pid
  @pid
end

#requests_countObject (readonly)

Returns the value of attribute requests_count.



17
18
19
# File 'lib/pitchfork/worker.rb', line 17

# Read-only view of the @requests_count counter.
def requests_count
  @requests_count
end

Instance Method Details

#==(other) ⇒ Object

worker objects may be compared to just plain Integers



170
171
172
# File 'lib/pitchfork/worker.rb', line 170

# Equality: whatever the inherited == says when that is truthy; otherwise
# the worker is compared through @nr, so it can match a plain Integer.
# A worker whose @nr is nil never matches through this second path.
def ==(other) # :nodoc:
  inherited = super
  return inherited if inherited
  return false if @nr.nil?

  @nr == other
end

#accept_nonblock(exception: nil) ⇒ Object

This only runs when the Rack app.call is not running; it acts like a listener.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/pitchfork/worker.rb', line 147

# Drains @to_io without blocking and reports what was found:
#
# * false   - nothing is readable right now, or the socket hit EOF (in which
#             case the :QUIT handler is invoked first through fake_sig)
# * Message - the first message that is not a SoftKill
# * nil     - the read raised Errno::ECONNRESET
#
# SoftKill messages are consumed in place: their signal handler is invoked
# and reading continues. Anything else coming back from the socket raises
# TypeError. The +exception+ keyword is accepted but not consulted.
def accept_nonblock(exception: nil) # :nodoc:
  loop do
    received = @to_io.recvmsg_nonblock(exception: false)

    case received
    when :wait_readable
      # Nothing pending; the caller should keep waiting.
      return false
    when nil
      # EOF: the master died, but this is a safe place to exit.
      fake_sig(:QUIT)
      return false
    when Message::SoftKill
      # Run the signal handler, then loop again: more signals may be queued.
      fake_sig(received.signum)
    when Message
      return received
    else
      raise TypeError, "Unexpected recvmsg_nonblock returns: #{received.inspect}"
    end
  end
rescue Errno::ECONNRESET
  nil
end

#after_fork_in_childObject



207
208
209
# File 'lib/pitchfork/worker.rb', line 207

# Closes @master when one is set; does nothing (and returns nil) otherwise.
def after_fork_in_child
  @master.close unless @master.nil?
end

#closeObject

called in both the master (reaping worker) and worker (SIGQUIT handler)



197
198
199
200
201
# File 'lib/pitchfork/worker.rb', line 197

# Zeroes the deadline, then closes whichever of @master and @to_io are set
# (in that order).
def close # :nodoc:
  self.deadline = 0

  if @master
    @master.close
  end

  if @to_io
    @to_io.close
  end
end

#create_socketpair!Object



203
204
205
# File 'lib/pitchfork/worker.rb', line 203

# Obtains a pair from Pitchfork.socketpair, passes it through Info.keep_ios,
# and stores the first element in @to_io and the second in @master.
def create_socketpair!
  sockets = Info.keep_ios(Pitchfork.socketpair)
  @to_io, @master = sockets
end

#deadlineObject

called in the master process



184
185
186
# File 'lib/pitchfork/worker.rb', line 184

# Reads the deadline currently held by @deadline_drop.
def deadline # :nodoc:
  @deadline_drop.value
end

#deadline=(value) ⇒ Object

called in the worker process



179
180
181
# File 'lib/pitchfork/worker.rb', line 179

# Writes +value+ into @deadline_drop as the new deadline.
def deadline=(value) # :nodoc:
  @deadline_drop.value = value
end

#exiting?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/pitchfork/worker.rb', line 30

# Returns the @exiting flag.
def exiting?
  @exiting
end

#fake_sig(sig) ⇒ Object

Call a signal handler immediately without triggering EINTR. We do not use the more obvious Process.kill(sig, $$) here since that signal delivery may be deferred. We want to avoid signal delivery while the Rack app.call is running because some database drivers (e.g. ruby-pg) may cancel pending requests.



122
123
124
125
126
127
# File 'lib/pitchfork/worker.rb', line 122

# Invokes the handler currently trapped for +sig+ directly, without
# delivering a real signal. While the handler runs the signal is set to
# "IGNORE"; the previous handler is reinstalled afterwards, even if it raised.
def fake_sig(sig) # :nodoc:
  previous_handler = trap(sig, "IGNORE")
  previous_handler.call
ensure
  trap(sig, previous_handler)
end

#finish_promotion(control_socket) ⇒ Object



69
70
71
72
73
74
# File 'lib/pitchfork/worker.rb', line 69

# Sends a Message::MoldReady (built from @nr, @pid and generation) over
# +control_socket+, publishes @generation as SharedMemory.current_generation,
# and switches @deadline_drop to SharedMemory.mold_deadline.
def finish_promotion(control_socket)
  control_socket.sendmsg(Message::MoldReady.new(@nr, @pid, generation))
  SharedMemory.current_generation = @generation
  @deadline_drop = SharedMemory.mold_deadline
end

#hard_kill(sig) ⇒ Object



141
142
143
# File 'lib/pitchfork/worker.rb', line 141

# Delivers the real signal +sig+ to the process identified by pid,
# via Process.kill.
def hard_kill(sig)
  target_pid = pid
  Process.kill(sig, target_pid)
end

#increment_requests_count(by = 1) ⇒ Object



192
193
194
# File 'lib/pitchfork/worker.rb', line 192

# Adds +by+ (1 unless given) to @requests_count and returns the new total.
def increment_requests_count(by = 1)
  @requests_count = @requests_count + by
end

#mold?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/pitchfork/worker.rb', line 101

# Returns the @mold flag (set to true by promoted!).
def mold?
  @mold
end

#outdated?Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/pitchfork/worker.rb', line 38

# True when SharedMemory.current_generation is greater than this worker's
# own @generation.
def outdated?
  latest_generation = SharedMemory.current_generation
  latest_generation > @generation
end

#pending?Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/pitchfork/worker.rb', line 34

# True while no @master has been assigned to this worker.
def pending?
  @master.nil?
end

#promote(generation) ⇒ Object



76
77
78
# File 'lib/pitchfork/worker.rb', line 76

# Builds a Message::PromoteWorker for +generation+ and hands it to
# send_message_nonblock, returning that call's result.
def promote(generation)
  request = Message::PromoteWorker.new(generation)
  send_message_nonblock(request)
end

#promote!(timeout) ⇒ Object



88
89
90
91
# File 'lib/pitchfork/worker.rb', line 88

# Bumps @generation by one, then delegates to promoted! with +timeout+ and
# returns its result.
def promote!(timeout)
  @generation = @generation + 1
  promoted!(timeout)
end

#promoted!(timeout) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/pitchfork/worker.rb', line 93

# Marks this worker as a mold: clears @nr, sets @mold, and points
# @deadline_drop at SharedMemory.mold_promotion_deadline. When +timeout+ is
# truthy the deadline is refreshed through update_deadline. Returns self.
def promoted!(timeout)
  @nr = nil
  @mold = true
  @deadline_drop = SharedMemory.mold_promotion_deadline

  if timeout
    update_deadline(timeout)
  end

  self
end

#register_to_master(control_socket) ⇒ Object



55
56
57
58
59
60
# File 'lib/pitchfork/worker.rb', line 55

# Creates the socket pair, announces this worker over +control_socket+ with
# a Message::WorkerSpawned carrying @nr, @pid, generation and @master, and
# then closes the local @master.
def register_to_master(control_socket)
  create_socketpair!
  control_socket.sendmsg(Message::WorkerSpawned.new(@nr, @pid, generation, @master))
  @master.close
end

#resetObject



188
189
190
# File 'lib/pitchfork/worker.rb', line 188

# Puts the @requests_count counter back to zero.
def reset
  @requests_count = 0
end

#service?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/pitchfork/worker.rb', line 105

# Always false here.
def service?
  false
end

#soft_kill(sig) ⇒ Object

master sends fake signals to children



130
131
132
133
134
135
136
137
138
139
# File 'lib/pitchfork/worker.rb', line 130

# Master-side "fake signal": instead of delivering a real signal, sends the
# worker a Message::SoftKill carrying the signal number.
#
# sig - signal name without the "SIG" prefix, as a Symbol or String
#       (it is looked up in Signal.list by its string form).
#
# Returns the result of send_message_nonblock. When the message was sent and
# +sig+ is one of EXIT_SIGNALS, the worker is also flagged as exiting.
#
# Raises ArgumentError when +sig+ is not a known signal name.
def soft_kill(sig) # :nodoc:
  signum = Signal.list[sig.to_s]
  raise ArgumentError, "BUG: bad signal: #{sig.inspect}" unless signum

  # Do not care in the odd case the buffer is full, here.
  success = send_message_nonblock(Message::SoftKill.new(signum))

  # Compare by normalised name: the lookup above accepts Strings as well as
  # Symbols, so the exit-signal check must too.
  @exiting = true if success && EXIT_SIGNALS.include?(sig.to_s.to_sym)
  success
end

#spawn_service(_new_service) ⇒ Object



84
85
86
# File 'lib/pitchfork/worker.rb', line 84

# Sends an argument-less Message::SpawnService through
# send_message_nonblock and returns that call's result. The parameter is
# accepted but unused.
def spawn_service(_new_service)
  request = Message::SpawnService.new
  send_message_nonblock(request)
end

#spawn_worker(new_worker) ⇒ Object



80
81
82
# File 'lib/pitchfork/worker.rb', line 80

# Sends a Message::SpawnWorker carrying new_worker.nr through
# send_message_nonblock and returns that call's result.
def spawn_worker(new_worker)
  request = Message::SpawnWorker.new(new_worker.nr)
  send_message_nonblock(request)
end

#start_promotion(control_socket) ⇒ Object



62
63
64
65
66
67
# File 'lib/pitchfork/worker.rb', line 62

# Creates the socket pair, announces the mold over +control_socket+ with a
# Message::MoldSpawned carrying @nr, @pid, generation and @master, and then
# closes the local @master.
def start_promotion(control_socket)
  create_socketpair!
  control_socket.sendmsg(Message::MoldSpawned.new(@nr, @pid, generation, @master))
  @master.close
end

#to_ioObject

IO.select-compatible



109
110
111
# File 'lib/pitchfork/worker.rb', line 109

# IO.select-compatible: delegates to the to_io of the object held in @to_io.
def to_io
  @to_io.to_io
end

#update(message) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pitchfork/worker.rb', line 42

# Copies every member of +message+ onto this worker by calling the writer of
# the same name, then re-points @deadline_drop according to the message type:
# MoldSpawned selects the mold promotion deadline, MoldReady the mold
# deadline. Other message types leave @deadline_drop alone.
def update(message)
  message.class.members.each do |attribute|
    send("#{attribute}=", message.public_send(attribute))
  end

  case message
  when Message::MoldSpawned then @deadline_drop = SharedMemory.mold_promotion_deadline
  when Message::MoldReady then @deadline_drop = SharedMemory.mold_deadline
  end
end

#update_deadline(timeout) ⇒ Object



174
175
176
# File 'lib/pitchfork/worker.rb', line 174

# Sets the deadline to Pitchfork.time_now(true) plus +timeout+ and returns
# the new deadline.
# NOTE(review): the meaning of the `true` argument is not visible here.
def update_deadline(timeout)
  expires_at = Pitchfork.time_now(true) + timeout
  self.deadline = expires_at
end