Class: ActiveJobK8s::Scheduler
- Inherits:
-
Object
- Object
- ActiveJobK8s::Scheduler
- Defined in:
- lib/active_job_k8s/scheduler.rb
Instance Attribute Summary collapse
- #default_manifest ⇒ Hash readonly
- #kubeclient_context ⇒ Kubeclient::Config::Context readonly
Class Method Summary collapse
Instance Method Summary collapse
- #create_job(job, scheduled_at: nil) ⇒ Object
-
#initialize(**opts) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#un_suspend_jobs ⇒ Object
Un-suspends suspended jobs whose scheduled time has passed.
Constructor Details
#initialize(**opts) ⇒ Scheduler
Returns a new instance of Scheduler.
15 16 17 18 19 20 21 |
# File 'lib/active_job_k8s/scheduler.rb', line 15

# Builds a scheduler bound to a Kubernetes client context.
#
# @param opts [Hash] construction options
# @option opts [Kubeclient::Config::Context] :kubeclient_context required
#   context; also used by #create_job as the fallback namespace source
# @option opts [Hash] :default_manifest manifest used when a job supplies
#   none of its own; falls back to an empty Hash
# @raise [RuntimeError] when :kubeclient_context is missing or nil
def initialize(**opts)
  context = opts[:kubeclient_context]
  raise "No KubeClientContext given" if context.nil?

  @kubeclient_context = context
  @default_manifest = opts[:default_manifest] || {}
end
Instance Attribute Details
#default_manifest ⇒ Hash (readonly)
12 13 14 |
# File 'lib/active_job_k8s/scheduler.rb', line 12

# Manifest used by #create_job when a job does not provide its own.
#
# @return [Hash] the value stored in @default_manifest by the constructor
def default_manifest
  @default_manifest
end
#kubeclient_context ⇒ Kubeclient::Config::Context (readonly)
9 10 11 |
# File 'lib/active_job_k8s/scheduler.rb', line 9

# Kubernetes client context this scheduler was constructed with.
#
# @return [Kubeclient::Config::Context] the value stored in
#   @kubeclient_context by the constructor
def kubeclient_context
  @kubeclient_context
end
Class Method Details
.execute_job ⇒ Object
60 61 62 |
# File 'lib/active_job_k8s/scheduler.rb', line 60

# Runs the job whose serialized form is stored in the SERIALIZED_JOB
# environment variable (the variable #create_job injects into each container).
#
# @return [Object] whatever ActiveJob::Base.execute returns
# @raise [KeyError] when SERIALIZED_JOB is not set
# @raise [JSON::ParserError] when SERIALIZED_JOB is not valid JSON
def self.execute_job
  # ENV.fetch names the missing variable in its error; ENV[] would hand nil to
  # JSON.parse and surface as an unrelated-looking TypeError.
  ActiveJob::Base.execute(JSON.parse(ENV.fetch('SERIALIZED_JOB')))
end
Instance Method Details
#create_job(job, scheduled_at: nil) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/active_job_k8s/scheduler.rb', line 23

# Builds a Kubernetes Job resource for the given job and submits it through
# +client.create_job+.
#
# The manifest is +job.manifest+ when the job responds to it with a non-empty
# Hash, otherwise #default_manifest. The resource name is suffixed with the
# job id, the job id and queue name are recorded as annotations, and every
# container receives the serialized job in the SERIALIZED_JOB env variable.
# Containers without a command are given the +rails active_job_k8s:run_job+
# invocation.
#
# @param job [#serialize, #job_id, #queue_name] job to run; may also respond
#   to +manifest+
# @param scheduled_at [#to_f, nil] when given, the Job is created suspended,
#   labelled "scheduled" and annotated with this time; otherwise it is
#   labelled "delayed"
# @return [Object] whatever +client.create_job+ returns
def create_job(job, scheduled_at: nil)
  serialized_job = JSON.dump(job.serialize)

  custom_manifest = job.manifest if job.respond_to?(:manifest)
  manifest =
    if custom_manifest.is_a?(Hash) && !custom_manifest.empty?
      custom_manifest
    else
      default_manifest
    end

  kube_job = Kubeclient::Resource.new(manifest)
  kube_job.metadata.name = "#{kube_job.metadata.name}-#{job.job_id}"
  kube_job.metadata.annotations ||= {}
  kube_job.metadata.annotations.job_id = job.job_id
  kube_job.metadata.annotations.queue_name = job.queue_name
  kube_job.metadata.namespace = kube_job.metadata.namespace || kubeclient_context.namespace

  kube_job.spec.template.spec.containers.each do |container|
    container.env ||= []
    container.env.push({ 'name' => 'SERIALIZED_JOB', 'value' => serialized_job })
    # Only supply the default entry point when the manifest did not set one.
    next unless container.command.blank?

    container.command = ["rails"]
    container.args = ["active_job_k8s:run_job"]
  end

  # Seconds after the Job finishes before it becomes eligible for deletion.
  kube_job.spec.ttlSecondsAfterFinished = 300

  kube_job.metadata.labels ||= {}
  if scheduled_at
    kube_job.spec.suspend = true
    # Stored as epoch seconds because un_suspend_jobs reads this annotation
    # back with `to_f`; a Time's plain `to_s` would parse as just its year.
    kube_job.metadata.annotations.scheduled_at = scheduled_at.to_f.to_s
    kube_job.metadata.labels.activeJobK8s = "scheduled" # job to be executed when its time comes
  else
    kube_job.metadata.labels.activeJobK8s = "delayed" # job to be executed when possible
  end

  client.create_job(kube_job)
end
#un_suspend_jobs ⇒ Object
Un-suspends suspended jobs whose scheduled time has passed.
66 67 68 69 70 71 72 73 |
# File 'lib/active_job_k8s/scheduler.rb', line 66

# Un-suspends every job from +suspended_jobs+ whose +scheduled_at+ annotation
# lies in the past and whose spec is still marked as suspended, by patching
# +spec.suspend+ to false through +client.patch_job+.
#
# @return [Object] the collection returned by +suspended_jobs+
def un_suspend_jobs
  suspended_jobs.each do |sj|
    # The annotation is parsed as epoch seconds.
    scheduled_at = Time.at(sj.metadata.annotations.scheduled_at.to_f)
    next unless Time.now > scheduled_at && sj.spec.suspend

    client.patch_job(sj.metadata.name, { spec: { suspend: false } }, sj.metadata.namespace)
  end
end