Class: Patriot::Worker::MultiNodeWorker

Inherits:
Base
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/patriot/worker/multi_node_worker.rb

Overview

A worker implementation that can host multiple nodes in one process.

Constant Summary collapse

ANY =

type of node that handles any job without a label or with the same label

'any'
OWN =

type of node that handles only jobs with the same label

'own'
SUPPORTED_TYPES =

supported node types

[ANY,OWN]
ANY_EXCLUDE_TYPE_OWN =

type of job executed by any node

0
ONLY_SPECIFIED_NODE =

type of job executed only by a node with the same label

1
UNEXPECTED =

unknown

2

Constants included from Util::Config

Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY

Instance Attribute Summary

Attributes inherited from Base

#config, #cycle, #host, #job_store, #started_at, #status

Instance Method Summary collapse

Methods inherited from Base

#execute_job, #get_pid, #initialize, #request_shutdown, #start_worker

Methods included from JobStore::Factory

create_jobstore

Methods included from Util::Retry

execute_with_retry

Methods included from Util::Logger

#create_logger

Methods included from Util::Config

#load_config, #load_plugins

Constructor Details

This class inherits a constructor from Patriot::Worker::Base

Instance Method Details

#init_workerObject

See Also:



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/patriot/worker/multi_node_worker.rb', line 29

# Builds the per-node state table (@nodes) from the worker configuration.
#
# Reads the 'nodes' entry of @config (a single value is treated as a
# one-element list), looks up each node's settings with get_node_config and
# stores them in @nodes together with a fresh Queue under :queue. Because the
# settings are merged on top, a :queue key in the settings takes precedence.
#
# @raise [RuntimeError] if 'nodes' is missing or a listed node has no settings
def init_worker
  configured = @config.get('nodes')
  raise "nodes are not configured" if configured.nil?
  node_names = configured.is_a?(Array) ? configured : [configured]
  @nodes = {}
  node_names.each do |name|
    settings = get_node_config(@config, name)
    raise "node #{name} is not configured" if settings.nil?
    defaults = { :queue => Queue.new }
    @nodes[name] = defaults.merge(settings)
  end
end

#run_workerObject

See Also:



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/patriot/worker/multi_node_worker.rb', line 51

# Starts the node threads and then polls the job store from the calling thread.
#
# For every entry of @nodes, conf[:threads] threads are created through
# create_thread, all sharing that node's queue. The calling thread then loops
# while alive? is true: when the worker status is ACTIVE it fetches job tickets
# for @host and the configured nodes (limited by @fetch_limit) and hands them
# to update_queue; it then sleeps for @cycle before the next iteration.
#
# Any StandardError raised while fetching or queueing tickets is logged and
# the loop continues.
def run_worker
  @threads = []
  # create node threads for job execution
  @nodes.each do |node, conf|
    1.upto(conf[:threads]) do |i|
      @threads << create_thread(node, i, conf[:queue])
    end
  end
  # start main thread for updating queues
  Thread.current[:name] = 'main'
  while alive?
    if @status == Patriot::Worker::Status::ACTIVE
      begin
        job_tickets = @job_store.get_job_tickets(@host, @nodes.keys, {:fetch_limit => @fetch_limit})
        # Guard against nil *before* touching the result: calling #size on nil
        # would raise NoMethodError and be reported as an error on every cycle.
        unless job_tickets.nil?
          @logger.info "get #{job_tickets.size} jobs"
          update_queue(job_tickets)
        end
      rescue => e
        @logger.error e
      end
    end
    sleep @cycle
  end
end

#stop_workerObject

See Also:



144
145
146
147
148
149
150
151
152
153
# File 'lib/patriot/worker/multi_node_worker.rb', line 144

# Shuts the worker down and waits for all node threads to finish.
#
# Sets the status to SHUTDOWN, then for every node empties its queue and
# enqueues one :TERM marker per configured thread. Finally joins every thread
# in @threads, logging before and after.
def stop_worker
  @status = Patriot::Worker::Status::SHUTDOWN
  @logger.info "terminating worker"
  @nodes.each_value do |settings|
    queue = settings[:queue]
    # drop pending work so only the termination markers remain
    queue.clear
    1.upto(settings[:threads]) { queue << :TERM }
  end
  @threads.each(&:join)
  @logger.info "terminated"
end