Class: DRbQS::Server
- Inherits:
-
Object
- Object
- DRbQS::Server
- Includes:
- Misc
- Defined in:
- lib/drbqs/server/queue.rb,
lib/drbqs/server/server.rb,
lib/drbqs/server/history.rb,
lib/drbqs/server/message.rb,
lib/drbqs/server/acl_file.rb,
lib/drbqs/server/node_list.rb,
lib/drbqs/server/check_alive.rb,
lib/drbqs/server/server_hook.rb,
lib/drbqs/server/transfer_setting.rb
Direct Known Subclasses
Defined Under Namespace
Modules: ACLFile Classes: CheckAlive, History, Hook, Message, NodeList, Queue, TaskHistory, TransferSetting
Constant Summary collapse
- WAIT_TIME_NODE_EXIT =
3
- WAIT_TIME_NODE_FINALIZE =
10
- WAIT_TIME_NEW_RESULT =
1
Constants included from Misc
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
-
#add_hook(key, opts = {}, &block) ⇒ Object
Set a hook of server.
- #add_task_generator(task_generator) ⇒ Object
- #delete_hook(key, name = nil) ⇒ Object
- #exit ⇒ Object
-
#initialize(opts = {}) ⇒ Server
constructor
:nodoc: Notes on the tuple spaces: @ts[:message] (used in node/connection.rb) carries some messages to both server and node, and special tasks from server to nodes; @ts[:queue] (used in node/task_client.rb) carries tasks from server to nodes; @ts[:result] (used in node/task_client.rb) carries accept signals from nodes and results from nodes.
-
#set_data(*args) ⇒ Object
Store *args, which must be String objects, in the data storage.
- #set_file_transfer(directory, opts = {}) ⇒ Object
- #set_finalization_task(*tasks) ⇒ Object
- #set_initialization_task(*tasks) ⇒ Object
- #set_signal_trap ⇒ Object
-
#start ⇒ Object
Initialize and start druby service.
-
#task_generator(opts = {}) {|tgen| ... } ⇒ Object
Create new task generator and add it.
- #transfer_directory ⇒ Object
- #wait ⇒ Object
Methods included from Misc
create_logger, create_uri, output_error, process_running_normally?, random_key, time_to_history_string, time_to_history_string2, uri_drbunix
Constructor Details
#initialize(opts = {}) ⇒ Server
:nodoc:
- Notes on the tuple spaces
@ts[:message]
- used in node/connection.rb
- some messages to both server and node
- special tasks from server to nodes
@ts[:queue]
- used in node/task_client.rb
- tasks from server to nodes
@ts[:result]
- used in node/task_client.rb
- accept signal from nodes
- results from nodes
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/drbqs/server/server.rb', line 48
# Build a server: create its URI, ACL, key, tuple spaces, logger and helpers.
#
# Tuple spaces held in @ts:
# * :message - used in node/connection.rb; some messages to both server and
#   node, and special tasks from server to nodes
# * :queue   - used in node/task_client.rb; tasks from server to nodes
# * :result  - used in node/task_client.rb; accept signals and results
#   from nodes
#
# @param opts [Hash] options. Keys read directly here are :acl, :log_file,
#   :log_level, :check_alive, :not_exit, :shutdown_unused_nodes,
#   :signal_trap, :sftp_host, :sftp_user and :file_directory; the whole hash
#   is also passed to DRbQS::Misc.create_uri.
def initialize(opts = {})
  @uri = DRbQS::Misc.create_uri(opts)
  @acl = acl_init(opts[:acl])
  # Random key suffixed with the current time in epoch seconds.
  @key = DRbQS::Misc.random_key + "_#{Time.now.to_i}"

  message_space = Rinda::TupleSpace.new
  queue_space = Rinda::TupleSpace.new
  result_space = Rinda::TupleSpace.new
  @ts = {
    :message => message_space,
    :queue => queue_space,
    :result => result_space,
    :key => @key,
    :transfer => nil
  }

  @logger = DRbQS::Misc.create_logger(opts[:log_file], opts[:log_level])
  @message = DRbQS::Server::Message.new(message_space, @logger)
  @queue = DRbQS::Server::Queue.new(queue_space, result_space, @logger)
  @check_alive = DRbQS::Server::CheckAlive.new(opts[:check_alive])
  @task_generator = []
  hook_init(!opts[:not_exit], opts[:shutdown_unused_nodes])

  # The TERM trap is installed unless :signal_trap is given with a falsy value.
  set_signal_trap if opts.fetch(:signal_trap, true)

  @finalization_task = []
  @data_storage = []
  @transfer_setting = DRbQS::Server::TransferSetting.new(
    opts[:sftp_host], opts[:sftp_user], opts[:file_directory]
  )
  @config = DRbQS::Config.new
end
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
18 19 20 |
# File 'lib/drbqs/server/server.rb', line 18
# Read-only accessor for the queue attribute.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
18 19 20 |
# File 'lib/drbqs/server/server.rb', line 18
# Read-only accessor for the uri attribute.
#
# @return [Object] the current value of @uri
def uri
  @uri
end
Instance Method Details
#add_hook(key, opts = {}, &block) ⇒ Object
When both an :empty_queue hook and task generators are set, the :empty_queue hook takes precedence over the task generators.
Set a hook of server.
182 183 184 185 186 187 188 189 |
# File 'lib/drbqs/server/server.rb', line 182
# Register a hook on the server by delegating to @hook.add.
#
# Only one :process_data hook may exist: a second registration raises.
#
# @param key [Symbol] hook kind, e.g. :process_data
# @param opts [Hash] options forwarded unchanged to @hook.add
# @param block [Proc] hook body forwarded unchanged to @hook.add
# @return [Object] whatever @hook.add returns
# @raise [RuntimeError] if key is :process_data and one is already registered
def add_hook(key, opts = {}, &block)
  raise "Hook :process_data has already set." if key == :process_data && @hook.number_of_hook(:process_data) != 0

  @hook.add(key, opts, &block)
end
#add_task_generator(task_generator) ⇒ Object
120 121 122 |
# File 'lib/drbqs/server/server.rb', line 120
# Append a task generator to the server's list of generators.
#
# @param task_generator [Object] generator to register
# @return [Array] the @task_generator list after appending
def add_task_generator(task_generator)
  @task_generator.push(task_generator)
end
#delete_hook(key, name = nil) ⇒ Object
193 194 195 |
# File 'lib/drbqs/server/server.rb', line 193
# Remove a hook by delegating to @hook.delete.
#
# @param key [Symbol] hook kind
# @param name [Object, nil] passed through to @hook.delete; nil when omitted
# @return [Object] whatever @hook.delete returns
def delete_hook(key, name = nil)
  @hook.delete(key, name)
end
#exit ⇒ Object
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/drbqs/server/server.rb', line 258
# Shut the server down.
#
# Sends either an exit message or, when finalization tasks are registered,
# the finalization tasks followed by a finalization message. It then polls
# until @message reports that no node exists, logs the task history, removes
# this server's URI from the config's server list and terminates the process
# with Kernel.exit.
#
# @return [void] does not return normally; Kernel.exit is called last
def exit
  if @finalization_task.empty?
    @message.send_exit
    poll_interval = WAIT_TIME_NODE_EXIT
  else
    @message.set_finalization_tasks(@finalization_task)
    @message.send_finalization
    poll_interval = WAIT_TIME_NODE_FINALIZE
  end

  # Keep polling until every node has gone away.
  while !@message.node_not_exist?
    sleep(poll_interval)
    check_connection(true)
  end

  @logger.info("History of tasks") { "\n" + @queue.all_logs }
  @config.list.server.delete(@uri)
  Kernel.exit
end
#set_data(*args) ⇒ Object
Store *args, which must be String objects, in the data storage. The data is processed by the :process_data hook.
297 298 299 300 301 302 303 304 305 |
# File 'lib/drbqs/server/server.rb', line 297
# Append String arguments to the data storage.
#
# Each argument that is a String is pushed onto @data_storage; any other
# argument is reported through @logger.error and skipped.
#
# @param args [Array<Object>] candidate data items
# @return [Array<Object>] the args array
def set_data(*args)
  args.each do |item|
    case item
    when String
      @data_storage << item
    else
      @logger.error("Invalid data type\n#{item.inspect}")
    end
  end
end
#set_file_transfer(directory, opts = {}) ⇒ Object
287 288 289 290 291 292 |
# File 'lib/drbqs/server/server.rb', line 287
# Configure file transfer and publish the setting in the shared table.
#
# When @transfer_setting.setup_server returns a truthy value, the setting
# is stored under @ts[:transfer] and its information is logged; otherwise
# nothing is changed.
#
# @param directory [Object] forwarded to @transfer_setting.setup_server
# @param opts [Hash] forwarded to @transfer_setting.setup_server
# @return [Object, nil] result of @logger.info on success, nil otherwise
def set_file_transfer(directory, opts = {})
  return unless @transfer_setting.setup_server(directory, opts)

  @ts[:transfer] = @transfer_setting
  @logger.info("File transfer") { @transfer_setting.information }
end
#set_finalization_task(*tasks) ⇒ Object
170 171 172 |
# File 'lib/drbqs/server/server.rb', line 170
# Append tasks to the list of finalization tasks.
#
# @param tasks [Array<Object>] tasks to add
# @return [Array<Object>] the @finalization_task list after appending
def set_finalization_task(*tasks)
  @finalization_task.push(*tasks)
end
#set_initialization_task(*tasks) ⇒ Object
165 166 167 |
# File 'lib/drbqs/server/server.rb', line 165
# Hand the given tasks, as one array, to @message.set_initialization_tasks.
#
# @param tasks [Array<Object>] initialization tasks
# @return [Object] whatever @message.set_initialization_tasks returns
def set_initialization_task(*tasks)
  @message.set_initialization_tasks(tasks)
end
#set_signal_trap ⇒ Object
276 277 278 279 280 281 |
# File 'lib/drbqs/server/server.rb', line 276
# Install a TERM signal handler that logs the signal and shuts the server
# down through #exit.
#
# NOTE(review): the handler logs and runs the whole shutdown from inside the
# trap block; confirm the logger and the calls made by #exit are permitted in
# trap context on the Ruby versions this must support.
#
# @return [Object] whatever Signal.trap returns
def set_signal_trap
  on_term = proc do
    @logger.error("Get TERM signal.")
    self.exit
  end
  Signal.trap(:TERM, &on_term)
end
#start ⇒ Object
Initialize and start druby service.
101 102 103 104 105 106 107 |
# File 'lib/drbqs/server/server.rb', line 101
# Initialize and start the dRuby service.
#
# Calls set_file_transfer(nil), installs the ACL when one is set, starts the
# DRb service on @uri with @ts as the front object, saves this server in the
# config's server list and logs the URI.
#
# @return [Object] whatever @logger.info returns
def start
  set_file_transfer(nil)
  if @acl
    DRb.install_acl(@acl)
  end
  DRb.start_service(@uri, @ts)

  server_list = @config.list.server
  server_list.save(@uri, server_data)
  @logger.info("Start DRb service") { @uri }
end
#task_generator(opts = {}) {|tgen| ... } ⇒ Object
Create new task generator and add it.
128 129 130 131 132 133 |
# File 'lib/drbqs/server/server.rb', line 128
# Create a new DRbQS::Task::Generator, configure it and register it.
#
# @param opts [Hash] forwarded to the generator's #set
# @param block [Proc] forwarded to the generator's #set
# @return [nil]
def task_generator(opts = {}, &block)
  generator = DRbQS::Task::Generator.new.tap { |gen| gen.set(opts, &block) }
  add_task_generator(generator)
  nil
end
#transfer_directory ⇒ Object
72 73 74 |
# File 'lib/drbqs/server/server.rb', line 72
# Return the directory reported by the transfer setting.
#
# @return [Object] whatever @transfer_setting.prepared_directory returns
def transfer_directory
  @transfer_setting.prepared_directory
end
#wait ⇒ Object
405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/drbqs/server/server.rb', line 405
# Main server loop.
#
# After first_task_generator_init, each iteration checks connections,
# collects results from the queue, runs hooks and logs the number of tasks
# being calculated. It sleeps for WAIT_TIME_NEW_RESULT seconds when at most
# one result was collected in that iteration.
#
# NOTE(review): no break is visible in this loop, so clear_server_files
# appears to be reached only if something inside raises StopIteration;
# confirm whether that cleanup is expected to run.
def wait
  first_task_generator_init
  loop do
    check_connection
    received = @queue.get_result(self)
    exec_hook
    @logger.debug("Calculating tasks: #{@queue.calculating_task_number}")
    sleep(WAIT_TIME_NEW_RESULT) if received <= 1
  end
  clear_server_files
end