Class: ActiveJob::StructuredEventSubscriber
Overview
Constant Summary
ActiveSupport::StructuredEventSubscriber::DEBUG_CHECK
Instance Attribute Summary
#silenced_events
#patterns
Instance Method Summary
collapse
attach_to, #call, #emit_debug_event, #emit_event, #initialize, #silenced?
attach_to, #call, detach_from, #initialize, method_added, subscribers
Instance Method Details
#discard(event) ⇒ Object
137
138
139
140
141
142
143
144
145
146
147
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 137
def discard(event)
job = event.payload[:job]
exception = event.payload[:error]
emit_event("active_job.discarded",
job_class: job.class.name,
job_id: job.job_id,
exception_class: exception.class.name,
exception_message: exception.message
)
end
|
#enqueue(event) ⇒ Object
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 7
def enqueue(event)
job = event.payload[:job]
adapter = event.payload[:adapter]
exception = event.payload[:exception_object] || job.enqueue_error
payload = {
job_class: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
adapter: ActiveJob.adapter_name(adapter),
aborted: event.payload[:aborted],
}
if exception
payload[:exception_class] = exception.class.name
payload[:exception_message] = exception.message
end
if job.class.log_arguments?
payload[:arguments] = job.arguments
end
emit_event("active_job.enqueued", payload)
end
|
#enqueue_all(event) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 56
def enqueue_all(event)
jobs = event.payload[:jobs]
adapter = event.payload[:adapter]
enqueued_count = event.payload[:enqueued_count].to_i
failed_count = jobs.size - enqueued_count
emit_event("active_job.bulk_enqueued",
adapter: ActiveJob.adapter_name(adapter),
job_count: jobs.size,
enqueued_count: enqueued_count,
failed_enqueue_count: failed_count,
enqueued_classes: jobs.filter_map do |job|
job.class.name if jobs.count == enqueued_count || job.successfully_enqueued?
end.tally
)
end
|
#enqueue_at(event) ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 31
def enqueue_at(event)
job = event.payload[:job]
adapter = event.payload[:adapter]
exception = event.payload[:exception_object] || job.enqueue_error
payload = {
job_class: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
scheduled_at: job.scheduled_at,
adapter: ActiveJob.adapter_name(adapter),
aborted: event.payload[:aborted],
}
if exception
payload[:exception_class] = exception.class.name
payload[:exception_message] = exception.message
end
if job.class.log_arguments?
payload[:arguments] = job.arguments
end
emit_event("active_job.enqueued_at", payload)
end
|
#enqueue_retry(event) ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 109
def enqueue_retry(event)
job = event.payload[:job]
exception = event.payload[:error]
wait = event.payload[:wait]
emit_event("active_job.retry_scheduled",
job_class: job.class.name,
job_id: job.job_id,
executions: job.executions,
wait_seconds: wait.to_i,
exception_class: exception&.class&.name,
exception_message: exception&.message
)
end
|
#interrupt(event) ⇒ Object
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 149
def interrupt(event)
job = event.payload[:job]
description = event.payload[:description]
reason = event.payload[:reason]
emit_event("active_job.interrupt",
job_class: job.class.name,
job_id: job.job_id,
description: description,
reason: reason,
)
end
|
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 87
def perform(event)
job = event.payload[:job]
exception = event.payload[:exception_object]
adapter = event.payload[:adapter]
payload = {
job_class: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
adapter: ActiveJob.adapter_name(adapter),
aborted: event.payload[:aborted],
duration: event.duration.round(2),
}
if exception
payload[:exception_class] = exception.class.name
payload[:exception_message] = exception.message
payload[:exception_backtrace] = exception.backtrace
end
emit_event("active_job.completed", payload)
end
|
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 73
def perform_start(event)
job = event.payload[:job]
payload = {
job_class: job.class.name,
job_id: job.job_id,
queue: job.queue_name,
enqueued_at: job.enqueued_at&.utc&.iso8601(9),
}
if job.class.log_arguments?
payload[:arguments] = job.arguments
end
emit_event("active_job.started", payload)
end
|
#resume(event) ⇒ Object
162
163
164
165
166
167
168
169
170
171
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 162
def resume(event)
job = event.payload[:job]
description = event.payload[:description]
emit_event("active_job.resume",
job_class: job.class.name,
job_id: job.job_id,
description: description,
)
end
|
#retry_stopped(event) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 124
def retry_stopped(event)
job = event.payload[:job]
exception = event.payload[:error]
emit_event("active_job.retry_stopped",
job_class: job.class.name,
job_id: job.job_id,
executions: job.executions,
exception_class: exception.class.name,
exception_message: exception.message
)
end
|
#step(event) ⇒ Object
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 197
def step(event)
job = event.payload[:job]
step = event.payload[:step]
exception = event.payload[:exception_object]
payload = {
job_class: job.class.name,
job_id: job.job_id,
step: step.name,
cursor: step.cursor,
interrupted: event.payload[:interrupted],
duration: event.duration.round(2),
}
if exception
payload[:exception_class] = exception.class.name
payload[:exception_message] = exception.message
end
emit_event("active_job.step", payload)
end
|
#step_skipped(event) ⇒ Object
173
174
175
176
177
178
179
180
181
182
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 173
def step_skipped(event)
job = event.payload[:job]
step = event.payload[:step]
emit_event("active_job.step_skipped",
job_class: job.class.name,
job_id: job.job_id,
step: step.name,
)
end
|
#step_started(event) ⇒ Object
184
185
186
187
188
189
190
191
192
193
194
195
|
# File 'activejob/lib/active_job/structured_event_subscriber.rb', line 184
def step_started(event)
job = event.payload[:job]
step = event.payload[:step]
emit_event("active_job.step_started",
job_class: job.class.name,
job_id: job.job_id,
step: step.name,
cursor: step.cursor,
resumed: step.resumed?,
)
end
|