Class: DRbQS::Node
- Inherits:
-
Object
- Object
- DRbQS::Node
- Defined in:
- lib/drbqs/node/node.rb,
lib/drbqs/node/state.rb,
lib/drbqs/node/connection.rb,
lib/drbqs/node/task_client.rb
Direct Known Subclasses
Defined Under Namespace
Classes: Connection, State, TaskClient
Constant Summary collapse
- PRIORITY_RESPOND =
10
- PRIORITY_CALCULATE =
0
- OUTPUT_NOT_SEND_RESULT =
'not_send_result'
- DEFAULT_LOG_FILE =
'drbqs_client.log'
- INTERVAL_TIME_DEFAULT =
0.1
- WAIT_NEW_TASK_TIME =
1
- SAME_HOST_GROUP =
:local
- MAX_WORKER_WAIT_TIME =
3
- WORKER_WAIT_INTERVAL =
0.1
Instance Method Summary collapse
- #calculate(opts = {}) ⇒ Object
-
#connect ⇒ Object
Connect to the server and finish initialization of the node.
-
#initialize(access_uri, opts = {}) ⇒ Node
constructor
A new instance of Node.
- #server_on_same_host? ⇒ Boolean
- #set_signal_trap ⇒ Object
Constructor Details
#initialize(access_uri, opts = {}) ⇒ Node
Returns a new instance of Node.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/drbqs/node/node.rb', line 26 def initialize(access_uri, opts = {}) @access_uri = access_uri @logger = DRbQS::Misc.create_logger(opts[:log_file] || DEFAULT_LOG_FILE, opts[:log_level]) @connection = nil @task_client = nil @worker_number = opts[:process] || 1 @state = DRbQS::Node::State.new(:wait, @worker_number, :max_loadavg => opts[:max_loadavg], :sleep_time => opts[:sleep_time]) @group = opts[:group] || [] @signal_to_server_queue = Queue.new @config = DRbQS::Config.new @special_task_number = 0 @worker = DRbQS::Worker::ProcessSet.new(DRbQS::Worker::ForkedProcess) @worker.on_result do |proc_key, res| task_id, h = res queue_result(task_id, h) end @worker.on_error do |proc_key, res| @signal_to_server_queue.push([:node_error, res]) end end |
Instance Method Details
#calculate(opts = {}) ⇒ Object
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/drbqs/node/node.rb', line 268 def calculate(opts = {}) set_signal_trap begin server_has_no_task = nil loop do send_result_to_server unless process_signal_for_server break end if @state.change_to_sleep_for_busy_system @logger.info("Sleep because system is busy.") end if server_has_no_task && (t = server_has_no_task + WAIT_NEW_TASK_TIME - Time.now) > 0 sleep(t) server_has_no_task = nil end if get_new_task send_task_to_worker elsif @state.ready_to_exit_after_task? && @task_client.result_empty? execute_finalization break elsif @state.request? server_has_no_task = Time.now end unless respond_worker_signal wait_interval_of_connection end end rescue => err send_error(err, "Node error occurs.") @worker.kill_all_processes end wait_process_finish clear_node_files end |
#connect ⇒ Object
Connect to the server and finish initialization of the node.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/drbqs/node/node.rb', line 87 def connect obj = DRbObject.new_with_uri(@access_uri) @server_key = obj[:key] @connection = DRbQS::Node::Connection.new(obj[:message], @logger) set_node_group_for_task @task_client = DRbQS::Node::TaskClient.new(@connection.node_number, obj[:queue], obj[:result], @group, @logger) DRbQS::Transfer::Client.set(obj[:transfer].get_client(server_on_same_host?)) if obj[:transfer] @state.each_worker_id do |wid| @worker.create_process(wid) end if ary_initialization = @connection.get_initialization send_special_task_ary_to_all_workers(ary_initialization) end @config.list.node.save(Process.pid, node_data) end |
#server_on_same_host? ⇒ Boolean
112 113 114 115 |
# File 'lib/drbqs/node/node.rb', line 112 def server_on_same_host? @server_on_same_host || (@server_on_same_host = @config.list.server.server_of_key_exist?(@access_uri, @server_key)) end |
#set_signal_trap ⇒ Object
235 236 237 238 239 |
# File 'lib/drbqs/node/node.rb', line 235 def set_signal_trap Signal.trap(:TERM) do @signal_to_server_queue.push([:signal_kill]) end end |