Class: SolidQueue::LogSubscriber
- Inherits: ActiveSupport::LogSubscriber
- Inheritance chain: Object → ActiveSupport::LogSubscriber → SolidQueue::LogSubscriber
- Defined in: lib/solid_queue/log_subscriber.rb
Instance Method Summary
- #claim(event) ⇒ Object
- #deregister_process(event) ⇒ Object
- #discard(event) ⇒ Object
- #discard_all(event) ⇒ Object
- #dispatch_scheduled(event) ⇒ Object
- #enqueue_recurring_task(event) ⇒ Object
- #fail_many_claimed(event) ⇒ Object
- #graceful_termination(event) ⇒ Object
- #immediate_termination(event) ⇒ Object
- #prune_processes(event) ⇒ Object
- #register_process(event) ⇒ Object
- #release_blocked(event) ⇒ Object
- #release_claimed(event) ⇒ Object
- #release_many_blocked(event) ⇒ Object
- #release_many_claimed(event) ⇒ Object
- #replace_fork(event) ⇒ Object
- #retry(event) ⇒ Object
- #retry_all(event) ⇒ Object
- #shutdown_process(event) ⇒ Object
- #start_process(event) ⇒ Object
- #thread_error(event) ⇒ Object
- #unhandled_signal_error(event) ⇒ Object
Instance Method Details
#claim(event) ⇒ Object
10 11 12 |
# File 'lib/solid_queue/log_subscriber.rb', line 10
#
# Logs a "Claim jobs" entry at debug level.
#
# Only the :process_id, :job_ids, :claimed_job_ids and :size keys of the
# event payload are forwarded to formatted_event; any other key is dropped.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def claim(event)
  debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size))
end
#deregister_process(event) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/solid_queue/log_subscriber.rb', line 101
#
# Logs the deregistration of a process.
#
# The logged attributes are read from the :process object in the payload
# (id, pid, hostname, name, last_heartbeat_at as ISO 8601) plus the
# :claimed_size and :pruned payload entries. When the payload carries an
# :error, the entry is written at warn level with the formatted error
# appended; otherwise it is written at debug level.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def deregister_process(event)
  process = event.payload[:process]

  attributes = {
    process_id: process.id,
    pid: process.pid,
    hostname: process.hostname,
    name: process.name,
    last_heartbeat_at: process.last_heartbeat_at.iso8601,
    claimed_size: event.payload[:claimed_size],
    pruned: event.payload[:pruned]
  }

  # Parentheses mark the assignment-in-condition as intentional.
  if (error = event.payload[:error])
    warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error)))
  else
    debug formatted_event(event, action: "Deregister #{process.kind}", **attributes)
  end
end
#discard(event) ⇒ Object
38 39 40 |
# File 'lib/solid_queue/log_subscriber.rb', line 38
#
# Logs a "Discard job" entry at debug level, forwarding only the :job_id
# and :status keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def discard(event)
  debug formatted_event(event, action: "Discard job", **event.payload.slice(:job_id, :status))
end
#discard_all(event) ⇒ Object
34 35 36 |
# File 'lib/solid_queue/log_subscriber.rb', line 34
#
# Logs a "Discard jobs" entry at debug level, forwarding only the
# :jobs_size, :size and :status keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def discard_all(event)
  debug formatted_event(event, action: "Discard jobs", **event.payload.slice(:jobs_size, :size, :status))
end
#dispatch_scheduled(event) ⇒ Object
6 7 8 |
# File 'lib/solid_queue/log_subscriber.rb', line 6
#
# Logs a "Dispatch scheduled jobs" entry at debug level, forwarding only
# the :batch_size and :size keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def dispatch_scheduled(event)
  debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
end
#enqueue_recurring_task(event) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/solid_queue/log_subscriber.rb', line 50
#
# Logs the outcome of enqueuing a recurring task.
#
# The logged attributes are the :task, :active_job_id and :enqueue_error
# payload entries plus :at, which is the payload's :at converted with
# #iso8601 (nil when the payload has no :at).
#
# Outcome selection:
# * no :active_job_id and no :skipped entry -> error level,
#   "Error enqueuing recurring task"
# * :other_adapter truthy -> debug level,
#   "Enqueued recurring task outside Solid Queue"
# * otherwise -> debug level, with the action depending on whether
#   :skipped is present
#
# @param event [#payload] instrumentation event whose payload is a Hash
def enqueue_recurring_task(event)
  attributes = event.payload.slice(:task, :active_job_id, :enqueue_error)
  # :at is always set, so the key appears in the log even when it is nil.
  attributes[:at] = event.payload[:at]&.iso8601

  if attributes[:active_job_id].nil? && event.payload[:skipped].nil?
    error formatted_event(event, action: "Error enqueuing recurring task", **attributes)
  elsif event.payload[:other_adapter]
    debug formatted_event(event, action: "Enqueued recurring task outside Solid Queue", **attributes)
  else
    action = event.payload[:skipped].present? ? "Skipped recurring task – already dispatched" : "Enqueued recurring task"
    debug formatted_event(event, action: action, **attributes)
  end
end
#fail_many_claimed(event) ⇒ Object
18 19 20 |
# File 'lib/solid_queue/log_subscriber.rb', line 18
#
# Logs a "Fail claimed jobs" entry at warn level, forwarding only the
# :job_ids and :process_ids keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def fail_many_claimed(event)
  warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids))
end
#graceful_termination(event) ⇒ Object
129 130 131 132 133 134 135 136 137 |
# File 'lib/solid_queue/log_subscriber.rb', line 129
#
# Logs the result of a graceful supervisor termination.
#
# Forwards the :process_id, :supervisor_pid and :supervised_processes
# payload entries. When :shutdown_timeout_exceeded is truthy the entry is
# written at warn level; otherwise at info level.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def graceful_termination(event)
  attributes = event.payload.slice(:process_id, :supervisor_pid, :supervised_processes)

  if event.payload[:shutdown_timeout_exceeded]
    warn formatted_event(event, action: "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
  else
    info formatted_event(event, action: "Supervisor terminated gracefully", **attributes)
  end
end
#immediate_termination(event) ⇒ Object
139 140 141 |
# File 'lib/solid_queue/log_subscriber.rb', line 139
#
# Logs a "Supervisor terminated immediately" entry at info level,
# forwarding only the :process_id, :supervisor_pid and
# :supervised_processes keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def immediate_termination(event)
  info formatted_event(event, action: "Supervisor terminated immediately", **event.payload.slice(:process_id, :supervisor_pid, :supervised_processes))
end
#prune_processes(event) ⇒ Object
121 122 123 |
# File 'lib/solid_queue/log_subscriber.rb', line 121
#
# Logs a "Prune dead processes" entry at debug level, forwarding only the
# :size key of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def prune_processes(event)
  debug formatted_event(event, action: "Prune dead processes", **event.payload.slice(:size))
end
#register_process(event) ⇒ Object
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/solid_queue/log_subscriber.rb', line 90
#
# Logs the registration of a process.
#
# Forwards the :pid, :hostname, :process_id and :name payload entries and
# uses the payload's :kind in the action text. When the payload carries an
# :error, the entry is written at warn level with the formatted error
# appended; otherwise it is written at debug level.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def register_process(event)
  process_kind = event.payload[:kind]
  attributes = event.payload.slice(:pid, :hostname, :process_id, :name)

  # Parentheses mark the assignment-in-condition as intentional.
  if (error = event.payload[:error])
    warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
  else
    debug formatted_event(event, action: "Register #{process_kind}", **attributes)
  end
end
#release_blocked(event) ⇒ Object
46 47 48 |
# File 'lib/solid_queue/log_subscriber.rb', line 46
#
# Logs a "Release blocked job" entry at debug level, forwarding only the
# :job_id, :concurrency_key and :released keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def release_blocked(event)
  debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
end
#release_claimed(event) ⇒ Object
22 23 24 |
# File 'lib/solid_queue/log_subscriber.rb', line 22
#
# Logs a "Release claimed job" entry at info level, forwarding only the
# :job_id and :process_id keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def release_claimed(event)
  info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
end
#release_many_blocked(event) ⇒ Object
42 43 44 |
# File 'lib/solid_queue/log_subscriber.rb', line 42
#
# Logs an "Unblock jobs" entry at debug level, forwarding only the :limit
# and :size keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def release_many_blocked(event)
  debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
end
#release_many_claimed(event) ⇒ Object
14 15 16 |
# File 'lib/solid_queue/log_subscriber.rb', line 14
#
# Logs a "Release claimed jobs" entry at info level, forwarding only the
# :size key of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def release_many_claimed(event)
  info formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
end
#replace_fork(event) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/solid_queue/log_subscriber.rb', line 147
#
# Logs the replacement of a terminated forked process.
#
# The logged attributes are the payload's :pid plus details read from the
# payload's :status object: its exit status (or the text
# "no exit status set" when that is nil), pid, signaled? flag, stopsig and
# termsig.
#
# * When the payload has a :fork, logs at info level and adds that fork's
#   hostname and name.
# * Otherwise, when the supervisor pid is not 1, logs a warning that the
#   process had already died.
# * Otherwise nothing is logged and nil is returned.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def replace_fork(event)
  supervisor_pid = event.payload[:supervisor_pid]
  status = event.payload[:status]
  attributes = event.payload.slice(:pid).merge(
    status: (status.exitstatus || "no exit status set"),
    pid_from_status: status.pid,
    signaled: status.signaled?,
    stopsig: status.stopsig,
    termsig: status.termsig
  )

  # Parentheses mark the assignment-in-condition as intentional.
  if (replaced_fork = event.payload[:fork])
    info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname, name: replaced_fork.name))
  elsif supervisor_pid != 1 # Running Docker, possibly having some processes that have been reparented
    warn formatted_event(event, action: "Tried to replace forked process but it had already died", **attributes)
  end
end
#retry(event) ⇒ Object
30 31 32 |
# File 'lib/solid_queue/log_subscriber.rb', line 30
#
# Logs a "Retry failed job" entry at debug level, forwarding only the
# :job_id key of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def retry(event)
  debug formatted_event(event, action: "Retry failed job", **event.payload.slice(:job_id))
end
#retry_all(event) ⇒ Object
26 27 28 |
# File 'lib/solid_queue/log_subscriber.rb', line 26
#
# Logs a "Retry failed jobs" entry at debug level, forwarding only the
# :jobs_size and :size keys of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def retry_all(event)
  debug formatted_event(event, action: "Retry failed jobs", **event.payload.slice(:jobs_size, :size))
end
#shutdown_process(event) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/solid_queue/log_subscriber.rb', line 77
#
# Logs a "Shutdown <kind>" entry at info level for the :process object in
# the event payload.
#
# The logged attributes are the process's pid, hostname, process_id and
# name, merged with the Hash returned by process.metadata (metadata
# entries win on key collisions, per Hash#merge).
#
# NOTE(review): the extracted listing was truncated to `.merge(process.)`;
# `metadata` is restored from the upstream Solid Queue source — confirm.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def shutdown_process(event)
  process = event.payload[:process]

  attributes = {
    pid: process.pid,
    hostname: process.hostname,
    process_id: process.process_id,
    name: process.name
  }.merge(process.metadata)

  info formatted_event(event, action: "Shutdown #{process.kind}", **attributes)
end
#start_process(event) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/solid_queue/log_subscriber.rb', line 64
#
# Logs a "Started <kind>" entry at info level for the :process object in
# the event payload.
#
# The logged attributes are the process's pid, hostname, process_id and
# name, merged with the Hash returned by process.metadata (metadata
# entries win on key collisions, per Hash#merge).
#
# NOTE(review): the extracted listing was truncated to `.merge(process.)`;
# `metadata` is restored from the upstream Solid Queue source — confirm.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def start_process(event)
  process = event.payload[:process]

  attributes = {
    pid: process.pid,
    hostname: process.hostname,
    process_id: process.process_id,
    name: process.name
  }.merge(process.metadata)

  info formatted_event(event, action: "Started #{process.kind}", **attributes)
end
#thread_error(event) ⇒ Object
125 126 127 |
# File 'lib/solid_queue/log_subscriber.rb', line 125
#
# Logs an "Error in thread" entry at error level. The payload's :error is
# passed through formatted_error and logged under the :error attribute.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def thread_error(event)
  error formatted_event(event, action: "Error in thread", error: formatted_error(event.payload[:error]))
end
#unhandled_signal_error(event) ⇒ Object
143 144 145 |
# File 'lib/solid_queue/log_subscriber.rb', line 143
#
# Logs a "Received unhandled signal" entry at error level, forwarding
# only the :signal key of the event payload.
#
# @param event [#payload] instrumentation event whose payload is a Hash
def unhandled_signal_error(event)
  error formatted_event(event, action: "Received unhandled signal", **event.payload.slice(:signal))
end