Class: RocketJob::ActiveWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/rocket_job/active_worker.rb

Overview

Information about a server currently working on a job

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, started_at, job) ⇒ ActiveWorker

Returns a new instance of ActiveWorker.



37
38
39
40
41
# File 'lib/rocket_job/active_worker.rb', line 37

# Builds the record of a single worker that is busy with a job.
#
# Parameters
#   name:
#     Name of the worker. NOTE(review): #server_name treats this as
#     "<server>:<suffix>" separated by colons; confirm the exact format with the caller.
#   started_at:
#     When this server started working on this job / slice.
#   job:
#     The job being worked on.
def initialize(name, started_at, job)
  @started_at = started_at
  @name       = name
  # Assigned last so that the value of the method body remains `job`.
  @job        = job
end

Instance Attribute Details

#job ⇒ Object

Returns the value of attribute job.



7
8
9
# File 'lib/rocket_job/active_worker.rb', line 7

# Reader for @job, the job that was passed to the constructor.
def job
  @job
end

#name ⇒ Object

Returns the value of attribute name.



8
9
10
# File 'lib/rocket_job/active_worker.rb', line 8

# Reader for @name, the worker name that was passed to the constructor.
def name
  @name
end

#started_at ⇒ Object

When this server started working on this job / slice



5
6
7
# File 'lib/rocket_job/active_worker.rb', line 5

# Reader for @started_at: when this server started working on this job / slice.
# May be nil; #duration_s explicitly allows for a missing value.
def started_at
  @started_at
end

Class Method Details

.all(server_name = nil) ⇒ Object

Returns [Array<ActiveWorker>] list of the active workers across all servers and what they are currently working on, in the order in which their jobs are returned by the query. Returns [] if no servers are currently busy doing any work

Parameters

server_name: [String]
  Only jobs running on the specified server


17
18
19
20
21
22
23
24
25
26
# File 'lib/rocket_job/active_worker.rb', line 17

# Collects the active workers of every job that may still have work in progress.
#
# Parameters
#   server_name: [String]
#     Only jobs whose worker_name starts with this server name.
#     The name is matched literally: regular expression metacharacters, such as
#     the `.` in a host name, are escaped before the prefix match is built.
#
# Returns [Array] the concatenated `rocket_job_active_workers` of each matching
# job. Returns [] when no job matches.
def self.all(server_name = nil)
  # Need paused, failed or aborted since servers may still be working on active slices
  query = RocketJob::Job.where(:state.in => %i[running paused failed aborted])
  if server_name
    # Escape the name so "host.a" cannot also select workers of e.g. "hostXa".
    query = query.where(worker_name: /\A#{Regexp.escape(server_name.to_s)}/)
  end

  servers = []
  # Append in place rather than allocating a new array for every job.
  query.each { |job| servers.concat(job.rocket_job_active_workers) }
  servers
end

.requeue_zombies ⇒ Object

Requeues all jobs for which the workers have disappeared



29
30
31
32
33
34
35
# File 'lib/rocket_job/active_worker.rb', line 29

# Requeues all jobs for which the workers have disappeared.
#
# Walks every active worker returned by `all`. A worker is skipped unless it is
# a zombie, and its job is only requeued when the job reports that it may be
# requeued for that worker's server name.
def self.requeue_zombies
  all.each do |worker|
    next unless worker.zombie?

    job    = worker.job
    origin = worker.server_name
    job.requeue!(origin) if job.may_requeue?(origin)
  end
end

Instance Method Details

#duration ⇒ Object

Duration in human readable form



44
45
46
# File 'lib/rocket_job/active_worker.rb', line 44

# Duration in human readable form.
#
# Passes the elapsed seconds from #duration_s to RocketJob.seconds_as_duration
# and returns its result.
def duration
  elapsed_seconds = duration_s
  RocketJob.seconds_as_duration(elapsed_seconds)
end

#duration_s ⇒ Object

Number of seconds this server has been working on this job / slice



49
50
51
# File 'lib/rocket_job/active_worker.rb', line 49

# Number of seconds this server has been working on this job / slice.
#
# Returns the time elapsed since `started_at` (a Float when `started_at` is a
# Time), or exactly 0.0 when `started_at` is not set.
def duration_s
  # Read the clock once: with two separate reads a missing started_at produced
  # the (slightly negative) difference between the first and second read.
  now = Time.now
  now - (started_at || now)
end

#server ⇒ Object



62
63
64
# File 'lib/rocket_job/active_worker.rb', line 62

# Looks up the RocketJob::Server whose name equals #server_name.
#
# A found server is memoized in @server. A nil result is not cached, so the
# lookup is repeated on the next call.
def server
  return @server if @server

  @server = RocketJob::Server.where(name: server_name).first
end

#server_name ⇒ Object

Returns [String] the name of the server running this worker



54
55
56
57
58
59
60
# File 'lib/rocket_job/active_worker.rb', line 54

# Returns [String] the name of the server running this worker.
#
# When the worker name contains at least two colons, the part before the last
# colon is returned. Otherwise `name` is returned unchanged.
def server_name
  name.to_s[/(.*:.*):.*/, 1] || name
end

#zombie? ⇒ Boolean

The server on which this worker was running is no longer running

Returns:

  • (Boolean)


67
68
69
# File 'lib/rocket_job/active_worker.rb', line 67

# The server on which this worker was running is no longer running.
#
# Returns [Boolean] true when #server finds no matching server (nil).
def zombie?
  registered_server = server
  registered_server.nil?
end