Class: SolidQueue::LogSubscriber
- Inherits: ActiveSupport::LogSubscriber
- Inheritance chain: Object → ActiveSupport::LogSubscriber → SolidQueue::LogSubscriber
- Defined in: lib/solid_queue/log_subscriber.rb
Instance Method Summary
- #deregister_process(event) ⇒ Object
- #discard(event) ⇒ Object
- #discard_all(event) ⇒ Object
- #dispatch_scheduled(event) ⇒ Object
- #enqueue_recurring_task(event) ⇒ Object
- #graceful_termination(event) ⇒ Object
- #immediate_termination(event) ⇒ Object
- #prune_processes(event) ⇒ Object
- #register_process(event) ⇒ Object
- #release_blocked(event) ⇒ Object
- #release_claimed(event) ⇒ Object
- #release_many_blocked(event) ⇒ Object
- #release_many_claimed(event) ⇒ Object
- #replace_fork(event) ⇒ Object
- #retry(event) ⇒ Object
- #retry_all(event) ⇒ Object
- #shutdown_process(event) ⇒ Object
- #start_process(event) ⇒ Object
- #thread_error(event) ⇒ Object
- #unhandled_signal_error(event) ⇒ Object
Instance Method Details
#deregister_process(event) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/solid_queue/log_subscriber.rb', line 92

# Logs the deregistration of a process: a warning when the payload carries an
# :error entry, an info line otherwise.
#
# @param event [#payload] event whose payload holds :process and, optionally,
#   :pruned and :error
def deregister_process(event)
  process = event.payload[:process]

  attributes = {
    process_id: process.id,
    pid: process.pid,
    hostname: process.hostname,
    last_heartbeat_at: process.last_heartbeat_at,
    claimed_size: process.claimed_executions.size,
    # Only the :pruned entry belongs here; passing the whole payload would
    # dump every payload value (the process object included) into the line.
    pruned: event.payload[:pruned]
  }

  if error = event.payload[:error]
    warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error)))
  else
    info formatted_event(event, action: "Deregister #{process.kind}", **attributes)
  end
end
#discard(event) ⇒ Object
30 31 32 |
# File 'lib/solid_queue/log_subscriber.rb', line 30

# Writes a debug line for a discarded job, carrying the :job_id and :status
# entries sliced from the event payload.
def discard(event)
  details = event.payload.slice(:job_id, :status)

  debug formatted_event(event, action: "Discard job", **details)
end
#discard_all(event) ⇒ Object
26 27 28 |
# File 'lib/solid_queue/log_subscriber.rb', line 26

# Writes a debug line for a bulk discard, carrying the :jobs_size, :size and
# :status entries sliced from the event payload.
def discard_all(event)
  details = event.payload.slice(:jobs_size, :size, :status)

  debug formatted_event(event, action: "Discard jobs", **details)
end
#dispatch_scheduled(event) ⇒ Object
6 7 8 |
# File 'lib/solid_queue/log_subscriber.rb', line 6

# Writes a debug line for a scheduled-jobs dispatch, carrying the :batch_size
# and :size entries sliced from the event payload.
def dispatch_scheduled(event)
  details = event.payload.slice(:batch_size, :size)

  debug formatted_event(event, action: "Dispatch scheduled jobs", **details)
end
#enqueue_recurring_task(event) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/solid_queue/log_subscriber.rb', line 42

# Writes an info line describing the outcome of enqueuing a recurring task.
# The action text depends on the payload's :other_adapter and :skipped flags
# and on whether an :active_job_id is present; every outcome is logged at
# info level.
def enqueue_recurring_task(event)
  payload = event.payload
  attributes = payload.slice(:task, :active_job_id, :enqueue_error, :at)

  action =
    if payload[:other_adapter]
      if attributes[:active_job_id].present?
        "Enqueued recurring task outside Solid Queue"
      else
        "Error enqueuing recurring task"
      end
    elsif payload[:skipped].present?
      "Skipped recurring task – already dispatched"
    elsif attributes[:active_job_id].nil?
      "Error enqueuing recurring task"
    else
      "Enqueued recurring task"
    end

  info formatted_event(event, action: action, **attributes)
end
#graceful_termination(event) ⇒ Object
119 120 121 122 123 124 125 126 127 |
# File 'lib/solid_queue/log_subscriber.rb', line 119

# Logs the end of a graceful supervisor shutdown: an info line normally, a
# warning when the payload flags :shutdown_timeout_exceeded.
def graceful_termination(event)
  attributes = event.payload.slice(:supervisor_pid, :supervised_pids)

  unless event.payload[:shutdown_timeout_exceeded]
    return info(formatted_event(event, action: "Supervisor terminated gracefully", **attributes))
  end

  warn formatted_event(event, action: "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
end
#immediate_termination(event) ⇒ Object
129 130 131 |
# File 'lib/solid_queue/log_subscriber.rb', line 129

# Writes an info line for an immediate supervisor termination, carrying the
# :supervisor_pid and :supervised_pids entries sliced from the event payload.
def immediate_termination(event)
  details = event.payload.slice(:supervisor_pid, :supervised_pids)

  info formatted_event(event, action: "Supervisor terminated immediately", **details)
end
#prune_processes(event) ⇒ Object
111 112 113 |
# File 'lib/solid_queue/log_subscriber.rb', line 111

# Writes a debug line for a dead-process prune, carrying the :size entry
# sliced from the event payload.
def prune_processes(event)
  details = event.payload.slice(:size)

  debug formatted_event(event, action: "Prune dead processes", **details)
end
#register_process(event) ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/solid_queue/log_subscriber.rb', line 81

# Logs a process registration: an info line on success, a warning (with the
# formatted error appended to the attributes) when the payload has an :error.
def register_process(event)
  payload = event.payload
  kind = payload[:kind]
  attributes = payload.slice(:pid, :hostname)
  failure = payload[:error]

  unless failure
    return info(formatted_event(event, action: "Register #{kind}", **attributes))
  end

  warn formatted_event(event, action: "Error registering #{kind}", **attributes, error: formatted_error(failure))
end
#release_blocked(event) ⇒ Object
38 39 40 |
# File 'lib/solid_queue/log_subscriber.rb', line 38

# Writes a debug line for the release of a blocked job, carrying the :job_id,
# :concurrency_key and :released entries sliced from the event payload.
def release_blocked(event)
  details = event.payload.slice(:job_id, :concurrency_key, :released)

  debug formatted_event(event, action: "Release blocked job", **details)
end
#release_claimed(event) ⇒ Object
14 15 16 |
# File 'lib/solid_queue/log_subscriber.rb', line 14

# Writes a debug line for the release of a claimed job, carrying the :job_id
# and :process_id entries sliced from the event payload.
def release_claimed(event)
  details = event.payload.slice(:job_id, :process_id)

  debug formatted_event(event, action: "Release claimed job", **details)
end
#release_many_blocked(event) ⇒ Object
34 35 36 |
# File 'lib/solid_queue/log_subscriber.rb', line 34

# Writes a debug line for a bulk unblock, carrying the :limit and :size
# entries sliced from the event payload.
def release_many_blocked(event)
  details = event.payload.slice(:limit, :size)

  debug formatted_event(event, action: "Unblock jobs", **details)
end
#release_many_claimed(event) ⇒ Object
10 11 12 |
# File 'lib/solid_queue/log_subscriber.rb', line 10

# Writes a debug line for a bulk release of claimed jobs, carrying the :size
# entry sliced from the event payload.
def release_many_claimed(event)
  details = event.payload.slice(:size)

  debug formatted_event(event, action: "Release claimed jobs", **details)
end
#replace_fork(event) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/solid_queue/log_subscriber.rb', line 137

# Logs the replacement of a terminated fork. The payload's :status object
# supplies exit/signal details; an info line is written when the payload has
# a :fork entry, a warning otherwise.
def replace_fork(event)
  status = event.payload[:status]

  attributes = event.payload.slice(:pid).merge(
    # exitstatus may be nil, in which case a placeholder text is logged.
    status: status.exitstatus || "no exit status set",
    pid_from_status: status.pid,
    signaled: status.signaled?,
    stopsig: status.stopsig,
    termsig: status.termsig
  )

  replaced_fork = event.payload[:fork]

  if replaced_fork
    info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes, hostname: replaced_fork.hostname)
  else
    warn formatted_event(event, action: "Tried to replace forked process but it had already died", **attributes)
  end
end
#retry(event) ⇒ Object
22 23 24 |
# File 'lib/solid_queue/log_subscriber.rb', line 22

# Writes a debug line for the retry of a failed job, carrying the :job_id
# entry sliced from the event payload.
def retry(event)
  details = event.payload.slice(:job_id)

  debug formatted_event(event, action: "Retry failed job", **details)
end
#retry_all(event) ⇒ Object
18 19 20 |
# File 'lib/solid_queue/log_subscriber.rb', line 18

# Writes a debug line for a bulk retry of failed jobs, carrying the :jobs_size
# and :size entries sliced from the event payload.
def retry_all(event)
  details = event.payload.slice(:jobs_size, :size)

  debug formatted_event(event, action: "Retry failed jobs", **details)
end
#shutdown_process(event) ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/solid_queue/log_subscriber.rb', line 70

# Writes an info line when a process shuts down, combining its pid and
# hostname with the process's extra attributes.
#
# @param event [#payload] event whose payload holds :process
def shutdown_process(event)
  process = event.payload[:process]

  # NOTE(review): the listing read `.merge(process.)`, which does not parse;
  # the method name was lost. `metadata` is restored on the assumption that
  # the process object exposes a Hash of extra attributes under that name —
  # confirm against the process model.
  attributes = {
    pid: process.pid,
    hostname: process.hostname
  }.merge(process.metadata)

  info formatted_event(event, action: "Shut down #{process.kind}", **attributes)
end
#start_process(event) ⇒ Object
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/solid_queue/log_subscriber.rb', line 59

# Writes an info line when a process starts, combining its pid and hostname
# with the process's extra attributes.
#
# @param event [#payload] event whose payload holds :process
def start_process(event)
  process = event.payload[:process]

  # NOTE(review): the listing read `.merge(process.)`, which does not parse;
  # the method name was lost. `metadata` is restored on the assumption that
  # the process object exposes a Hash of extra attributes under that name —
  # confirm against the process model.
  attributes = {
    pid: process.pid,
    hostname: process.hostname
  }.merge(process.metadata)

  info formatted_event(event, action: "Started #{process.kind}", **attributes)
end
#thread_error(event) ⇒ Object
115 116 117 |
# File 'lib/solid_queue/log_subscriber.rb', line 115

# Writes an error-level line for an exception raised in a thread, using the
# formatted version of the payload's :error entry.
def thread_error(event)
  failure = formatted_error(event.payload[:error])

  error formatted_event(event, action: "Error in thread", error: failure)
end
#unhandled_signal_error(event) ⇒ Object
133 134 135 |
# File 'lib/solid_queue/log_subscriber.rb', line 133

# Writes an error-level line for a signal that had no handler, carrying the
# :signal entry sliced from the event payload.
def unhandled_signal_error(event)
  details = event.payload.slice(:signal)

  error formatted_event(event, action: "Received unhandled signal", **details)
end