Class: Patriot::Worker::Base Abstract
- Inherits:
-
Object
- Object
- Patriot::Worker::Base
- Includes:
- JobStore::Factory, Util::Logger, Util::Retry
- Defined in:
- lib/patriot/worker/base.rb
Overview
base class for worker implementations
Direct Known Subclasses
Constant Summary
Constants included from Util::Config
Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#cycle ⇒ Object
Returns the value of attribute cycle.
-
#host ⇒ Object
Returns the value of attribute host.
-
#job_store ⇒ Object
Returns the value of attribute job_store.
-
#started_at ⇒ Object
readonly
Returns the value of attribute started_at.
-
#status ⇒ Object
Returns the value of attribute status.
Instance Method Summary collapse
-
#execute_job(job_ticket) ⇒ Patriot::Command::ExitCode
execute a job.
-
#get_pid ⇒ Integer
Pid if the worker is running, otherwise nil.
-
#init_worker ⇒ Object
should be overrided in sub class This method is for implementation-specific configuration.
-
#initialize(config) ⇒ Base
constructor
A new instance of Base.
-
#request_shutdown ⇒ Boolean
send a request graceful shutdown to a running worker.
-
#run_worker ⇒ Object
should be overrided in sub class Main loop in which the worker fetches and executes jobs should be implemented here.
-
#start_worker ⇒ Object
main entry point of worker processing.
-
#stop_worker ⇒ Object
should be overrided in sub class Tasks for tearing down the worker should be implemented here.
Methods included from JobStore::Factory
Methods included from Util::Retry
Methods included from Util::Logger
Methods included from Util::Config
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/patriot/worker/base.rb', line 40 def initialize(config) raise "configuration is nil" if config.nil? @logger = create_logger(config) @config = config @job_store = create_jobstore(Patriot::JobStore::ROOT_STORE_ID, @config) @host = `hostname`.chomp @cycle = config.get('fetch_cycle', Patriot::Worker::DEFAULT_FETCH_CYCLE).to_i @fetch_limit = config.get('fetch_limit', Patriot::Worker::DEFAULT_FETCH_LIMIT).to_i @worker_name = config.get('worker_name', Patriot::Worker::DEFAULT_WORKER_NAME) @info_server = Patriot::Worker::InfoServer.new(self,@config) end |
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
36 37 38 |
# File 'lib/patriot/worker/base.rb', line 36 def config @config end |
#cycle ⇒ Object
Returns the value of attribute cycle.
36 37 38 |
# File 'lib/patriot/worker/base.rb', line 36 def cycle @cycle end |
#host ⇒ Object
Returns the value of attribute host.
36 37 38 |
# File 'lib/patriot/worker/base.rb', line 36 def host @host end |
#job_store ⇒ Object
Returns the value of attribute job_store.
36 37 38 |
# File 'lib/patriot/worker/base.rb', line 36 def job_store @job_store end |
#started_at ⇒ Object (readonly)
Returns the value of attribute started_at.
37 38 39 |
# File 'lib/patriot/worker/base.rb', line 37 def started_at @started_at end |
#status ⇒ Object
Returns the value of attribute status.
36 37 38 |
# File 'lib/patriot/worker/base.rb', line 36 def status @status end |
Instance Method Details
#execute_job(job_ticket) ⇒ Patriot::Command::ExitCode
execute a job
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/patriot/worker/base.rb', line 55 def execute_job(job_ticket) job_ticket.exec_host = @host job_ticket.exec_node = Thread.current[:name] begin response = @job_store.offer_to_execute(job_ticket) rescue Exception => e @logger.error e return Patriot::Command::ExitCode::FAILED end # already executed by other node return Patriot::Command::ExitCode::SKIPPED if response.nil? @logger.info " executing job: #{job_ticket.job_id}" command = response[:command] job_ticket.execution_id = response[:execution_id] job_ticket.exit_code = Patriot::Command::ExitCode::FAILED begin command.execute job_ticket.exit_code = Patriot::Command::ExitCode::SUCCEEDED rescue Exception => e @logger.warn " job : #{job_ticket.job_id} failed" @logger.warn e job_ticket.description = e.to_s else job_ticket.description = command.description ensure begin execute_with_retry{ @job_store.report_completion_status(job_ticket) } rescue Exception => job_store_error @logger.error job_store_error end unless command.post_processors.nil? continue_post_processing = true command.post_processors.each do |pp| begin if continue_post_processing @logger.info "executing post process by #{pp}" continue_post_processing = continue_post_processing && pp.process(command, self, job_ticket) else @logger.info "skipping post process by #{pp}" end rescue Exception => post_process_error @logger.error "post process by #{pp} failed" @logger.error post_process_error end end end end return job_ticket.exit_code end |
#get_pid ⇒ Integer
Returns pid if the worker is running, otherwise nil.
108 109 110 |
# File 'lib/patriot/worker/base.rb', line 108 def get_pid return Patriot::Worker.get_pid(@config) end |
#init_worker ⇒ Object
should be overrided in sub class This method is for implementation-specific configuration
147 148 149 |
# File 'lib/patriot/worker/base.rb', line 147 def init_worker raise NotImplementedError end |
#request_shutdown ⇒ Boolean
send a request graceful shutdown to a running worker
114 115 116 117 118 119 120 121 122 |
# File 'lib/patriot/worker/base.rb', line 114 def request_shutdown pid = get_pid if pid.nil? @logger.info("worker #{@worker_name} does not exist") return false end Process.kill(SIGNAL_FOR_GRACEFUL_SHUTDOWN[0], pid.to_i) return true end |
#run_worker ⇒ Object
should be overrided in sub class Main loop in which the worker fetches and executes jobs should be implemented here
153 154 155 |
# File 'lib/patriot/worker/base.rb', line 153 def run_worker raise NotImplementedError end |
#start_worker ⇒ Object
main entry point of worker processing
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/patriot/worker/base.rb', line 125 def start_worker return unless get_pid.nil? @logger.info "starting worker #{@node}@#{@host}" pid_file = Patriot::Worker.get_pid_file(@config) File.open(pid_file, 'w') {|f| f.write($$)} # save pid for shutdown set_traps @info_server.start_server @started_at = Time.now @logger.info "initiating worker #{@node}@#{@host}" init_worker @status = Patriot::Worker::Status::ACTIVE @logger.info "start worker #{@node}@#{@host}" run_worker @logger.info "shutting down worker #{@node}@#{@host}" stop_worker # should be last since worker_admin judge availability from the info_server @info_server.shutdown_server end |
#stop_worker ⇒ Object
should be overrided in sub class Tasks for tearing down the worker should be implemented here
159 160 161 |
# File 'lib/patriot/worker/base.rb', line 159 def stop_worker raise NotImplementedError end |