Class: Hive::Worker

Inherits:
Object
Defined in:
lib/hive/worker.rb,
lib/hive/worker/shell.rb

Overview

The generic worker class

Direct Known Subclasses

Shell

Defined Under Namespace

Classes: DeviceNotReady, InvalidJobReservationError, NoPortsAvailable, Shell

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(options) ⇒ Worker

Initializes the worker and runs the main worker process loop



# File 'lib/hive/worker.rb', line 28

def initialize(options)
  @options = options
  @parent_pid = @options['parent_pid']
  @device_id = @options['id']
  @hive_id = @options['hive_id']
  @hive_mind ||= mind_meld_klass.new(
    url: Chamber.env.network.hive_mind? ? Chamber.env.network.hive_mind : nil,
    pem: Chamber.env.network.cert ? Chamber.env.network.cert : nil,
    ca_file: Chamber.env.network.cafile ? Chamber.env.network.cafile : nil,
    verify_mode: Chamber.env.network.verify_mode ? Chamber.env.network.verify_mode : nil,
    device: hive_mind_device_identifiers
  )
  @device_identity = @options['device_identity'] || 'unknown-device'
  pid = Process.pid
  $PROGRAM_NAME = "#{@options['name_stub'] || 'WORKER'}.#{pid}"
  @log = Hive::Log.new
  @log.add_logger(
    "#{LOG_DIRECTORY}/#{pid}.#{@device_identity}.log",
    Hive.config.logging.worker_level || 'INFO'
  )

  self.update_queues

  @port_allocator = (@options.has_key?('port_allocator') ? @options['port_allocator'] : Hive::PortAllocator.new(ports: []))
  
  platform = self.class.to_s.scan(/[^:][^:]*/)[2].downcase
  @diagnostic_runner = Hive::DiagnosticRunner.new(@options, Hive.config.diagnostics, platform) if Hive.config.diagnostics? && Hive.config.diagnostics[platform]

  Hive::Messages.configure do |config|
    config.base_path = Hive.config.network.scheduler
    config.pem_file = Hive.config.network.cert
    config.ssl_verify_mode = OpenSSL::SSL::VERIFY_NONE
  end

  Signal.trap('TERM') do
    @log.info("Worker terminated")
    exit
  end

  @log.info('Starting worker')
  while keep_running?
    begin
      diagnostics
      update_queues
      poll_queue
    rescue DeviceNotReady => e
      @log.info("#{e.message}\n");
    rescue StandardError => e
      @log.warn("Worker loop aborted: #{e.message}\n  : #{e.backtrace.join("\n  : ")}")
    end
    sleep Hive.config.timings.worker_loop_interval
  end
  @log.info('Exiting worker')
end
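A worker is normally spawned by the Hive runner rather than constructed by hand, but a sketch of the options hash helps show what the constructor reads. The keys mirror those used in #initialize above; the values are illustrative, not real configuration.

# Illustrative options hash; keys match those read in #initialize.
options = {
  'parent_pid'      => Process.ppid,       # checked by #keep_running?
  'id'              => 42,                 # device id sent to Hive Mind
  'hive_id'         => 7,                  # included in reservation details
  'device_identity' => 'example-device',   # used in the log file name
  'name_stub'       => 'SHELL_WORKER',     # prefix for $PROGRAM_NAME
  'port_allocator'  => Hive::PortAllocator.new(ports: [4000, 4001])
}

# Note: the constructor runs the worker loop and blocks until the parent
# process goes away.
Hive::Worker::Shell.new(options)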

Instance Attribute Details

#device_api ⇒ Object

Device API Object for device associated with this worker



# File 'lib/hive/worker.rb', line 25

def device_api
  @device_api
end

#queues ⇒ Object

Queues polled by this worker for new jobs



# File 'lib/hive/worker.rb', line 25

def queues
  @queues
end

Instance Method Details

#allocate_port ⇒ Object

Allocate a port



# File 'lib/hive/worker.rb', line 408

def allocate_port
  @log.warn("Using deprecated 'Hive::Worker.allocate_port' method")
  @log.warn("Use @port_allocator.allocate_port instead")
  @port_allocator.allocate_port
end
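As the warnings indicate, new code should use the worker's port allocator directly. A minimal sketch, assuming code running inside a worker subclass with access to @port_allocator:

# Preferred over the deprecated wrapper methods.
port = @port_allocator.allocate_port
begin
  # ... bind a device-specific service to `port` ...
ensure
  @port_allocator.release_port(port)
end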

#autogenerated_queues ⇒ Object

List of autogenerated queues for the worker



# File 'lib/hive/worker.rb', line 250

def autogenerated_queues
  []
end
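Subclasses can override this to advertise device-specific queues, which #update_queues merges with the queues reported by Hive Mind. A hypothetical override; the class and queue names are made up for the example:

# Hypothetical subclass; queue names are illustrative only.
class MyWorker < Hive::Worker
  def autogenerated_queues
    ['my-platform', "my-platform-#{@device_identity}"]
  end
end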

#checkout_code(repository, checkout_directory) ⇒ Object

Get a checkout of the repository



# File 'lib/hive/worker.rb', line 373

def checkout_code(repository, checkout_directory)
  CodeCache.repo(repository).checkout(:head, checkout_directory) or raise "Unable to checkout repository #{repository}"
end

#cleanup ⇒ Object

Do whatever device cleanup is required



# File 'lib/hive/worker.rb', line 404

def cleanup
end

#detect_res_file(results_dir) ⇒ Object



# File 'lib/hive/worker.rb', line 347

def detect_res_file(results_dir)
  Dir.glob( "#{results_dir}/*.res" ).first
end

#device_status ⇒ Object

Current state of the device. This method should be replaced in child classes, as appropriate



# File 'lib/hive/worker.rb', line 239

def device_status
  @device_status ||= 'happy'
end
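Child classes are expected to replace #device_status (and #set_device_status below) with a real device check; #diagnostics raises DeviceNotReady unless the reported status is 'happy'. A hypothetical override, where device_connected? is an assumed device-specific helper, not part of the generic worker:

# Hypothetical override; `device_connected?` is an assumed helper.
class MyWorker < Hive::Worker
  def device_status
    @device_status = device_connected? ? 'happy' : 'unreachable'
  end
end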

#diagnostics ⇒ Object

Diagnostics function to be extended in child class, as required

Raises:

  • (DeviceNotReady)


# File 'lib/hive/worker.rb', line 230

def diagnostics
  @diagnostic_runner.run if !@diagnostic_runner.nil?
  status = device_status
  status = set_device_status('happy') if status == 'busy'
  raise DeviceNotReady.new("Current device status: '#{status}'") if status != 'happy'
end

#execute_job ⇒ Object

Execute a job



# File 'lib/hive/worker.rb', line 125

def execute_job
  # Ensure that a killed worker cleans up correctly
  Signal.trap('TERM') do |s|
    Signal.trap('TERM') {} # Prevent retry signals
    @log.info "Caught TERM signal"
    @log.info "Terminating script, if running"
    @script.terminate if @script
    @log.info "Post-execution cleanup"
    signal_safe_post_script(@job, @file_system, @script)

    # Upload results
    @file_system.finalise_results_directory
    upload_files(@job, @file_system.results_path, @file_system.logs_path)
    set_job_state_to :completed
    @job.error('Worker killed')
    @log.info "Worker terminated"
    exit
  end

  @log.info('Job starting')
  @job.prepare(@device_id)
  
  exception = nil
  begin
    @log.info "Setting job paths"
    @file_system = Hive::FileSystem.new(@job.job_id, Hive.config.logging.home, @log)
    set_job_state_to :preparing

    if ! @job.repository.to_s.empty?
      @log.info "Checking out the repository"
      @log.debug "  #{@job.repository}"
      @log.debug "  #{@file_system.testbed_path}"
      checkout_code(@job.repository, @file_system.testbed_path)
    end

    @log.info "Initialising execution script"
    @script = Hive::ExecutionScript.new(
      file_system: @file_system,
      log: @log,
      keep_running: ->() { self.keep_running? }
    )
    @script.append_bash_cmd "mkdir -p #{@file_system.testbed_path}/#{@job.execution_directory}"
    @script.append_bash_cmd "cd #{@file_system.testbed_path}/#{@job.execution_directory}"

    @log.info "Setting the execution variables in the environment"
    @script.set_env 'HIVE_RESULTS', @file_system.results_path
    @job.execution_variables.to_h.each_pair do |var, val|
      @script.set_env "HIVE_#{var.to_s}".upcase, val if ! val.kind_of?(Array)
    end
    if @job.execution_variables.retry_urns && !@job.execution_variables.retry_urns.empty?
      @script.set_env "RETRY_URNS", @job.execution_variables.retry_urns
    end
    if @job.execution_variables.tests && @job.execution_variables.tests != [""]
      @script.set_env "TEST_NAMES", @job.execution_variables.tests
    end
    

    @log.info "Appending test script to execution script"
    @script.append_bash_cmd @job.command

    set_job_state_to :running
    @job.start

    @log.info "Pre-execution setup"
    pre_script(@job, @file_system, @script)

    @log.info "Running execution script"
    exit_value = @script.run
    @job.end(exit_value)
  rescue => e
    exception = e
  end

  begin
    @log.info "Post-execution cleanup"
    set_job_state_to :uploading
    post_script(@job, @file_system, @script)

    # Upload results
    @file_system.finalise_results_directory
    upload_files(@job, @file_system.results_path, @file_system.logs_path)
    upload_results(@job, "#{@file_system.testbed_path}/#{@job.execution_directory}", @file_system.results_path)
  rescue => e
    @log.error( "Post execution failed: " + e.message)
    @log.error("  : #{e.backtrace.join("\n  : ")}")
  end

  if exception
    @job.error( exception.message )
    set_job_state_to :completed
    raise exception
  else
    @job.complete
  end

  Signal.trap('TERM') do
    @log.info("Worker terminated")
    exit
  end

  set_job_state_to :completed
  exit_value == 0
end
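The execution variables set above are exposed to the test command as environment variables (HIVE_RESULTS always, HIVE_<NAME> for each non-array execution variable, plus RETRY_URNS and TEST_NAMES when present), so a test script can read them without knowing about the worker. A sketch of a test-side helper; HIVE_QUEUE_NAME is only an example of a job-supplied execution variable:

# Read the environment prepared by #execute_job (illustrative only).
results_dir = ENV['HIVE_RESULTS']        # results directory for artifacts
test_names  = ENV['TEST_NAMES']          # set only when the job names tests
queue       = ENV['HIVE_QUEUE_NAME']     # example HIVE_* execution variable

File.write("#{results_dir}/run_notes.txt", "queue=#{queue} tests=#{test_names}")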

#hive_mind_device_identifiers ⇒ Object

Parameters for uniquely identifying the device



# File 'lib/hive/worker.rb', line 436

def hive_mind_device_identifiers
  { id: @device_id }
end
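Subclasses that register with Hive Mind by something other than the raw device id can override this. A hypothetical example; the 'serial' option and device_type value are assumptions, not part of the generic worker:

# Hypothetical override identifying the device by serial number.
class MyWorker < Hive::Worker
  def hive_mind_device_identifiers
    { serial: @options['serial'], device_type: 'my-platform' }
  end
end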

#job_message_klass ⇒ Object

Get the correct job class. This should usually be replaced in the child class



# File 'lib/hive/worker.rb', line 110

def job_message_klass
  @log.info 'Generic job class'
  Hive::Messages::Job
end
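A child class would normally point this at its own job message class. A minimal sketch; Hive::Messages::MyPlatformJob is a placeholder name, not a real class:

# Hypothetical override returning a device-specific job class.
class MyWorker < Hive::Worker
  def job_message_klass
    @log.info 'My platform job class'
    Hive::Messages::MyPlatformJob
  end
end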

#keep_running? ⇒ Boolean

Determine whether to keep the worker running. This just checks the presence of the parent process

Returns:

  • (Boolean)


# File 'lib/hive/worker.rb', line 379

def keep_running?
  begin
    Process.getpgid(@parent_pid)
    true
  rescue
    false
  end
end

#lion_config(checkout) ⇒ Object



# File 'lib/hive/worker.rb', line 368

def lion_config(checkout)
  Dir.glob( "#{checkout}/.lion.yml" ).first
end

#mind_meld_klass ⇒ Object



# File 'lib/hive/worker.rb', line 115

def mind_meld_klass
  MindMeld::Device
end

#poll_queue ⇒ Object

Check the queues for work



# File 'lib/hive/worker.rb', line 84

def poll_queue
  @job = reserve_job
  if @job.nil?
    @log.info('No job found')
  else
    @log.info('Job starting')
    begin
      execute_job
    rescue => e
      @log.info("Error running test: #{e.message}\n : #{e.backtrace.join("\n :")}")
    end
    cleanup
  end
end

#post_script(job, file_system, script) ⇒ Object

Any device-specific steps immediately after the execution script



# File 'lib/hive/worker.rb', line 393

def post_script(job, file_system, script)
  signal_safe_post_script(job, file_system, script)
end

#pre_script(job, file_system, script) ⇒ Object

Any setup required before the execution script



# File 'lib/hive/worker.rb', line 389

def pre_script(job, file_system, script)
end
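This hook, together with #post_script and #signal_safe_post_script below, is the usual place for device-specific setup and teardown around the execution script. A hypothetical pair; install_build and collect_device_logs are assumed helpers, not part of the generic worker:

# Hypothetical hooks in a worker subclass.
class MyWorker < Hive::Worker
  def pre_script(job, file_system, script)
    @log.info('Installing build on device')
    install_build(job, file_system.testbed_path)
  end

  # Runs from post_script and from the TERM handler, so keep it signal safe.
  def signal_safe_post_script(job, file_system, script)
    collect_device_logs(file_system.logs_path)
  end
end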

#process_xunit_results(results_dir) ⇒ Object



# File 'lib/hive/worker.rb', line 351

def process_xunit_results(results_dir) 
  if !Dir.glob("#{results_dir}/*.xml").empty?
    xunit_output = Res.parse_results(parser: :junit, :file => Dir.glob( "#{results_dir}/*.xml" ).first)
    res_output = File.open(xunit_output.io, "rb")
    contents = res_output.read
    res_output.close
    res = File.open("#{results_dir}/xunit.res", "w+")
    res.puts contents   
    res.close   
    res 
  end
end

#release_all_ports ⇒ Object

Release all ports



# File 'lib/hive/worker.rb', line 422

def release_all_ports
  @log.warn("Using deprecated 'Hive::Worker.release_all_ports' method")
  @log.warn("Use @port_allocator.release_all_ports instead")
  @port_allocator.release_all_ports
end

#release_port(p) ⇒ Object

Release a port



# File 'lib/hive/worker.rb', line 415

def release_port(p)
  @log.warn("Using deprecated 'Hive::Worker.release_port' method")
  @log.warn("Use @port_allocator.release_port instead")
  @port_allocator.release_port(p)
end

#reservation_details ⇒ Object



# File 'lib/hive/worker.rb', line 119

def reservation_details
  @log.debug "Reservations details: hive_id=#{@hive_id}, worker_pid=#{Process.pid}"
  { hive_id: @hive_id, worker_pid: Process.pid }
end

#reserve_job ⇒ Object

Try to find and reserve a job



# File 'lib/hive/worker.rb', line 100

def reserve_job
  @log.info "Trying to reserve job for queues: #{@queues.join(', ')}"
  job = job_message_klass.reserve(@queues, reservation_details)
  @log.debug "Job: #{job.inspect}"
  raise InvalidJobReservationError.new("Invalid Job Reserved") if ! (job.nil? || job.valid?)
  job
end

#set_device_status(status) ⇒ Object

Set the status of a device. This method should be replaced in child classes, as appropriate



# File 'lib/hive/worker.rb', line 245

def set_device_status(status)
  @device_status = status
end

#set_job_state_to(state) ⇒ Object

Set job info file



# File 'lib/hive/worker.rb', line 429

def set_job_state_to state
  File.open("#{@file_system.home_path}/job_info", 'w') do |f|
    f.puts "#{Process.pid} #{state}"
  end
end

#signal_safe_post_script(job, file_system, script) ⇒ Object

Any device-specific steps immediately after the execution script that can be safely run in a Signal.trap. This should be called by post_script



# File 'lib/hive/worker.rb', line 400

def signal_safe_post_script(job, file_system, script)
end

#testmine_config(checkout) ⇒ Object



# File 'lib/hive/worker.rb', line 364

def testmine_config(checkout)
  Dir.glob( "#{checkout}/.testmi{n,t}e.yml" ).first
end

#update_queue_log ⇒ Object



# File 'lib/hive/worker.rb', line 263

def update_queue_log
  File.open("#{LOG_DIRECTORY}/#{Process.pid}.queues.yml",'w') { |f| f.write @queues.to_yaml}
end

#update_queues ⇒ Object



# File 'lib/hive/worker.rb', line 254

def update_queues
  # Get Queues from Hive Mind
  @log.debug("Getting queues from Hive Mind")
  @queues = (autogenerated_queues + @hive_mind.hive_queues(true)).uniq
  @log.debug("hive queues: #{@hive_mind.hive_queues}")
  @log.debug("Full list of queues: #{@queues}")
  update_queue_log
end

#upload_files(job, *paths) ⇒ Object

Upload any files from the test



# File 'lib/hive/worker.rb', line 268

def upload_files(job, *paths)
  @log.info("Uploading assets")
  paths.each do |path|
    @log.info("Uploading files from #{path}")
    Dir.foreach(path) do |item|
      @log.info("File: #{item}")
      next if item == '.' or item == '..'
      begin
        artifact = job.report_artifact("#{path}/#{item}")
        @log.info("Artifact uploaded: #{artifact.attributes.to_s}")
      rescue => e
        @log.error("Error uploading artifact #{item}: #{e.message}")
        @log.error("  : #{e.backtrace.join("\n  : ")}")
      end
    end
  end
end

#upload_results(job, checkout, results_dir) ⇒ Object

Upload results



# File 'lib/hive/worker.rb', line 287

def upload_results(job, checkout, results_dir)

  res_file = detect_res_file(results_dir) || process_xunit_results(results_dir)
  
  if res_file
    @log.info("Res file found")
  
    begin
      Res.submit_results(
        reporter: :hive,
        ir: res_file,
        job_id: job.job_id
      )
    rescue => e
      @log.warn("Res Hive upload failed #{e.message}")
    end
  
    begin
      if conf_file = testmine_config(checkout)
        Res.submit_results(
          reporter: :testmine,
          ir: res_file,
          config_file: conf_file,
          hive_job_id: job.job_id,
          version: job.execution_variables.version,
          target: job.execution_variables.queue_name
        )
      end
    rescue => e
      @log.warn("Res Testmine upload failed #{e.message}")
    end

    begin
      if conf_file = lion_config(checkout)
        Res.submit_results(
            reporter: :lion,
            ir: res_file,
            config_file: conf_file,
            hive_job_id: job.job_id,
            version: job.execution_variables.version,
            target: job.execution_variables.queue_name,
            cert: Chamber.env.network.cert,
            cacert: Chamber.env.network.cafile,
            ssl_verify_mode: Chamber.env.network.verify_mode
        )
      end
    rescue => e
      @log.warn("Res Lion upload failed #{e.message}")
    end

    # TODO Add in Testrail upload
  end
end