Class: DRbQS::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/node/node.rb,
lib/drbqs/node/state.rb,
lib/drbqs/node/connection.rb,
lib/drbqs/node/task_client.rb

Direct Known Subclasses

Test::Node

Defined Under Namespace

Classes: Connection, State, TaskClient

Constant Summary collapse

PRIORITY_RESPOND =
10
PRIORITY_CALCULATE =
0
OUTPUT_NOT_SEND_RESULT =
'not_send_result'
DEFAULT_LOG_FILE =
'drbqs_client.log'
INTERVAL_TIME_DEFAULT =
0.1
WAIT_NEW_TASK_TIME =
1
SAME_HOST_GROUP =
:local
MAX_WORKER_WAIT_TIME =
3
WORKER_WAIT_INTERVAL =
0.1

Instance Method Summary collapse

Constructor Details

#initialize(access_uri, opts = {}) ⇒ Node

Returns a new instance of Node.

Parameters:

  • access_uri (String)

    Set the uri of server

  • opts (Hash) (defaults to: {})

    Options of a node

Options Hash (opts):

  • :process (Fixnum)

    Number of worker processes

  • :group (Array)

    An array of group symbols

  • :sleep_time (Fixnum)

    Time interval during sleep of the node

  • :max_loadavg (String)

    Note that this optiono is experimental



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/drbqs/node/node.rb', line 26

def initialize(access_uri, opts = {})
  @access_uri = access_uri
  @logger = DRbQS::Misc.create_logger(opts[:log_file] || DEFAULT_LOG_FILE, opts[:log_level])
  @connection = nil
  @task_client = nil
  @worker_number = opts[:process] || 1
  @state = DRbQS::Node::State.new(:wait, @worker_number, :max_loadavg => opts[:max_loadavg], :sleep_time => opts[:sleep_time])
  @group = opts[:group] || []
  @signal_to_server_queue = Queue.new
  @config = DRbQS::Config.new
  @special_task_number = 0
  @worker = DRbQS::Worker::ProcessSet.new(DRbQS::Worker::ForkedProcess)
  @worker.on_result do |proc_key, res|
    task_id, h = res
    queue_result(task_id, h)
  end
  @worker.on_error do |proc_key, res|
    @signal_to_server_queue.push([:node_error, res])
  end
end

Instance Method Details

#calculate(opts = {}) ⇒ Object



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/drbqs/node/node.rb', line 268

def calculate(opts = {})
  set_signal_trap
  begin
    server_has_no_task = nil
    loop do
      send_result_to_server
      unless process_signal_for_server
        break
      end
      if @state.change_to_sleep_for_busy_system
        @logger.info("Sleep because system is busy.")
      end
      if server_has_no_task && (t = server_has_no_task + WAIT_NEW_TASK_TIME - Time.now) > 0
        sleep(t)
        server_has_no_task = nil
      end
      if get_new_task
        send_task_to_worker
      elsif @state.ready_to_exit_after_task? && @task_client.result_empty?
        execute_finalization
        break
      elsif @state.request?
        server_has_no_task = Time.now
      end
      unless respond_worker_signal
        wait_interval_of_connection
      end
    end
  rescue => err
    send_error(err, "Node error occurs.")
    @worker.kill_all_processes
  end
  wait_process_finish
  clear_node_files
end

#connectObject

Connect to the server and finish initialization of the node.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/drbqs/node/node.rb', line 87

def connect
  obj = DRbObject.new_with_uri(@access_uri)
  @server_key = obj[:key]
  @connection = DRbQS::Node::Connection.new(obj[:message], @logger)
  set_node_group_for_task
  @task_client = DRbQS::Node::TaskClient.new(@connection.node_number, obj[:queue], obj[:result],
                                             @group, @logger)
  DRbQS::Transfer::Client.set(obj[:transfer].get_client(server_on_same_host?)) if obj[:transfer]
  @state.each_worker_id do |wid|
    @worker.create_process(wid)
  end
  if ary_initialization = @connection.get_initialization
    send_special_task_ary_to_all_workers(ary_initialization)
  end
  @config.list.node.save(Process.pid, node_data)
end

#server_on_same_host?Boolean

Returns:

  • (Boolean)


112
113
114
115
# File 'lib/drbqs/node/node.rb', line 112

def server_on_same_host?
  @server_on_same_host ||
    (@server_on_same_host = @config.list.server.server_of_key_exist?(@access_uri, @server_key))
end

#set_signal_trapObject



235
236
237
238
239
# File 'lib/drbqs/node/node.rb', line 235

def set_signal_trap
  Signal.trap(:TERM) do
    @signal_to_server_queue.push([:signal_kill])
  end
end