Class: Delayed::WorkQueue::ParentProcess::Server
- Inherits: Object
  - Object
    - Delayed::WorkQueue::ParentProcess::Server
- Includes:
- Logging
- Defined in:
- lib/delayed/work_queue/parent_process/server.rb
Defined Under Namespace
Classes: ClientState
Constant Summary
- SIGNALS =
%i[INT TERM QUIT].freeze
Instance Attribute Summary
-
#clients ⇒ Object
readonly
Returns the value of attribute clients.
-
#listen_socket ⇒ Object
readonly
Returns the value of attribute listen_socket.
Instance Method Summary
- #all_workers_idle? ⇒ Boolean
- #check_for_work(forced_latency: nil) ⇒ Object
- #client_timeout(&block) ⇒ Object
- #connected_clients ⇒ Object
- #drop_socket(socket) ⇒ Object
- #exit? ⇒ Boolean
-
#handle_accept ⇒ Object
Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.
- #handle_read(socket) ⇒ Object
- #handle_request(socket) ⇒ Object
-
#initialize(listen_socket, parent_pid: nil, config: Settings.parent_process) ⇒ Server
constructor
A new instance of Server.
- #parent_exited? ⇒ Boolean
- #prefetch_owner ⇒ Object
-
#run ⇒ Object
Runs the server queue worker. This method does not return; it only exits or raises an exception.
- #run_once ⇒ Object
- #unlock_all_prefetched_jobs ⇒ Object
- #unlock_prefetched_jobs ⇒ Object
- #unlock_timed_out_prefetched_jobs ⇒ Object
Methods included from Logging
Constructor Details
#initialize(listen_socket, parent_pid: nil, config: Settings.parent_process) ⇒ Server
Returns a new instance of Server.
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 14
# Builds a server around an already-open listening socket.
#
# @param listen_socket [Object] socket whose accept_nonblock hands out new client connections
# @param parent_pid [Integer, nil] expected parent pid; when set, a different Process.ppid
#   is treated as "parent exited"
# @param config [Object] parent-process settings; "server_socket_timeout" (default 10.0)
#   bounds each write to a client
def initialize(listen_socket, parent_pid: nil, config: Settings.parent_process)
  # socket and client bookkeeping
  @listen_socket = listen_socket
  @clients = {}
  @waiting_clients = {}
  @prefetched_jobs = {}

  # process / configuration state
  @parent_pid = parent_pid
  @config = config
  @client_timeout = config["server_socket_timeout"] || 10.0 # left for backwards compat

  # shutdown flag plus a pipe the signal handlers write to, waking IO.select
  @exit = false
  @self_pipe = IO.pipe
end
Instance Attribute Details
#clients ⇒ Object (readonly)
Returns the value of attribute clients.
9 10 11 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 9
# Connected clients, keyed by their socket.
#
# @return [Hash] socket => client state
def clients
  @clients
end
#listen_socket ⇒ Object (readonly)
Returns the value of attribute listen_socket.
9 10 11 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 9
# The socket new client connections are accepted from.
#
# @return [Object] the listen socket passed to the constructor
def listen_socket
  @listen_socket
end
Instance Method Details
#all_workers_idle? ⇒ Boolean
32 33 34 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 32
# True when no connected client is flagged as working (also true with no clients).
#
# @return [Boolean]
def all_workers_idle?
  @clients.each_value.none?(&:working)
end
#check_for_work(forced_latency: nil) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 124
# Hands work to every client currently waiting for a job.
#
# For each worker config that has waiting clients:
# 1. jobs already prefetched for that config are re-locked to waiting clients
#    (transfer_lock!) and sent to them first;
# 2. if clients are still waiting, Delayed::Job.get_and_lock_next_available is asked
#    for jobs for those clients plus extra jobs; the extras come back under
#    prefetch_owner and are appended to the prefetched list for later rounds;
# 3. each job fetched for a specific client is marshalled to that client.
# When sending fails (SystemCallError, IOError, Timeout::Error), the client's socket
# is dropped and the job is unlocked.
#
# Note: `workers` is the very array stored in @waiting_clients[worker_config], so the
# shift/unshift/delete calls below edit the waiting list in place.
#
# @param forced_latency [Object, nil] passed unchanged to get_and_lock_next_available
def check_for_work(forced_latency: nil)
  @waiting_clients.each do |(worker_config, workers)|
    prefetched_jobs = @prefetched_jobs[worker_config] ||= []
    logger.debug("I have #{prefetched_jobs.length} jobs for #{workers.length} waiting workers")
    # Step 1: serve waiting clients from the prefetched jobs first.
    while !prefetched_jobs.empty? && !workers.empty?
      job = prefetched_jobs.shift
      client = workers.shift
      logger.debug("Transferring prefetched job to #{client.name}")
      unless job.transfer_lock!(from: prefetch_owner, to: client.name)
        # couldn't re-lock it for some reason; the job has already been shifted off
        # the prefetched list, and the client goes back to the front of the queue
        workers.unshift(client)
        next
      end
      client.working = true
      begin
        logger.debug("Sending prefetched job #{job.id} to #{client.name}")
        client_timeout { Marshal.dump(job, client.socket) }
      rescue SystemCallError, IOError, Timeout::Error => e
        logger.error("Failed to send pre-fetched job to #{client.name}: #{e.inspect}")
        drop_socket(client.socket)
        Delayed::Job.unlock([job])
      end
    end

    next if workers.empty?

    # Step 2: fetch and lock fresh jobs for the clients that are still waiting.
    logger.debug("Fetching new work for #{workers.length} workers")
    jobs_to_send = []

    Delayed::Worker.lifecycle.run_callbacks(:work_queue_pop, self, worker_config) do
      recipients = workers.map(&:name)

      # prefetch: request enough extra jobs to reach fetch_batch_size per configured
      # worker (default 1 worker), minus those that go straight to the recipients
      response = Delayed::Job.get_and_lock_next_available(
        recipients,
        worker_config[:queue],
        worker_config[:min_priority],
        worker_config[:max_priority],
        prefetch: (Settings.fetch_batch_size * (worker_config[:workers] || 1)) - recipients.length,
        prefetch_owner: prefetch_owner,
        forced_latency: forced_latency
      )
      logger.debug(
        "Fetched and locked #{response.values.flatten.size} new jobs for workers (#{response.keys.join(", ")})."
      )
      response.each do |(worker_name, locked_jobs)|
        if worker_name == prefetch_owner
          # it's actually an array of all the extra jobs
          logger.debug(
            "Adding prefetched jobs #{locked_jobs.length} to prefetched array (size: #{prefetched_jobs.count})"
          )
          prefetched_jobs.concat(locked_jobs)
          next
        end
        client = workers.find { |worker| worker.name == worker_name }
        client.working = true
        jobs_to_send << [client, locked_jobs]
      end
    end

    # Step 3: deliver each fetched job; a recipient stops waiting once it has a job.
    jobs_to_send.each do |(recipient, job_to_send)|
      @waiting_clients[worker_config].delete(recipient)
      begin
        logger.debug("Sending job #{job_to_send.id} to #{recipient.name}")
        client_timeout { Marshal.dump(job_to_send, recipient.socket) }
      rescue SystemCallError, IOError, Timeout::Error => e
        logger.error("Failed to send job to #{recipient.name}: #{e.inspect}")
        drop_socket(recipient.socket)
        Delayed::Job.unlock([job_to_send])
      end
    end
  end
end
#client_timeout(&block) ⇒ Object
277 278 279 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 277
# Runs the given block under the configured per-client timeout.
#
# @return [Object] whatever the block returns
# @raise [Timeout::Error] if the block runs longer than @client_timeout seconds
def client_timeout(&block)
  limit = @client_timeout
  Timeout.timeout(limit, &block)
end
#connected_clients ⇒ Object
28 29 30 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 28
# Number of client sockets currently tracked.
#
# @return [Integer]
def connected_clients
  @clients.length
end
#drop_socket(socket) ⇒ Object
251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 251
# Forgets a client whose socket went away.
#
# Closes the socket, ignoring an IOError from close. The client is removed from
# @clients and from every waiting list.
#
# @param socket [IO] the client's socket
def drop_socket(socket)
  begin
    socket.close
  rescue IOError
    nil
  end

  departed = @clients.delete(socket)
  @waiting_clients.each_value { |queue| queue.delete(departed) }
end
#exit? ⇒ Boolean
265 266 267 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 265
# Whether the server loop should stop: either a trapped signal set the exit flag,
# or parent_exited? reports that the parent process is gone.
#
# @return [Boolean, Object] true when flagged, otherwise parent_exited?'s result
def exit?
  return true if @exit

  parent_exited?
end
#handle_accept ⇒ Object
Any error on the listen socket other than WaitReadable will bubble up and terminate the work queue process, to be restarted by the parent daemon.
97 98 99 100 101 102 103 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 97
# Accepts one pending connection and registers it as an idle client.
#
# IO::WaitReadable is logged and swallowed, so accepting is simply retried on the next
# loop iteration. Any other error on the listen socket bubbles up and terminates the
# work queue process, to be restarted by the parent daemon.
def handle_accept
  socket, _addr = @listen_socket.accept_nonblock
  return unless socket

  @clients[socket] = ClientState.new(false, socket)
rescue IO::WaitReadable
  logger.error("Server attempted to read listen_socket but failed with IO::WaitReadable")
  # ignore and just try accepting again next time through the loop
end
#handle_read(socket) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 83
# Dispatches one readable handle reported by IO.select.
#
# The listen socket means a new connection is pending. The self-pipe's read end means
# a signal handler woke us up. Anything else is a client socket with a request.
#
# @param socket [IO] the readable handle
def handle_read(socket)
  return handle_accept if socket == @listen_socket

  wakeup_reader = @self_pipe[0]
  # Only the wake-up matters, not what was written into the pipe.
  return wakeup_reader.read_nonblock(11, exception: false) if socket == wakeup_reader

  handle_request(socket)
end
#handle_request(socket) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 105
# Reads one request from a connected client and queues that client as waiting for work.
#
# A request is a marshalled [worker_name, worker_config] pair. EOF on the socket means
# the client disconnected, so its socket is dropped instead. A SystemCallError or
# IOError while reading is logged, and the socket is dropped.
#
# There is an assumption that the client never sends a partial request and then leaves
# the socket open; doing so would hang Marshal.load forever. That is only reasonable
# because we control the client.
#
# @param socket [IO] a client socket reported readable by IO.select
def handle_request(socket)
  state = @clients[socket]

  if socket.eof?
    logger.debug("Client #{state.name} closed connection")
    return drop_socket(socket)
  end

  name, config = Marshal.load(socket)
  state.name = name
  state.working = false
  queue = (@waiting_clients[config] ||= [])
  queue.push(state)
rescue SystemCallError, IOError => e
  logger.error("Receiving message from client (#{socket}) failed: #{e.inspect}")
  drop_socket(socket)
end
#parent_exited? ⇒ Boolean
273 274 275 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 273
# Whether the parent process this server was started for has gone away.
#
# Detected by Process.ppid no longer matching the parent pid given to the
# constructor. With no parent pid configured, the check is disabled.
#
# @return [Boolean] always a strict boolean. The previous `@parent_pid && ...` form
#   returned nil when no parent pid was set, unlike exit?'s explicit `!!` coercion.
def parent_exited?
  return false unless @parent_pid

  @parent_pid != Process.ppid
end
#prefetch_owner ⇒ Object
269 270 271 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 269
# Lock-owner name used for jobs fetched ahead of demand by this host.
#
# Any StandardError from looking up the hostname falls back to "X", so a name is
# always produced.
#
# @return [String] "prefetch:<hostname>"
def prefetch_owner
  host =
    begin
      Socket.gethostname
    rescue StandardError
      "X"
    end
  "prefetch:#{host}"
end
#run ⇒ Object
Runs the server queue worker. This method does not return; it only exits or raises an exception.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 38
# Runs the server queue worker loop until exit? becomes true.
#
# Installs handlers for SIGNALS, then repeatedly calls run_once. Roughly every 15 minutes
# it also calls Job.unlock_orphaned_prefetched_jobs. Any StandardError is logged and
# re-raised; prefetched jobs are unlocked on the way out in every case.
#
# @return [nil] when the loop ends because exit? became true
# @raise [StandardError] whatever escaped the loop, after it has been logged
def run
  logger.debug "Starting work queue process"

  SIGNALS.each do |signal_name|
    # No aggressive exiting: prefetched jobs must still be unlocked, so the handler only
    # raises the exit flag and writes to the self-pipe to wake IO.select in run_once.
    trap(signal_name) do
      @exit = true
      @self_pipe[1].write_nonblock(".", exception: false)
    end
  end

  purge_every = 15 * 60
  # random initial offset within one interval; presumably so that several servers
  # don't purge at the same moment (TODO confirm intent)
  last_purge_at = Job.db_time_now - rand(purge_every)

  until exit?
    run_once
    next unless last_purge_at + purge_every < Job.db_time_now

    Job.unlock_orphaned_prefetched_jobs
    last_purge_at = Job.db_time_now
  end
rescue StandardError => e
  logger.error "WorkQueue Server died: #{e.inspect}"
  raise
ensure
  unlock_all_prefetched_jobs
end
#run_once ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 66
# One iteration of the server loop.
#
# Waits (with a jittered timeout) for client, listen-socket or self-pipe activity and
# services it. It then hands out work through the :check_for_work lifecycle callback and
# finally unlocks prefetched jobs that have been held too long.
def run_once
  watched = [*@clients.keys, @listen_socket, @self_pipe[0]]

  # While every worker is idle, force a "latency" on job fetching so that recently queued
  # jobs are left for busier workers. If those workers fall behind, the jobs slip back in
  # time and this server becomes active and picks them up quickly. The latency is chosen
  # so that an active worker is guaranteed to have tried fetching in the meantime.
  forced_latency =
    if all_workers_idle?
      Settings.sleep_delay + (Settings.sleep_delay_stagger * 2)
    end

  wait_for = Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)
  ready, = IO.select(watched, nil, nil, wait_for)
  ready.each { |io| handle_read(io) } unless ready.nil?

  Delayed::Worker.lifecycle.run_callbacks(:check_for_work, self) do
    check_for_work(forced_latency: forced_latency)
  end
  unlock_timed_out_prefetched_jobs
end
#unlock_all_prefetched_jobs ⇒ Object
243 244 245 246 247 248 249 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 243
# Tries hard to release every prefetched job, as done when run exits.
#
# A single unlock_prefetched_jobs pass may skip groups (for example after a query
# timeout), so passes are repeated until no prefetched jobs remain, up to 10 attempts.
def unlock_all_prefetched_jobs
  10.times do
    unlock_prefetched_jobs
    nothing_left = @prefetched_jobs.none? { |_config, jobs| !jobs.empty? }
    break if nothing_left
  end
end
#unlock_prefetched_jobs ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 197 def unlock_prefetched_jobs @prefetched_jobs.each do |(worker_config, jobs)| next if jobs.empty? next if block_given? && !yield(jobs) connection = Delayed::Job.connection connection.transaction do # make absolutely sure we don't get hung up and leave things # locked in the database if connection.postgresql_version >= 9_06_00 # rubocop:disable Style/NumericLiterals connection.idle_in_transaction_session_timeout = 5 end # relatively short timeout for acquiring the lock connection.statement_timeout = Settings.sleep_delay Delayed::Job.advisory_lock(Delayed::Job.prefetch_jobs_lock_name) # this query might take longer, and we really want to get it # done if we got the lock, but still don't want an inadvertent # hang connection.statement_timeout = 30 Delayed::Job.unlock(jobs) @prefetched_jobs[worker_config] = [] end rescue ActiveRecord::QueryCanceled # ignore; we'll retry anyway logger.warn("unable to unlock prefetched jobs; skipping for now") rescue ActiveRecord::StatementInvalid # see if we dropped the connection raise if connection.active? # otherwise just reconnect and let it retry logger.warn("failed to unlock prefetched jobs - connection terminated; skipping for now") if Rails.version < "6.1" ::Delayed::Job.clear_all_connections! else ::Delayed::Job.clear_all_connections!(nil) end end end |
#unlock_timed_out_prefetched_jobs ⇒ Object
237 238 239 240 241 |
# File 'lib/delayed/work_queue/parent_process/server.rb', line 237
# Unlocks prefetched job groups whose oldest job has been locked for longer than
# the prefetched_jobs_timeout setting (in seconds).
#
# The timeout is read from the config injected into the constructor (which defaults
# to Settings.parent_process). Previously this method always read
# Settings.parent_process and silently ignored a custom config: argument. When the
# injected config has no value for the key, the global setting is used, so existing
# setups behave exactly as before. The lookup stays inside the block, so nothing is
# read when there are no prefetched jobs.
def unlock_timed_out_prefetched_jobs
  unlock_prefetched_jobs do |jobs|
    timeout = @config[:prefetched_jobs_timeout] || Settings.parent_process[:prefetched_jobs_timeout]
    jobs.first.locked_at < Time.now.utc - timeout
  end
end