Class: GoodJob::Process
Overview
Active Record model that represents a GoodJob capsule/process (either async or CLI).
Constant Summary
collapse
- STALE_INTERVAL =
Interval after which the process record is considered stale and should be refreshed
30.seconds
- EXPIRED_INTERVAL =
Interval until the process record is treated as expired
5.minutes
- PROCESS_MEMORY =
# Picks, when the constant is defined, a callable that takes a pid and reports
# that process's memory usage; the implementation depends on RUBY_PLATFORM.
case RUBY_PLATFORM
when /linux/
# Linux: read the second whitespace-separated field of the "Pss:" line in
# /proc/<pid>/smaps_rollup (presumably a size in kB - confirm against proc(5)).
lambda do |pid|
File.readlines("/proc/#{pid}/smaps_rollup").each do |line|
next unless line.start_with?('Pss:')
# `break` with a value makes the enclosing `each` call return that Integer.
break line.split[1].to_i
end
# NOTE(review): if no line matches, `each` returns the array of lines, not an Integer.
rescue Errno::ENOENT
# smaps_rollup could not be found: fall back to the "VmRSS:" line of /proc/<pid>/status.
File.readlines("/proc/#{pid}/status").each do |line|
next unless line.start_with?('VmRSS:')
break line.split[1].to_i
end
end
when /darwin|bsd/
# macOS/BSD: shell out to `ps` and take the last field of its last output line
# (the rss column). `pid.to_i` keeps the interpolated value numeric.
lambda do |pid|
`ps -o pid,rss -p #{pid.to_i}`.lines.last.split.last.to_i
end
else
# Any other platform: memory usage is reported as 0.
->(_pid) { 0 }
end
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
collapse
Instance Method Summary
collapse
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active ⇒ ActiveRecord::Relation
Processes that are active: either holding an advisory lock, or (when not using an advisory lock) updated within EXPIRED_INTERVAL
57
58
59
60
61
|
# File 'app/models/good_job/process.rb', line 57
# Processes that are considered active. A record qualifies when either:
# * its lock_type is :advisory and it matches the `advisory_locked` scope, or
# * its lock_type is nil and its updated_at is newer than EXPIRED_INTERVAL ago.
scope :active, lambda {
  lockable = joins_advisory_locks
  holding_lock = lockable.where(lock_type: :advisory).advisory_locked
  updated_recently = arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago)
  heartbeat_only = lockable.where(lock_type: nil).where(updated_recently)
  holding_lock.or(heartbeat_only)
}
|
.cleanup ⇒ Object
Deletes all inactive process records.
74
75
76
77
78
79
|
# File 'app/models/good_job/process.rb', line 74
# Deletes all inactive process records.
#
# Before a record is deleted, every job whose locked_by_id still points at it
# has its locked_by_id and locked_at columns reset to nil, so no job keeps
# referencing a process row that no longer exists.
def self.cleanup
  inactive.find_each do |process|
    # Two separate statements: release the jobs first, then remove the process row.
    GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil)
    process.delete
  end
end
|
.find_or_create_record(id:, with_advisory_lock: false) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'app/models/good_job/process.rb', line 88
# Creates the process record for +id+, or refreshes the existing one when a
# record with that id is already present.
#
# @param id [Object] primary key of the process record
# @param with_advisory_lock [Boolean] when true, the record is created with an
#   advisory lock (or an existing record is advisory-locked) and its lock_type
#   is set to :advisory; otherwise an existing record's lock_type is set to nil
# @return [Object, nil] the created or updated record, or nil when creation hit
#   a uniqueness conflict but no record with that id could be found afterwards
def self.find_or_create_record(id:, with_advisory_lock: false)
  attributes = { id: id, state: process_state }
  attributes.merge!(create_with_advisory_lock: true, lock_type: :advisory) if with_advisory_lock
  create!(attributes)
rescue ActiveRecord::RecordNotUnique
  existing_record = find_by(id: id)
  return existing_record unless existing_record

  lock_type = nil
  if with_advisory_lock
    # Take the lock before touching the row, as a bang call that may raise.
    existing_record.advisory_lock!
    lock_type = :advisory
  end
  existing_record.update(lock_type: lock_type, state: process_state, updated_at: Time.current)
  existing_record
end
|
.memory_usage(pid) ⇒ Integer
82
83
84
85
86
|
# File 'app/models/good_job/process.rb', line 82
# Reports the memory usage of the process with the given pid by delegating to
# the platform-specific PROCESS_MEMORY callable.
#
# @param pid [Integer] process id to inspect
# @return [Integer] the value produced by PROCESS_MEMORY, or 0 when the lookup
#   raises any StandardError (best-effort: failures are deliberately swallowed)
def self.memory_usage(pid)
  measured = PROCESS_MEMORY.call(pid)
  measured
rescue StandardError => _error
  0
end
|
.process_state ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# File 'app/models/good_job/process.rb', line 111
# Builds the snapshot hash describing the current OS process: host name, pid,
# memory usage, program name, selected GoodJob settings, per-scheduler stats
# with aggregated execution totals, and database connection pool usage.
#
# @return [Hash] snapshot with the keys shown in the literal below
def self.process_state
  # Sample every scheduler exactly once, so the totals below are computed from
  # the very same stats that are reported under :schedulers (re-sampling could
  # return different numbers while jobs are executing).
  scheduler_stats = GoodJob::Scheduler.instances.map(&:stats)
  current_pid = ::Process.pid

  {
    hostname: Socket.gethostname,
    pid: current_pid,
    memory: memory_usage(current_pid),
    proctitle: $PROGRAM_NAME,
    preserve_job_records: GoodJob.preserve_job_records,
    retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
    schedulers: scheduler_stats,
    cron_enabled: GoodJob.configuration.enable_cron?,
    total_succeeded_executions_count: scheduler_stats.sum { |stats| stats.fetch(:succeeded_executions_count) },
    total_errored_executions_count: scheduler_stats.sum { |stats| stats.fetch(:errored_executions_count) },
    database_connection_pool: {
      size: connection_pool.size,
      # Number of pool connections currently checked out / in use.
      active: connection_pool.connections.count(&:in_use?),
    },
  }
end
|
Instance Method Details
#basename ⇒ Object
161
162
163
|
# File 'app/models/good_job/process.rb', line 161
# File name portion of the "proctitle" entry stored in the process state.
#
# @return [String] basename of the stored proctitle, or "" when no proctitle
#   entry is present
def basename
  proctitle = state.fetch("proctitle", "")
  File.basename(proctitle)
end
|
#expired? ⇒ Boolean
157
158
159
|
# File 'app/models/good_job/process.rb', line 157
# Whether the record was last updated longer ago than EXPIRED_INTERVAL.
#
# @return [Boolean] true when updated_at is earlier than EXPIRED_INTERVAL.ago
def expired?
  expiry_cutoff = EXPIRED_INTERVAL.ago
  updated_at < expiry_cutoff
end
|
#refresh ⇒ Object
130
131
132
133
134
135
136
137
138
139
|
# File 'app/models/good_job/process.rb', line 130
# Writes a fresh process_state snapshot and a new updated_at to this record.
#
# The record is reloaded first so that a row deleted in the meantime is
# detected. In that case the in-memory record is flagged as new, its
# timestamps are cleared and it is saved again.
#
# @return [Object] the result of `update`, or of `save` when the row had to be
#   re-created
def refresh
  # Must be its own statement and run BEFORE the assignment: reloading after
  # assigning would overwrite the fresh state with the stored one.
  reload
  self.state = self.class.process_state
  update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
  # The row is gone: mark this instance as a new record (intended to make
  # `save` insert it again) and let the timestamps be assigned anew.
  @new_record = true
  self.created_at = self.updated_at = nil
  # Flag state as changed so it is included in the save.
  state_will_change!
  save
end
|
#refresh_if_stale(cleanup: false) ⇒ Object
141
142
143
144
145
146
147
|
# File 'app/models/good_job/process.rb', line 141
# Refreshes the record only when it is stale, optionally running the
# class-level cleanup afterwards.
#
# @param cleanup [Boolean] when true, call self.class.cleanup after refreshing
# @return [Object, nil] the result of `refresh`, or nil when the record is not
#   stale (in which case nothing is refreshed or cleaned up)
def refresh_if_stale(cleanup: false)
  return unless stale?

  # `tap` runs the optional cleanup after the refresh and still hands back the
  # refresh result.
  refresh.tap { self.class.cleanup if cleanup }
end
|
#schedulers ⇒ Object
165
166
167
|
# File 'app/models/good_job/process.rb', line 165
# Scheduler entries stored in the process state.
#
# @return [Object] the value under the "schedulers" key of state, or an empty
#   Array when that key is absent
def schedulers
  no_schedulers = []
  state.fetch("schedulers", no_schedulers)
end
|
#stale? ⇒ Boolean
153
154
155
|
# File 'app/models/good_job/process.rb', line 153
# Whether the record was last updated longer ago than STALE_INTERVAL.
#
# @return [Boolean] true when updated_at is earlier than STALE_INTERVAL.ago
def stale?
  stale_cutoff = STALE_INTERVAL.ago
  updated_at < stale_cutoff
end
|
#state ⇒ Object
149
150
151
|
# File 'app/models/good_job/process.rb', line 149
# Reader for the state attribute that never returns nil or false.
#
# @return [Object] the value from the inherited reader, or an empty Hash when
#   that value is nil or false
def state
  stored = super
  stored || {}
end
|