Class: SolidQueue::LogSubscriber

Inherits:
ActiveSupport::LogSubscriber
  • Object
show all
Defined in:
lib/solid_queue/log_subscriber.rb

Instance Method Summary collapse

Instance Method Details

#deregister_process(event) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/solid_queue/log_subscriber.rb', line 92

def deregister_process(event)
  process = event.payload[:process]

  attributes = {
    process_id: process.id,
    pid: process.pid,
    hostname: process.hostname,
    last_heartbeat_at: process.last_heartbeat_at,
    claimed_size: process.claimed_executions.size,
    pruned: event.payload
  }

  if error = event.payload[:error]
    warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error)))
  else
    info formatted_event(event, action: "Deregister #{process.kind}", **attributes)
  end
end

#discard(event) ⇒ Object



30
31
32
# File 'lib/solid_queue/log_subscriber.rb', line 30

def discard(event)
  debug formatted_event(event, action: "Discard job", **event.payload.slice(:job_id, :status))
end

#discard_all(event) ⇒ Object



26
27
28
# File 'lib/solid_queue/log_subscriber.rb', line 26

def discard_all(event)
  debug formatted_event(event, action: "Discard jobs", **event.payload.slice(:jobs_size, :size, :status))
end

#dispatch_scheduled(event) ⇒ Object



6
7
8
# File 'lib/solid_queue/log_subscriber.rb', line 6

def dispatch_scheduled(event)
  debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
end

#enqueue_recurring_task(event) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/solid_queue/log_subscriber.rb', line 42

def enqueue_recurring_task(event)
  attributes = event.payload.slice(:task, :active_job_id, :enqueue_error, :at)

  if event.payload[:other_adapter]
    action = attributes[:active_job_id].present? ? "Enqueued recurring task outside Solid Queue" : "Error enqueuing recurring task"
    info formatted_event(event, action: action, **attributes)
  else
    action = case
    when event.payload[:skipped].present? then "Skipped recurring task – already dispatched"
    when attributes[:active_job_id].nil?  then "Error enqueuing recurring task"
    else                                       "Enqueued recurring task"
    end

    info formatted_event(event, action: action, **attributes)
  end
end

#graceful_termination(event) ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/solid_queue/log_subscriber.rb', line 119

def graceful_termination(event)
  attributes = event.payload.slice(:supervisor_pid, :supervised_pids)

  if event.payload[:shutdown_timeout_exceeded]
    warn formatted_event(event, action: "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
  else
    info formatted_event(event, action: "Supervisor terminated gracefully", **attributes)
  end
end

#immediate_termination(event) ⇒ Object



129
130
131
# File 'lib/solid_queue/log_subscriber.rb', line 129

def immediate_termination(event)
  info formatted_event(event, action: "Supervisor terminated immediately", **event.payload.slice(:supervisor_pid, :supervised_pids))
end

#prune_processes(event) ⇒ Object



111
112
113
# File 'lib/solid_queue/log_subscriber.rb', line 111

def prune_processes(event)
  debug formatted_event(event, action: "Prune dead processes", **event.payload.slice(:size))
end

#register_process(event) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/solid_queue/log_subscriber.rb', line 81

def register_process(event)
  process_kind = event.payload[:kind]
  attributes = event.payload.slice(:pid, :hostname)

  if error = event.payload[:error]
    warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
  else
    info formatted_event(event, action: "Register #{process_kind}", **attributes)
  end
end

#release_blocked(event) ⇒ Object



38
39
40
# File 'lib/solid_queue/log_subscriber.rb', line 38

def release_blocked(event)
  debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
end

#release_claimed(event) ⇒ Object



14
15
16
# File 'lib/solid_queue/log_subscriber.rb', line 14

def release_claimed(event)
  debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
end

#release_many_blocked(event) ⇒ Object



34
35
36
# File 'lib/solid_queue/log_subscriber.rb', line 34

def release_many_blocked(event)
  debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
end

#release_many_claimed(event) ⇒ Object



10
11
12
# File 'lib/solid_queue/log_subscriber.rb', line 10

def release_many_claimed(event)
  debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
end

#replace_fork(event) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/solid_queue/log_subscriber.rb', line 137

def replace_fork(event)
  status = event.payload[:status]
  attributes = event.payload.slice(:pid).merge \
    status: (status.exitstatus || "no exit status set"),
    pid_from_status: status.pid,
    signaled: status.signaled?,
    stopsig: status.stopsig,
    termsig: status.termsig

  if replaced_fork = event.payload[:fork]
    info formatted_event(event, action: "Replaced terminated #{replaced_fork.kind}", **attributes.merge(hostname: replaced_fork.hostname))
  else
    warn formatted_event(event, action: "Tried to replace forked process but it had already died", **attributes)
  end
end

#retry(event) ⇒ Object



22
23
24
# File 'lib/solid_queue/log_subscriber.rb', line 22

def retry(event)
  debug formatted_event(event, action: "Retry failed job", **event.payload.slice(:job_id))
end

#retry_all(event) ⇒ Object



18
19
20
# File 'lib/solid_queue/log_subscriber.rb', line 18

def retry_all(event)
  debug formatted_event(event, action: "Retry failed jobs", **event.payload.slice(:jobs_size, :size))
end

#shutdown_process(event) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/solid_queue/log_subscriber.rb', line 70

def shutdown_process(event)
  process = event.payload[:process]

  attributes = {
    pid: process.pid,
    hostname: process.hostname
  }.merge(process.)

  info formatted_event(event, action: "Shut down #{process.kind}", **attributes)
end

#start_process(event) ⇒ Object



59
60
61
62
63
64
65
66
67
68
# File 'lib/solid_queue/log_subscriber.rb', line 59

def start_process(event)
  process = event.payload[:process]

  attributes = {
    pid: process.pid,
    hostname: process.hostname
  }.merge(process.)

  info formatted_event(event, action: "Started #{process.kind}", **attributes)
end

#thread_error(event) ⇒ Object



115
116
117
# File 'lib/solid_queue/log_subscriber.rb', line 115

def thread_error(event)
  error formatted_event(event, action: "Error in thread", error: formatted_error(event.payload[:error]))
end

#unhandled_signal_error(event) ⇒ Object



133
134
135
# File 'lib/solid_queue/log_subscriber.rb', line 133

def unhandled_signal_error(event)
  error formatted_event(event, action: "Received unhandled signal", **event.payload.slice(:signal))
end