Class: Patriot::Worker::Base Abstract

Inherits:
Object
  • Object
show all
Includes:
JobStore::Factory, Util::Logger, Util::Retry
Defined in:
lib/patriot/worker/base.rb

Overview

This class is abstract.

base class for worker implementations

Direct Known Subclasses

JobStoreServer, MultiNodeWorker

Constant Summary

Constants included from Util::Config

Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from JobStore::Factory

create_jobstore

Methods included from Util::Retry

execute_with_retry

Methods included from Util::Logger

#create_logger

Methods included from Util::Config

#load_config, #load_plugins

Constructor Details

#initialize(config) ⇒ Base

Returns a new instance of Base.

Parameters:



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/patriot/worker/base.rb', line 40

def initialize(config)
  raise "configuration is nil" if config.nil?
  @logger      = create_logger(config)
  @config      = config
  @job_store   = create_jobstore(Patriot::JobStore::ROOT_STORE_ID, @config)
  @host        = `hostname`.chomp
  @cycle       = config.get('fetch_cycle', Patriot::Worker::DEFAULT_FETCH_CYCLE).to_i
  @fetch_limit = config.get('fetch_limit', Patriot::Worker::DEFAULT_FETCH_LIMIT).to_i
  @worker_name = config.get('worker_name', Patriot::Worker::DEFAULT_WORKER_NAME)
  @info_server = Patriot::Worker::InfoServer.new(self,@config)
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def config
  @config
end

#cycleObject

Returns the value of attribute cycle.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def cycle
  @cycle
end

#hostObject

Returns the value of attribute host.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def host
  @host
end

#job_storeObject

Returns the value of attribute job_store.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def job_store
  @job_store
end

#started_atObject (readonly)

Returns the value of attribute started_at.



37
38
39
# File 'lib/patriot/worker/base.rb', line 37

def started_at
  @started_at
end

#statusObject

Returns the value of attribute status.



36
37
38
# File 'lib/patriot/worker/base.rb', line 36

def status
  @status
end

Instance Method Details

#execute_job(job_ticket) ⇒ Patriot::Command::ExitCode

execute a job

Parameters:

Returns:



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/patriot/worker/base.rb', line 55

def execute_job(job_ticket)
  job_ticket.exec_host   = @host
  job_ticket.exec_node   = Thread.current[:name]
  begin
    response = @job_store.offer_to_execute(job_ticket)
  rescue Exception => e
    @logger.error e
    return Patriot::Command::ExitCode::FAILED
  end

  # already executed by other node
  return Patriot::Command::ExitCode::SKIPPED if response.nil?

  @logger.info " executing job: #{job_ticket.job_id}"
  command                 = response[:command]
  job_ticket.execution_id = response[:execution_id]
  job_ticket.exit_code    = Patriot::Command::ExitCode::FAILED
  begin
    command.execute
    job_ticket.exit_code  = Patriot::Command::ExitCode::SUCCEEDED
  rescue Exception => e
    @logger.warn " job : #{job_ticket.job_id} failed"
    @logger.warn e
    job_ticket.description = e.to_s
  else
    job_ticket.description = command.description
  ensure
    begin
      execute_with_retry{ @job_store.report_completion_status(job_ticket) }
    rescue Exception => job_store_error
      @logger.error job_store_error
    end
    unless command.post_processors.nil?
      continue_post_processing = true
      command.post_processors.each do |pp|
        begin
          if continue_post_processing
            @logger.info "executing post process by #{pp}"
            continue_post_processing = continue_post_processing && pp.process(command, self, job_ticket)
          else
            @logger.info "skipping post process by #{pp}"
          end
        rescue Exception => post_process_error
          @logger.error "post process by #{pp} failed"
          @logger.error post_process_error
        end
      end
    end
  end
  return job_ticket.exit_code
end

#get_pidInteger

Returns pid if the worker is running, otherwise nil.

Returns:

  • (Integer)

    pid if the worker is running, otherwise nil



108
109
110
# File 'lib/patriot/worker/base.rb', line 108

def get_pid
  return Patriot::Worker.get_pid(@config)
end

#init_workerObject

should be overrided in sub class This method is for implementation-specific configuration

Raises:

  • (NotImplementedError)


147
148
149
# File 'lib/patriot/worker/base.rb', line 147

def init_worker
  raise NotImplementedError
end

#request_shutdownBoolean

send a request graceful shutdown to a running worker

Returns:

  • (Boolean)

    true worker is running and request is sent, otherwise false



114
115
116
117
118
119
120
121
122
# File 'lib/patriot/worker/base.rb', line 114

def request_shutdown
  pid = get_pid
  if pid.nil?
    @logger.info("worker #{@worker_name} does not exist")
    return false
  end
  Process.kill(SIGNAL_FOR_GRACEFUL_SHUTDOWN[0], pid.to_i)
  return true
end

#run_workerObject

should be overrided in sub class Main loop in which the worker fetches and executes jobs should be implemented here

Raises:

  • (NotImplementedError)


153
154
155
# File 'lib/patriot/worker/base.rb', line 153

def run_worker
  raise NotImplementedError
end

#start_workerObject

main entry point of worker processing



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/patriot/worker/base.rb', line 125

def start_worker
  return unless get_pid.nil?
  @logger.info "starting worker #{@node}@#{@host}"
  pid_file = Patriot::Worker.get_pid_file(@config)
  File.open(pid_file, 'w') {|f| f.write($$)} # save pid for shutdown
  set_traps
  @info_server.start_server
  @started_at = Time.now
  @logger.info "initiating worker #{@node}@#{@host}"
  init_worker
  @status = Patriot::Worker::Status::ACTIVE
  @logger.info "start worker #{@node}@#{@host}"
  run_worker
  @logger.info "shutting down worker #{@node}@#{@host}"
  stop_worker
  # should be last since worker_admin judge availability from the info_server
  @info_server.shutdown_server
end

#stop_workerObject

should be overrided in sub class Tasks for tearing down the worker should be implemented here

Raises:

  • (NotImplementedError)


159
160
161
# File 'lib/patriot/worker/base.rb', line 159

def stop_worker
  raise NotImplementedError
end