Class: DRbQS::Server

Inherits:
Object
  • Object
show all
Includes:
Misc
Defined in:
lib/drbqs/server/queue.rb,
lib/drbqs/server/server.rb,
lib/drbqs/server/history.rb,
lib/drbqs/server/message.rb,
lib/drbqs/server/acl_file.rb,
lib/drbqs/server/node_list.rb,
lib/drbqs/server/check_alive.rb,
lib/drbqs/server/server_hook.rb,
lib/drbqs/server/transfer_setting.rb

Direct Known Subclasses

Test::Server

Defined Under Namespace

Modules: ACLFile Classes: CheckAlive, History, Hook, Message, NodeList, Queue, TaskHistory, TransferSetting

Constant Summary collapse

WAIT_TIME_NODE_EXIT =
3
WAIT_TIME_NODE_FINALIZE =
10
WAIT_TIME_NEW_RESULT =
1

Constants included from Misc

Misc::STRINGS_FOR_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Misc

create_logger, create_uri, output_error, process_running_normally?, random_key, time_to_history_string, time_to_history_string2, uri_drbunix

Constructor Details

#initialize(opts = {}) ⇒ Server

:nodoc:

  • Note of tuple spaces @ts[:message]
    • used in node/connection.rb
    • some messages to both server and node
    • special tasks from server to nodes @ts[:queue]
    • used in node/task_client.rb
    • tasks from server to nodes @ts[:result]
    • used in node/task_client.rb
    • accept signal from nodes
    • results from nodes

Parameters:

  • opts (Hash) (defaults to: {})

    The options of server

Options Hash (opts):

  • :port (Fixnum)

    Set the port of server.

  • :unix (String)

    Set the path of unix domain socket. If :port is specified then :port is preceded.

  • :acl (Array)

    Set the array of ACL.

  • :acl (String)

    Set the file path of ACL.

  • :log_file (String)

    Set the path of log files.

  • :log_level (Fixnum)

    Set the level of logging.

  • :check_alive (Fixnum)

    Set the time interval of checking alive nodes.

  • :not_exit (Boolean)

    Not exit programs when all tasks are finished.

  • :shutdown_unused_nodes (Boolean)

    Shutdown unused nodes.

  • :signal_trap (Boolean)

    Set trapping signal. Default is true.

  • :sftp_user (String)

    Set user of sftp.

  • :sftp_host (String)

    Set host of sftp.

  • :file_directory (String)

    Set the directory for nodes to send files.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/drbqs/server/server.rb', line 48

def initialize(opts = {})
  @uri = DRbQS::Misc.create_uri(opts)
  @acl = acl_init(opts[:acl])
  @key = DRbQS::Misc.random_key + sprintf("_%d", Time.now.to_i)
  @ts = {
    :message => Rinda::TupleSpace.new,
    :queue => Rinda::TupleSpace.new,
    :result => Rinda::TupleSpace.new,
    :key => @key,
    :transfer => nil
  }
  @logger = DRbQS::Misc.create_logger(opts[:log_file], opts[:log_level])
  @message = DRbQS::Server::Message.new(@ts[:message], @logger)
  @queue= DRbQS::Server::Queue.new(@ts[:queue], @ts[:result], @logger)
  @check_alive = DRbQS::Server::CheckAlive.new(opts[:check_alive])
  @task_generator = []
  hook_init(!opts[:not_exit], opts[:shutdown_unused_nodes])
  set_signal_trap if !opts.has_key?(:signal_trap) || opts[:signal_trap]
  @finalization_task = []
  @data_storage = []
  @transfer_setting = DRbQS::Server::TransferSetting.new(opts[:sftp_host], opts[:sftp_user], opts[:file_directory])
  @config = DRbQS::Config.new
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



18
19
20
# File 'lib/drbqs/server/server.rb', line 18

def queue
  @queue
end

#uriObject (readonly)

Returns the value of attribute uri.



18
19
20
# File 'lib/drbqs/server/server.rb', line 18

def uri
  @uri
end

Instance Method Details

#add_hook(key, opts = {}, &block) ⇒ Object

Note:

When we set both :empty_queue and task generators, hook of :empty_queue is prior to task generators.

Set a hook of server.

Parameters:

  • key (:empty_queue, :process_data, :finish)

    Set the type of hook.

  • block (Proc)

    The block is obligatory and takes server itself as an argument.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :repeat (Fixnum)

    If we execute the hook specified times then the hook is deleted. If the value is nil, the hook is repeated without limit.

  • :name (String)

    Name of the hook. If the value is nil then the name is automatically created.



182
183
184
185
186
187
188
189
# File 'lib/drbqs/server/server.rb', line 182

def add_hook(key, opts = {}, &block)
  if key == :process_data
    if @hook.number_of_hook(:process_data) != 0
      raise "Hook :process_data has already set."
    end
  end
  @hook.add(key, opts, &block)
end

#add_task_generator(task_generator) ⇒ Object

Parameters:



120
121
122
# File 'lib/drbqs/server/server.rb', line 120

def add_task_generator(task_generator)
  @task_generator << task_generator
end

#delete_hook(key, name = nil) ⇒ Object

Parameters:

  • key (:empty_queue, :process_data, :finish)

    Set the type of hook.

  • name (String) (defaults to: nil)

    Name of the hook. If the value is nil then all hooks of the key is deleted.



193
194
195
# File 'lib/drbqs/server/server.rb', line 193

def delete_hook(key, name = nil)
  @hook.delete(key, name)
end

#exitObject



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/drbqs/server/server.rb', line 258

def exit
  if !@finalization_task.empty?
    @message.set_finalization_tasks(@finalization_task)
    @message.send_finalization
    wait_time = WAIT_TIME_NODE_FINALIZE
  else
    @message.send_exit
    wait_time = WAIT_TIME_NODE_EXIT
  end
  until @message.node_not_exist?
    sleep(wait_time)
    check_connection(true)
  end
  @logger.info("History of tasks") { "\n" + @queue.all_logs }
  @config.list.server.delete(@uri)
  Kernel.exit
end

#set_data(*args) ⇒ Object

Set *args to data storage, which must be string objects. The data is processed by hook of :process_data.

Parameters:

  • args (Array)

    An array of data strings.



297
298
299
300
301
302
303
304
305
# File 'lib/drbqs/server/server.rb', line 297

def set_data(*args)
  args.each do |s|
    if String === s
      @data_storage << s
    else
      @logger.error("Invalid data type\n#{s.inspect}")
    end
  end
end

#set_file_transfer(directory, opts = {}) ⇒ Object

Parameters:

  • directory (String)

    Set the directory to save files from nodes.

  • opts (Hash) (defaults to: {})

    The options for SFTP.

Options Hash (opts):

  • :host (String)

    Hostname for SFTP.

  • :user (String)

    User name for SFTP.



287
288
289
290
291
292
# File 'lib/drbqs/server/server.rb', line 287

def set_file_transfer(directory, opts = {})
  if @transfer_setting.setup_server(directory, opts)
    @ts[:transfer] = @transfer_setting
    @logger.info("File transfer") { @transfer_setting.information }
  end
end

#set_finalization_task(*tasks) ⇒ Object

Parameters:

  • tasks (Array)

    An array of DRbQS::task objects, which are executed at initialization



170
171
172
# File 'lib/drbqs/server/server.rb', line 170

def set_finalization_task(*tasks)
  @finalization_task.concat(tasks)
end

#set_initialization_task(*tasks) ⇒ Object

Parameters:

  • tasks (Array)

    An array of DRbQS::task objects, which are executed at initialization



165
166
167
# File 'lib/drbqs/server/server.rb', line 165

def set_initialization_task(*tasks)
  @message.set_initialization_tasks(tasks)
end

#set_signal_trapObject



276
277
278
279
280
281
# File 'lib/drbqs/server/server.rb', line 276

def set_signal_trap
  Signal.trap(:TERM) do
    @logger.error("Get TERM signal.")
    self.exit
  end
end

#startObject

Initialize and start druby service.



101
102
103
104
105
106
107
# File 'lib/drbqs/server/server.rb', line 101

def start
  set_file_transfer(nil)
  DRb.install_acl(@acl) if @acl
  DRb.start_service(@uri, @ts)
  @config.list.server.save(@uri, server_data)
  @logger.info("Start DRb service") { @uri }
end

#task_generator(opts = {}) {|tgen| ... } ⇒ Object

Create new task generator and add it.

Parameters:

Yields:

Yield Parameters:

  • tgen (DRbQS::TaskGenerator)

    Task generator to add to the server



128
129
130
131
132
133
# File 'lib/drbqs/server/server.rb', line 128

def task_generator(opts = {}, &block)
  gen = DRbQS::Task::Generator.new
  gen.set(opts, &block)
  add_task_generator(gen)
  nil
end

#transfer_directoryObject



72
73
74
# File 'lib/drbqs/server/server.rb', line 72

def transfer_directory
  @transfer_setting.prepared_directory
end

#waitObject



405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/drbqs/server/server.rb', line 405

def wait
  first_task_generator_init
  loop do
    check_message
    check_connection
    count_results = @queue.get_result(self)
    exec_hook
    @logger.debug("Calculating tasks: #{@queue.calculating_task_number}")
    if count_results <= 1
      sleep(WAIT_TIME_NEW_RESULT)
    end
  end
  clear_server_files
end