Class: Patriot::Worker::MultiNodeWorker
- Includes:
- MonitorMixin
- Defined in:
- lib/patriot/worker/multi_node_worker.rb
Overview
A worker implementation that can host multiple nodes in one process.
Constant Summary collapse
- ANY =
type of node that handles any job without a label or with the same label
'any'
- OWN =
handle only jobs with same label
'own'
- SUPPORTED_TYPES =
supported node types
[ANY,OWN]
- ANY_EXCLUDE_TYPE_OWN =
type of job executed by any node
0
- ONLY_SPECIFIED_NODE =
executed only by a node with the same label
1
- UNEXPECTED =
unknown
2
Constants included from Util::Config
Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY
Instance Attribute Summary
Attributes inherited from Base
#config, #cycle, #host, #job_store, #started_at, #status
Instance Method Summary collapse
Methods inherited from Base
#execute_job, #get_pid, #initialize, #request_shutdown, #start_worker
Methods included from JobStore::Factory
Methods included from Util::Retry
Methods included from Util::Logger
Methods included from Util::Config
Constructor Details
This class inherits a constructor from Patriot::Worker::Base
Instance Method Details
#init_worker ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/patriot/worker/multi_node_worker.rb', line 29
#
# Builds the per-node table (@nodes) from the 'nodes' entry of the worker
# configuration.
#
# A single (non-Array) value is treated as a one-element list. For every
# node the settings returned by get_node_config are merged over a hash that
# holds a freshly created Queue under :queue.
#
# @raise [RuntimeError] when 'nodes' is missing from the configuration
# @raise [RuntimeError] when get_node_config returns nil for a listed node
def init_worker
  configured = @config.get('nodes')
  raise "nodes are not configured" if configured.nil?

  node_names = configured.is_a?(Array) ? configured : [configured]
  @nodes = {}
  node_names.each do |name|
    settings = get_node_config(@config, name)
    raise "node #{name} is not configured" if settings.nil?

    # node settings are merged last, so they win over the default :queue entry
    @nodes[name] = { :queue => Queue.new }.merge(settings)
  end
end
#run_worker ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/patriot/worker/multi_node_worker.rb', line 51
#
# Starts the execution threads of every node and then runs the polling loop
# on the calling thread.
#
# For each entry of @nodes, conf[:threads] threads are created through
# create_thread and collected in @threads. The calling thread is then named
# 'main' and, while alive? is true, repeatedly:
# * fetches job tickets from @job_store when the worker status is ACTIVE
#   (limited by @fetch_limit, which is set outside this method),
# * hands the tickets to update_queue,
# * sleeps for @cycle.
#
# A nil result from the job store is skipped silently. Any StandardError
# raised while fetching or queueing is logged and the loop continues.
def run_worker
  @threads = []
  # create node threads for job execution
  @nodes.each do |node, conf|
    1.upto(conf[:threads]) do |i|
      @threads << create_thread(node, i, conf[:queue])
    end
  end
  # start main thread for updating queues
  Thread.current[:name] = 'main'
  while alive?
    if @status == Patriot::Worker::Status::ACTIVE
      begin
        job_tickets = @job_store.get_job_tickets(@host, @nodes.keys, {:fetch_limit => @fetch_limit})
        # guard the log line as well: calling #size on nil would raise and be
        # reported as an error on every cycle
        unless job_tickets.nil?
          @logger.info "get #{job_tickets.size} jobs"
          update_queue(job_tickets)
        end
      rescue => e
        @logger.error e
      end
    end
    sleep @cycle
  end
end
#stop_worker ⇒ Object
144 145 146 147 148 149 150 151 152 153 |
# File 'lib/patriot/worker/multi_node_worker.rb', line 144
#
# Shuts the worker down: marks the status as SHUTDOWN, empties every node
# queue, pushes one :TERM marker per configured thread of each node, and
# then waits for all threads in @threads to finish.
def stop_worker
  @status = Patriot::Worker::Status::SHUTDOWN
  @logger.info "terminating worker"
  @nodes.each_value do |settings|
    queue = settings[:queue]
    # drop pending entries first so that only the :TERM markers remain
    queue.clear
    1.upto(settings[:threads]) { queue.push(:TERM) }
  end
  @threads.each(&:join)
  @logger.info "terminated"
end