Class: Gorgon::Worker

Inherits:
Object
  • Object
show all
Includes:
GLogger
Defined in:
lib/gorgon/worker.rb

Constant Summary

Constants included from GLogger

GLogger::SIZE_1_MB

Class Method Summary collapse

Instance Method Summary collapse

Methods included from GLogger

#initialize_logger, #log, #log_error

Constructor Details

#initialize(params) ⇒ Worker

Returns a new instance of Worker.



76
77
78
79
80
81
82
83
84
# File 'lib/gorgon/worker.rb', line 76

# Sets up the logger and stores the collaborators this worker needs.
#
# @param params [Hash] options read by key: :log_file, :amqp,
#   :file_queue_name, :reply_exchange_name, :worker_id, :callback_handler
def initialize(params)
  initialize_logger(params[:log_file])

  @amqp, @file_queue_name, @reply_exchange_name, @worker_id, @callback_handler =
    params.values_at(:amqp, :file_queue_name, :reply_exchange_name,
                     :worker_id, :callback_handler)
end

Class Method Details

.build(worker_id, config) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/gorgon/worker.rb', line 38

# Builds a Worker from a job definition read as JSON from standard input.
#
# Redirects STDOUT/STDERR to per-worker files first, then parses the job
# definition, creates the AMQP service and callback handler, exports the
# worker id through the GORGON_WORKER_ID environment variable, and finally
# instantiates the worker.
#
# @param worker_id [Object] identifier for this worker (stringified for ENV)
# @param config [Hash] read for :connection and :log_file
# @return [Worker] the newly constructed worker
def build(worker_id, config)
  redirect_output_to_files(worker_id)

  json_parser = Yajl::Parser.new(:symbolize_keys => true)
  job = JobDefinition.new(json_parser.parse($stdin.read))

  amqp_service = AmqpService.new(config[:connection])
  handler = CallbackHandler.new(job.callbacks)

  ENV["GORGON_WORKER_ID"] = worker_id.to_s

  worker_params = {
    :amqp => amqp_service,
    :file_queue_name => job.file_queue_name,
    :reply_exchange_name => job.reply_exchange_name,
    :worker_id => worker_id,
    :callback_handler => handler,
    :log_file => config[:log_file]
  }
  new(worker_params)
end

.output_file(id, stream) ⇒ Object



63
64
65
# File 'lib/gorgon/worker.rb', line 63

# Returns the path of the file that captures one of a worker's output streams.
#
# @param id [Object] worker identifier, interpolated into the file name
# @param stream [Object] stream label used as the file extension (e.g. :out)
# @return [String] path of the form "/tmp/gorgon-worker-<id>.<stream>"
def output_file(id, stream)
  "/tmp/gorgon-worker-#{id}.#{stream}"
end

.redirect_output_to_files(worker_id) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/gorgon/worker.rb', line 67

# Redirects this process's STDOUT and STDERR to the per-worker files named
# by output_file, truncating them, and turns on sync mode for both streams.
#
# The path is passed straight to IO#reopen so that the stream opens the file
# itself; wrapping it in File.open would leave an extra File object (and its
# descriptor) open that nothing ever closes.
#
# @param worker_id [Object] identifier used to build the output file names
def redirect_output_to_files(worker_id)
  STDOUT.reopen(output_file(worker_id, :out), 'w')
  STDOUT.sync = true

  STDERR.reopen(output_file(worker_id, :err), 'w')
  STDERR.sync = true
end

Instance Method Details

#work ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/gorgon/worker.rb', line 86

# Runs the worker: invokes the before_start callback, then pops file names
# from the file queue until it yields a falsy value, running each file and
# publishing a start message and a finish message (with results and elapsed
# time) to the reply exchange.
#
# clean_up is called exactly once on the way out, whether the run finished
# normally or an exception was raised; in the latter case the exception is
# re-raised after cleaning up.
def work
  log "Running before_start callback..."
  register_trap_ints # must happen before the before_start callback is invoked
  @callback_handler.before_start
  @cleaned = false

  log "Running files ..."
  @amqp.start_worker(@file_queue_name, @reply_exchange_name) do |file_queue, reply_exchange|
    while (path = file_queue.pop)
      reply_exchange.publish(make_start_message(path))
      log "Running '#{path}' with Worker: #{@worker_id}"
      results = nil # declared here so the Benchmark block assigns to this local
      elapsed = Benchmark.realtime { results = run_file(path) }
      reply_exchange.publish(make_finish_message(path, results, elapsed))
    end
  end
rescue Exception => e
  clean_up
  raise e # re-raised so the worker manager can catch it
else
  # Only reached when no exception was raised above.
  clean_up
end