Module: Arj::Persistence

Included in:
Test::Job
Defined in:
lib/arj/persistence.rb

Overview

ActiveRecord-style persistence methods (save!, update!, exists?, etc.), but for jobs.

Example usage

class SampleJob < ActiveJob::Base
  include Arj::Persistence
end

job = SampleJob.set(queue_name: 'some queue').perform_later('some arg')
job.queue_name = 'other queue'
job.save!

The Arj module provides the same methods but with a job argument.

Example usage:

class SampleJob < ActiveJob::Base; end

job = SampleJob.set(queue_name: 'some queue').perform_later('some arg')
job.queue_name = 'other queue'
Arj.save!(job)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.enqueue(job, timestamp = nil) ⇒ ActiveJob::Base

Enqueues a job, optionally at the specified time.

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • timestamp (Numeric, NilClass) (defaults to: nil)

    optional number of seconds since Unix epoch at which to execute the job

Returns:

  • (ActiveJob::Base)

    the enqueued job



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/arj/persistence.rb', line 49

def enqueue(job, timestamp = nil)
  job.scheduled_at = timestamp ? Time.zone.at(timestamp) : nil

  if job.provider_job_id
    record = Arj.record_class.find(job.provider_job_id)
    record.update!(Persistence.record_attributes(job))
  else
    record = Arj.record_class.create!(Persistence.record_attributes(job))
  end
  Persistence.from_record(record, job)

  job
end

.from_record(record, job = nil) ⇒ ActiveJob::Base

Returns a job object for the specified database record. If a job is specified, it is updated from the record.

Returns:

  • (ActiveJob::Base)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/arj/persistence.rb', line 66

def from_record(record, job = nil)
  raise "expected #{Arj.record_class}, found #{record.class}" unless record.is_a?(Arj.record_class)
  raise "expected #{record.job_class}, found #{job.class}" if job && job.class.name != record.job_class

  job ||= Object.const_get(record.job_class).new
  raise "expected ActiveJob::Base, found #{job.class}" unless job.is_a?(ActiveJob::Base)

  if job.provider_job_id && job.provider_job_id != record.id
    raise ArgumentError, "unexpected id for #{job.class}: #{record.id} vs. #{job.provider_job_id}"
  end

  job.successfully_enqueued = true
  job_data = job_data(record)

  # ActiveJob deserializes arguments on demand when a job is performed. Until then they are empty. That's strange.
  job.arguments = ActiveJob::Arguments.deserialize(job_data['arguments'])
  job.deserialize(job_data)
  job.singleton_class.prepend(Arj::Job) unless job.singleton_class < Arj::Job

  job
end

.job_data(record) ⇒ Hash

Returns serialized job data for the specified database record.

Returns:

  • (Hash)


91
92
93
94
95
96
97
98
99
100
# File 'lib/arj/persistence.rb', line 91

def job_data(record)
  record.attributes.fetch_values(*REQUIRED_RECORD_ATTRIBUTES)
  job_data = record.attributes
  job_data['arguments'] = JSON.parse(job_data['arguments'])
  job_data['provider_job_id'] = record.id
  job_data['exception_executions'] = JSON.parse(job_data['exception_executions'])
  job_data['enqueued_at'] = job_data['enqueued_at'].iso8601
  job_data['scheduled_at'] = job_data['scheduled_at']&.iso8601 if job_data['scheduled_at']
  job_data
end

.record_attributes(job) ⇒ Hash

Returns database record attributes for the specified job.

Returns:

  • (Hash)


105
106
107
108
109
110
111
112
# File 'lib/arj/persistence.rb', line 105

def record_attributes(job)
  serialized = job.serialize
  serialized.fetch_values(*REQUIRED_JOB_ATTRIBUTES)
  serialized.delete('provider_job_id')
  serialized['arguments'] = serialized['arguments'].to_json
  serialized['exception_executions'] = serialized['exception_executions'].to_json
  serialized.symbolize_keys
end

Instance Method Details

#destroy!ActiveJob::Base

Destroys the database record associated with this job.

Returns:

  • (ActiveJob::Base)

    the specified job



144
145
146
# File 'lib/arj/persistence.rb', line 144

def destroy!
  Arj.destroy!(self)
end

#destroyed?Boolean

Returns true if the this job has been deleted from the database.

Returns:

  • (Boolean)

    true if the this job has been deleted from the database



149
150
151
# File 'lib/arj/persistence.rb', line 149

def destroyed?
  Arj.destroyed?(self)
end

#exists?Boolean

Returns true if this has a corresponding record in the database.

Returns:

  • (Boolean)

    true if this has a corresponding record in the database



116
117
118
# File 'lib/arj/persistence.rb', line 116

def exists?
  Arj.exists?(self)
end

#reloadActiveJob::Base

Reloads the attributes of this job from the database.

Returns:

  • (ActiveJob::Base)

    the specified job, updated



123
124
125
# File 'lib/arj/persistence.rb', line 123

def reload
  Arj.reload(self)
end

#save!Boolean

Saves the database record associated with this job. Raises if the record is invalid.

Returns:

  • (Boolean)

    true



130
131
132
# File 'lib/arj/persistence.rb', line 130

def save!
  Arj.save!(self)
end

#update!(attributes) ⇒ Boolean

Updates the database record associated with this job. Raises if the record is invalid.

Returns:

  • (Boolean)

    true



137
138
139
# File 'lib/arj/persistence.rb', line 137

def update!(attributes)
  Arj.update!(self, attributes)
end