Class: DeepTest::CentralCommand

Inherits:
Object
  • Object
show all
Defined in:
lib/deep_test/central_command.rb

Defined Under Namespace

Modules: Operation, Result Classes: CheckIfAgentsAreStillRunning, NoAgentsRunningError, NoWorkUnitsAvailableError, NoWorkUnitsRemainingError, UnexpectedMessageError

Constant Summary collapse

NeedWork =
"NeedWork"
NoMoreWork =
"NoMoreWork"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ CentralCommand

Returns a new instance of CentralCommand.



9
10
11
12
13
14
15
16
# File 'lib/deep_test/central_command.rb', line 9

def initialize(options)
  @options = options
  @work_queue = Queue.new
  @results_mutex = Mutex.new
  @results_condvar = ConditionVariable.new
  @results = []
  @data = Metrics::Data.new
end

Instance Attribute Details

#dataObject (readonly)

Returns the value of attribute data.



7
8
9
# File 'lib/deep_test/central_command.rb', line 7

def data
  @data
end

#operatorObject (readonly)

Returns the value of attribute operator.



5
6
7
# File 'lib/deep_test/central_command.rb', line 5

def operator
  @operator
end

#switchboardObject (readonly)

Returns the value of attribute switchboard.



6
7
8
# File 'lib/deep_test/central_command.rb', line 6

def switchboard
  @switchboard
end

Class Method Details

.start(options) ⇒ Object



65
66
67
68
69
# File 'lib/deep_test/central_command.rb', line 65

def self.start(options)
  central_command = new(options)
  central_command.start
  central_command
end

Instance Method Details

#done_with_workObject



18
19
20
# File 'lib/deep_test/central_command.rb', line 18

def done_with_work
  @done_with_work = true
end

#process_messagesObject



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/deep_test/central_command.rb', line 87

def process_messages
  loop do
    begin
      return if @stop_process_messages
      wires_waiting_for_work.each { |w| send_work w }

      @results_mutex.synchronize do
        # make take_result wake up and check if any agents are running
        @results_condvar.signal
      end

      message, wire = switchboard.next_message(:timeout => 0.5)

      case message.body
      when NeedWork
        send_work wire
      when Result 
        write_result message.body
        send_work wire
      when Operation
        message.body.execute
      when Metrics::Measurement
        data.add message.body
      else 
        raise UnexpectedMessageError, message.inspect
      end

    rescue Telegraph::NoMessageAvailable
      retry
    rescue Exception => e
      raise unless @stop_process_messages
    end
  end
end

#put_abandonded_work_back_on_the_queueObject



142
143
144
145
146
147
148
149
150
# File 'lib/deep_test/central_command.rb', line 142

def put_abandonded_work_back_on_the_queue
  closed_wires = switchboard.using_wires { |wires| wires.select {|w| w.closed? }}
  closed_wires.each do |wire|
    wire.unacked_messages.each do |m|
      write_work m.body
      switchboard.drop_wire wire
    end
  end
end

#send_work(wire) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/deep_test/central_command.rb', line 126

def send_work(wire)
  begin
    wire.send_message take_work, :need_ack => true
    wires_waiting_for_work.delete wire

  rescue NoWorkUnitsAvailableError
    put_abandonded_work_back_on_the_queue
    wires_waiting_for_work.add wire

  rescue NoWorkUnitsRemainingError
    wire.send_message NoMoreWork
    wires_waiting_for_work.delete wire

  end
end

#startObject



71
72
73
74
75
76
77
78
# File 'lib/deep_test/central_command.rb', line 71

def start
  @switchboard = Telegraph::Switchboard.new
  @operator = Telegraph::Operator.listen("0.0.0.0", 0, @switchboard)
  @options.server_port = @operator.port
  @process_messages_thread = Thread.new { process_messages }

  DeepTest.logger.info { "Started DeepTest service on port #{@operator.port}" }
end

#stderrObject



61
62
63
# File 'lib/deep_test/central_command.rb', line 61

def stderr
  $stderr
end

#stdoutObject



57
58
59
# File 'lib/deep_test/central_command.rb', line 57

def stdout
  $stdout
end

#stopObject



152
153
154
155
156
157
# File 'lib/deep_test/central_command.rb', line 152

def stop
  @stop_process_messages = true
  operator.shutdown
  @process_messages_thread.join
  data.save @options.metrics_file if @options.gathering_metrics?
end

#take_resultObject



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deep_test/central_command.rb', line 22

def take_result
  @results_mutex.synchronize do
    loop do
      if @results.any?
        return @results.shift
      else
        @results_condvar.wait @results_mutex
        raise NoAgentsRunningError unless @results.any? || @switchboard.any_live_wires?
      end
    end
  end
end

#take_workObject



35
36
37
38
39
40
41
42
43
44
# File 'lib/deep_test/central_command.rb', line 35

def take_work
  @work_queue.pop(true)
rescue ThreadError => e
  if e.message == "queue empty"
    raise NoWorkUnitsRemainingError if @done_with_work
    raise NoWorkUnitsAvailableError
  else
    raise
  end
end

#wires_waiting_for_workObject



122
123
124
# File 'lib/deep_test/central_command.rb', line 122

def wires_waiting_for_work
  @wires_waiting_for_work ||= Set.new
end

#write_result(result) ⇒ Object



46
47
48
49
50
51
# File 'lib/deep_test/central_command.rb', line 46

def write_result(result)
  @results_mutex.synchronize do
    @results << result
    @results_condvar.signal
  end
end

#write_work(work_unit) ⇒ Object



53
54
55
# File 'lib/deep_test/central_command.rb', line 53

def write_work(work_unit)
  @work_queue.push work_unit
end