Class: DataMigration::Task

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/data_migration/task.rb

Constant Summary collapse

# Status names mapped to the string value used for each of them.
# Frozen so the shared constant cannot be mutated at runtime.
STATUS_OPTIONS =
{
  started: "started",
  performing: "performing",
  paused: "paused",
  completed: "completed"
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.job_class ⇒ Object



54
55
56
# File 'lib/data_migration/task.rb', line 54

# Job class used to run data migration tasks.
#
# @return [Object] the +job_class+ value read from +DataMigration.config+
def self.job_class
  settings = DataMigration.config
  settings.job_class
end

.list ⇒ Object



84
85
86
# File 'lib/data_migration/task.rb', line 84

# Names of the data migration files found on disk.
#
# Expands the glob configured as +data_migrations_path_glob+ and reduces every
# match to its base name with the extension removed.
#
# @return [Array<String>] one entry per matched file
def self.list
  pattern = DataMigration.config.data_migrations_path_glob
  Dir[pattern].map { |path| File.basename(path, ".*") }
end

.perform_later(name, **kwargs) ⇒ Object



67
68
69
# File 'lib/data_migration/task.rb', line 67

# Creates a task record for +name+ and hands it to its instance-level
# +perform_later+.
#
# @param name [String] value stored in the new record's +name+ attribute
# @param kwargs [Hash] keyword arguments forwarded unchanged to the task's +perform_later+
# @return [Object] whatever the task's +perform_later+ returns
def self.perform_later(name, **kwargs)
  task = create!(name: name)
  task.perform_later(**kwargs)
end

.perform_now(name, **kwargs) ⇒ Object



58
59
60
# File 'lib/data_migration/task.rb', line 58

# Creates a task record for +name+ and hands it to its instance-level
# +perform_now+.
#
# @param name [String] value stored in the new record's +name+ attribute
# @param kwargs [Hash] keyword arguments forwarded unchanged to the task's +perform_now+
# @return [Object] whatever the task's +perform_now+ returns
def self.perform_now(name, **kwargs)
  task = create!(name: name)
  task.perform_now(**kwargs)
end

.prepare(name, pause_minutes: nil, jobs_limit: nil) ⇒ Object



76
77
78
# File 'lib/data_migration/task.rb', line 76

# Creates a task record without running it.
#
# @param name [String] value stored in the record's +name+ attribute
# @param pause_minutes [Object, nil] stored as-is in +pause_minutes+
# @param jobs_limit [Object, nil] stored as-is in +jobs_limit+
# @return [Object] the result of +create!+
def self.prepare(name, pause_minutes: nil, jobs_limit: nil)
  attributes = { name: name, pause_minutes: pause_minutes, jobs_limit: jobs_limit }
  create!(attributes)
end

.root_path ⇒ Object



80
81
82
# File 'lib/data_migration/task.rb', line 80

# Directory under which data migration files are looked up.
#
# @return [Object] the +data_migrations_full_path+ value read from +DataMigration.config+
def self.root_path
  settings = DataMigration.config
  settings.data_migrations_full_path
end

Instance Method Details

#file_exists? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/data_migration/task.rb', line 92

# Tells whether this task's +name+ is among the names returned by the
# class-level +list+.
#
# @return [Boolean]
def file_exists?
  known_names = self.class.list
  known_names.include?(name)
end

#file_path ⇒ Object



88
89
90
# File 'lib/data_migration/task.rb', line 88

# Path of the Ruby file for this task: the class-level +root_path+, a slash,
# the task +name+ and the +.rb+ extension.
#
# @return [String]
def file_path
  root = self.class.root_path
  "#{root}/#{name}.rb"
end

#job_check_in!(job_id, job_args: [], job_kwargs: {}) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/data_migration/task.rb', line 100

# Registers a job in +current_jobs+ and saves the record.
#
# +current_jobs+ is initialised to an empty hash when it is nil. The entry
# stored under +job_id+ holds the check-in time (+ts+) together with the
# given +args+ and +kwargs+.
#
# @param job_id [Object] key under which the job is stored
# @param job_args [Array] stored under +:args+
# @param job_kwargs [Hash] stored under +:kwargs+
# @return [Object] the result of +save!+
# @raise [DataMigration::JobConflictError] when +job_id+ is already registered
# @raise [DataMigration::JobConcurrencyLimitError] when +jobs_limit+ is set and
#   the number of registered jobs has reached it
def job_check_in!(job_id, job_args: [], job_kwargs: {})
  self.current_jobs ||= {}

  if current_jobs.key?(job_id)
    raise DataMigration::JobConflictError, "#{user_title} already has job ##{job_id}"
  end

  # A blank jobs_limit means "no limit", so the size check is skipped entirely.
  limit_reached = jobs_limit.present? && current_jobs.size >= jobs_limit
  if limit_reached
    raise DataMigration::JobConcurrencyLimitError, "#{user_title} reached limit of #{jobs_limit} jobs"
  end

  current_jobs[job_id] = { ts: Time.current, args: job_args, kwargs: job_kwargs }
  save!
end

#job_check_out!(job_id) ⇒ Object



114
115
116
117
# File 'lib/data_migration/task.rb', line 114

# Removes a job from +current_jobs+ and saves the record.
#
# Checking out an id that is not registered is a no-op apart from the save.
#
# @param job_id [Object] key the job was registered under by +job_check_in!+
# @return [Object] the result of +save!+
def job_check_out!(job_id)
  # current_jobs can still be nil when no job has ever checked in
  # (job_check_in! is what initialises it), so guard the delete.
  current_jobs&.delete(job_id)
  save!
end

#not_started? ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/data_migration/task.rb', line 96

# Tells whether the task has neither a +status+ nor a +started_at+ value.
#
# @return [Boolean] true only when both attributes are nil
def not_started?
  return false unless status.nil?

  started_at.nil?
end

#perform_later(**perform_args) ⇒ Object



71
72
73
74
# File 'lib/data_migration/task.rb', line 71

# Records the given arguments on the task, then calls +perform_later+ on the
# configured job class with the task id and the same arguments.
#
# @param perform_args [Hash] keyword arguments saved to the +kwargs+ attribute
#   and forwarded to the job
# @return [Object] whatever the job class's +perform_later+ returns
def perform_later(**perform_args)
  # Persist first so the stored kwargs are in place before the job is handed over.
  update!(kwargs: perform_args)

  job = self.class.job_class
  job.perform_later(id, **perform_args)
end

#perform_now(**perform_args) ⇒ Object



62
63
64
65
# File 'lib/data_migration/task.rb', line 62

# Records the given arguments on the task, then calls +perform_now+ on the
# configured job class with the task id and the same arguments.
#
# @param perform_args [Hash] keyword arguments saved to the +kwargs+ attribute
#   and forwarded to the job
# @return [Object] whatever the job class's +perform_now+ returns
def perform_now(**perform_args)
  # Persist first so the stored kwargs are in place before the job runs.
  update!(kwargs: perform_args)

  job = self.class.job_class
  job.perform_now(id, **perform_args)
end

#requires_pause? ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/data_migration/task.rb', line 123

# Tells whether the task has a positive +pause_minutes+ and is not currently
# paused.
#
# @return [Boolean] false when +pause_minutes+ is nil or not positive, or when
#   the task is already paused
def requires_pause?
  # pause_minutes is nil for tasks created without one (.prepare defaults it
  # to nil), so use safe navigation instead of calling positive? on nil.
  return false unless pause_minutes&.positive?

  !paused?
end

#user_title ⇒ Object



119
120
121
# File 'lib/data_migration/task.rb', line 119

# Human-readable label for this task, built from its +id+ and +name+.
# Used in this class as the prefix of the error messages raised by
# +job_check_in!+.
#
# @return [String]
def user_title
  "Data migration ##{id} #{name}"
end