Class: DRbQS::ProcessDefinition

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/execute/register.rb,
lib/drbqs/execute/process_define.rb

Defined Under Namespace

Classes: Register

Constant Summary collapse

PATH_CPUINFO =
"/proc/cpuinfo"
TIME_INTERVAL_EXECUTE_NODE =
1
TIME_INTERVAL_WAIT_SERVER_FINISH =
3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, node, port, io = nil) ⇒ ProcessDefinition

Returns a new instance of ProcessDefinition.

Parameters:

  • server (Symbol)

    Symbol of server name

  • node (Array)

    An array of Symbol which means node name

  • port (String)

    Port number

  • io (IO, nil) (defaults to: nil)

    IO object to output



12
13
14
15
16
17
18
# File 'lib/drbqs/execute/process_define.rb', line 12

def initialize(server, node, port, io = nil)
  @server = server
  @node = node
  @port = port
  @register = DRbQS::ProcessDefinition::Register.new
  @io = io
end

Instance Attribute Details

#registerObject (readonly)

Returns the value of attribute register.



6
7
8
# File 'lib/drbqs/execute/process_define.rb', line 6

def register
  @register
end

Instance Method Details

#execute_nodeObject



225
226
227
228
229
230
231
232
233
234
# File 'lib/drbqs/execute/process_define.rb', line 225

def execute_node
  uri = server_uri(@server)
  if uri && /^drbunix/ !~ uri
    each_node_to_execute do |name, data|
      execute_one_node(name, data, uri)
      # If there is no time interval then drb does not work properly.
      sleep(TIME_INTERVAL_EXECUTE_NODE)
    end
  end
end

#execute_server(server_args) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/drbqs/execute/process_define.rb', line 133

def execute_server(server_args)
  if ary = get_server_setting(@server)
    name = ary[0].to_s
    data = ary[1]
    puts_progress "Execute server '#{name}' (#{data[:ssh] ? 'ssh' : 'local'})"
    setting = data[:setting]
    hostname = data[:args][0]
    type = data[:type]
    if data[:ssh]
      setting.value.connect name unless setting.set?(:connect)
      server_setting = setting.mode_setting
    else
      server_setting = setting
      server_setting.value.daemon FileName.create(local_log_directory, "server_execute.log", :position => :middle)
    end
    server_setting.set_server_argument(*server_args)
    if data[:unix_domain_socket]
      unless server_setting.value.unix
        server_setting.value.unix DRbQS::Temporary.socket_path
      end
      unless server_setting.value.execute_node
        server_setting.value.execute_node get_suitable_process_num
      end
    else
      server_setting.value.port server_port
      unless server_setting.set?(:sftp_host)
        server_setting.value.sftp_host hostname
      end
    end
    setting.parse!
    unless data[:ssh]
      server_setting.value.argument.each do |path|
        unless File.exist?(path)
          raise "File '#{path}' does not exist."
        end
      end
    end
    setting.exec
  end
rescue Exception => err
  puts_progress "Fail to execute server '#{data[:name].to_s}'"
  mes = "Invalid server definition: #{err.to_s} (#{err.class.to_s})"
  begin
    mes = "#{setting.string_for_shell}; " << mes if setting.respond_to?(:string_for_shell)
  rescue
  end
  new_err = err.class.new(mes)
  new_err.set_backtrace(err.backtrace)
  raise new_err
end

#informationObject



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/drbqs/execute/process_define.rb', line 236

def information
  info = {}
  info[:server] = @register.__server__.map do |name, data|
    new_data = data.dup
    new_data.delete(:setting)
    [name, new_data]
  end
  info[:node] = @register.__node__.map do |name, data|
    new_data = data.dup
    new_data.delete(:setting)
    [name, new_data]
  end
  if ary = get_server_setting(@server)
    default_server = ary[0]
  else
    default_server = nil
  end
  default_nodes = each_node_to_execute.map do |node_name, node_data|
    node_name
  end
  info[:default] = { :server => default_server, :node => default_nodes, :port => server_port }
  info
end

#information_stringObject



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/drbqs/execute/process_define.rb', line 260

def information_string
  info = information
  str = "Server:\n"
  ary = (info[:server] + info[:node]).map do |name, data|
    name.size
  end
  string_name_size = ary.max
  info[:server].each do |name, data|
    if data[:unix_domain_socket]
      prop = "local(unix socket domain)"
    elsif data[:ssh]
      prop = "ssh"
    else
      prop = "local(ssh)"
    end
    str << (info[:default][:server] == name ? " * " : (data[:template] ? " - " : "   "))
    str << sprintf("%- #{string_name_size}s  %s\n", name, prop)
  end
  str << "\nNode:\n"
  info[:node].each do |name, data|
    if data[:type] == :group
      prop = 'group: ' << data[:args].map(&:to_s).join(',')
    else
      prop = (data[:ssh] ? 'ssh' : 'local')
    end
    if info[:default][:node].include?(name)
      str << " * "
    elsif data[:type] == :group
      str << " # "
    elsif data[:template]
      str << " - "
    else
      str << "   "
    end
    str << sprintf("%- #{string_name_size}s  %s\n", name, prop)
  end
  str << "\nDefault port:\n   #{info[:default][:port]}"
  str << "\n\nHelp:\n"
  str << "   ssh:   Process over SSH\n"
  str << "   local: Process on localhost\n"
  str << "   *: default, -: template, #: node group"
end

#load(path) ⇒ Object



31
32
33
# File 'lib/drbqs/execute/process_define.rb', line 31

def load(path)
  @register.__load__(path)
end

#local_log_directoryObject

Log directory for processes on localhost. Processes over ssh does not use this directory.



27
28
29
# File 'lib/drbqs/execute/process_define.rb', line 27

def local_log_directory
  @logal_log_directory ||= FileName.create(default_value(:log) || 'drbqs_execute_log')
end

#test_consistencyObject



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/drbqs/execute/process_define.rb', line 318

def test_consistency
  # Test existence of default server
  if @server && !get_server_setting(@server)
    raise "Invalid default server: #{@server.inspect}"
  end
  # Test existences of default nodes
  if node_names = default_value(:node)
    all_node_find_p = true
    node_names.each do |node|
      unless get_node_data(node)
        all_node_find_p = false
        $stderr.puts "Node definition #{node.inspect} does not exist!"
      end
    end
    unless all_node_find_p
      raise "Invalid default node."
    end
  end
end

#usageObject



303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/drbqs/execute/process_define.rb', line 303

def usage
  if data = @register.__usage__
    str = data[:message] ? "\nDescription:\n#{data[:message]}" : ""
    if (server_file = data[:server]) && File.exist?(server_file)
      Kernel.load(server_file)
      if server_help = DRbQS.option_help_message
        str << "\n\n" << server_help
      end
    end
    str
  else
    ''
  end
end

#wait_server_finishObject



340
341
342
343
344
345
346
347
348
349
350
# File 'lib/drbqs/execute/process_define.rb', line 340

def wait_server_finish
  if uri = server_uri(@server)
    puts_progress "Wait finish of server #{uri}"
    manage = DRbQS::Manage.new(:uri => uri)
    while manage.server_respond?
      sleep(TIME_INTERVAL_EXECUTE_NODE)
    end
  else
    puts_progress "We tried to wait finish, however, we can not determine server uri"
  end
end