Class: GoodJob::Process

Inherits:
BaseRecord
  • Object
show all
Includes:
AdvisoryLockable, OverridableConnection
Defined in:
app/models/good_job/process.rb

Overview

Active Record model that represents a GoodJob capsule/process (either async or CLI).

Constant Summary collapse

STALE_INTERVAL =

Interval until the process record being updated

30.seconds
EXPIRED_INTERVAL =

Interval until the process record is treated as expired

5.minutes
PROCESS_MEMORY =
case RUBY_PLATFORM
when /linux/
  lambda do |pid|
    File.readlines("/proc/#{pid}/smaps_rollup").each do |line|
      next unless line.start_with?('Pss:')

      break line.split[1].to_i
    end
  rescue Errno::ENOENT
    File.readlines("/proc/#{pid}/status").each do |line|
      next unless line.start_with?('VmRSS:')

      break line.split[1].to_i
    end
  end
when /darwin|bsd/
  lambda do |pid|
    `ps -o pid,rss -p #{pid.to_i}`.lines.last.split.last.to_i
  end
else
  ->(_pid) { 0 }
end

Constants included from AdvisoryLockable

AdvisoryLockable::RecordAlreadyAdvisoryLockedError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from AdvisoryLockable

#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock

Methods inherited from BaseRecord

bind_value, migrated?, migration_pending_warning!, with_logger_silenced

Class Method Details

.activeActiveRecord::Relation

Processes that are inactive and unlocked (e.g. SIGKILLed)

Returns:

  • (ActiveRecord::Relation)


57
58
59
60
61
# File 'app/models/good_job/process.rb', line 57

scope :active, (lambda do
  query = joins_advisory_locks
  query.where(lock_type: :advisory).advisory_locked
    .or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago)))
end)

.cleanupObject

Deletes all inactive process records.



74
75
76
77
78
79
# File 'app/models/good_job/process.rb', line 74

def self.cleanup
  inactive.find_each do |process|
    GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
    process.delete
  end
end

.find_or_create_record(id:, with_advisory_lock: false) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'app/models/good_job/process.rb', line 88

def self.find_or_create_record(id:, with_advisory_lock: false)
  attributes = {
    id: id,
    state: process_state,
  }
  if with_advisory_lock
    attributes[:create_with_advisory_lock] = true
    attributes[:lock_type] = :advisory
  end
  create!(attributes)
rescue ActiveRecord::RecordNotUnique
  find_by(id: id).tap do |existing_record|
    next unless existing_record

    if with_advisory_lock
      existing_record.advisory_lock!
      existing_record.update(lock_type: :advisory, state: process_state, updated_at: Time.current)
    else
      existing_record.update(lock_type: nil, state: process_state, updated_at: Time.current)
    end
  end
end

.memory_usage(pid) ⇒ Integer

Returns:

  • (Integer)


82
83
84
85
86
# File 'app/models/good_job/process.rb', line 82

def self.memory_usage(pid)
  PROCESS_MEMORY.call(pid)
rescue StandardError
  0
end

.process_stateObject



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'app/models/good_job/process.rb', line 111

def self.process_state
  {
    hostname: Socket.gethostname,
    pid: ::Process.pid,
    memory: memory_usage(::Process.pid),
    proctitle: $PROGRAM_NAME,
    preserve_job_records: GoodJob.preserve_job_records,
    retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
    schedulers: GoodJob::Scheduler.instances.map(&:stats),
    cron_enabled: GoodJob.configuration.enable_cron?,
    total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) },
    total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) },
    database_connection_pool: {
      size: connection_pool.size,
      active: connection_pool.connections.count(&:in_use?),
    },
  }
end

Instance Method Details

#basenameObject



161
162
163
# File 'app/models/good_job/process.rb', line 161

def basename
  File.basename(state.fetch("proctitle", ""))
end

#expired?Boolean

Returns:

  • (Boolean)


157
158
159
# File 'app/models/good_job/process.rb', line 157

def expired?
  updated_at < EXPIRED_INTERVAL.ago
end

#refreshObject



130
131
132
133
134
135
136
137
138
139
# File 'app/models/good_job/process.rb', line 130

def refresh
  reload # verify the record still exists in the database
  self.state = self.class.process_state
  update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
  @new_record = true
  self.created_at = self.updated_at = nil
  state_will_change!
  save
end

#refresh_if_stale(cleanup: false) ⇒ Object



141
142
143
144
145
146
147
# File 'app/models/good_job/process.rb', line 141

def refresh_if_stale(cleanup: false)
  return unless stale?

  result = refresh
  self.class.cleanup if cleanup
  result
end

#schedulersObject



165
166
167
# File 'app/models/good_job/process.rb', line 165

def schedulers
  state.fetch("schedulers", [])
end

#stale?Boolean

Returns:

  • (Boolean)


153
154
155
# File 'app/models/good_job/process.rb', line 153

def stale?
  updated_at < STALE_INTERVAL.ago
end

#stateObject



149
150
151
# File 'app/models/good_job/process.rb', line 149

def state
  super || {}
end