Class: DeepTest::CentralCommand
- Inherits:
-
Object
- Object
- DeepTest::CentralCommand
show all
- Defined in:
- lib/deep_test/central_command.rb
Defined Under Namespace
Modules: Operation, Result
Classes: CheckIfAgentsAreStillRunning, NoAgentsRunningError, NoWorkUnitsAvailableError, NoWorkUnitsRemainingError, UnexpectedMessageError
Constant Summary
collapse
- NeedWork =
"NeedWork"
- NoMoreWork =
"NoMoreWork"
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of CentralCommand.
9
10
11
12
13
14
15
16
|
# File 'lib/deep_test/central_command.rb', line 9
# Sets up an idle CentralCommand: no work queued, no results collected.
#
# @param options [Object] run configuration, stored as-is; other methods of
#   this class call +server_port=+, +gathering_metrics?+ and +metrics_file+ on it
def initialize(options)
  @options = options

  # Sink for Metrics::Measurement messages (see #process_messages).
  @data = Metrics::Data.new

  # Work units waiting to be handed out by #take_work.
  @work_queue = Queue.new

  # Collected results, plus the lock and condition variable guarding them.
  @results = []
  @results_mutex = Mutex.new
  @results_condvar = ConditionVariable.new
end
|
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
7
8
9
|
# File 'lib/deep_test/central_command.rb', line 7
# Reader for the metrics container.
#
# @return [Object] the value of @data (assigned a Metrics::Data in #initialize)
def data
@data
end
|
#operator ⇒ Object
Returns the value of attribute operator.
5
6
7
|
# File 'lib/deep_test/central_command.rb', line 5
# Reader for the listening operator.
#
# @return [Object, nil] the value of @operator; it is assigned in #start from
#   Telegraph::Operator.listen and is not set by #initialize
def operator
@operator
end
|
#switchboard ⇒ Object
Returns the value of attribute switchboard.
6
7
8
|
# File 'lib/deep_test/central_command.rb', line 6
# Reader for the switchboard that tracks agent wires.
#
# @return [Object, nil] the value of @switchboard; it is assigned a
#   Telegraph::Switchboard in #start and is not set by #initialize
def switchboard
@switchboard
end
|
Class Method Details
.start(options) ⇒ Object
65
66
67
68
69
|
# File 'lib/deep_test/central_command.rb', line 65
# Convenience constructor: builds an instance and starts it in one call.
#
# @param options [Object] passed straight through to +new+
# @return [Object] the newly created instance, after #start has been called on it
def self.start(options)
  new(options).tap(&:start)
end
|
Instance Method Details
#done_with_work ⇒ Object
18
19
20
|
# File 'lib/deep_test/central_command.rb', line 18
# Flags that no further work units will be added.
#
# Once set, #take_work raises NoWorkUnitsRemainingError (rather than
# NoWorkUnitsAvailableError) when it finds the queue empty.
#
# @return [true]
def done_with_work
@done_with_work = true
end
|
#process_messages ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/deep_test/central_command.rb', line 87
# Message pump; #start runs it on a background thread.
#
# Each pass: exits if #stop has been requested, retries every wire that is
# still waiting for work, signals the results condition variable, then waits
# up to 0.5s for the next message and dispatches on its body.
#
# @return [nil] once @stop_process_messages is set
# @raise [UnexpectedMessageError] for a message body of an unknown kind
#   (only propagated while not stopping)
def process_messages
  loop do
    begin
      return if @stop_process_messages

      # Give wires that previously found the queue empty another chance.
      wires_waiting_for_work.each { |waiting_wire| send_work waiting_wire }

      # Wake a #take_result waiter so it can re-check whether agents are alive.
      @results_mutex.synchronize { @results_condvar.signal }

      message, wire = switchboard.next_message(:timeout => 0.5)
      payload = message.body
      case payload
      when NeedWork
        send_work wire
      when Result
        write_result payload
        send_work wire
      when Operation
        payload.execute
      when Metrics::Measurement
        data.add payload
      else
        raise UnexpectedMessageError, message.inspect
      end
    rescue Telegraph::NoMessageAvailable
      # Nothing arrived within the timeout; start the next pass.
      next
    rescue Exception
      # Anything raised after a stop request is swallowed and the next pass
      # returns; presumably this covers errors caused by #stop shutting the
      # operator down mid-poll. Otherwise the error propagates.
      raise unless @stop_process_messages
    end
  end
end
|
#put_abandonded_work_back_on_the_queue ⇒ Object
142
143
144
145
146
147
148
149
150
|
# File 'lib/deep_test/central_command.rb', line 142
# Requeues work that was sent over wires which have since closed.
#
# For every closed wire the switchboard knows about, each message that was
# sent but never acknowledged has its body pushed back onto the work queue.
# The wire is then dropped from the switchboard exactly once. That includes
# closed wires with nothing outstanding, which would otherwise stay
# registered and be re-scanned on every call.
#
# @return [Array] the closed wires that were processed
def put_abandonded_work_back_on_the_queue
  closed_wires = switchboard.using_wires { |wires| wires.select { |wire| wire.closed? } }
  closed_wires.each do |wire|
    wire.unacked_messages.each { |message| write_work message.body }
    # Drop once per wire, after its work is safely requeued; not once per message.
    switchboard.drop_wire wire
  end
end
|
#send_work(wire) ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
# File 'lib/deep_test/central_command.rb', line 126
# Hands the next work unit to +wire+, or records why it could not.
#
# * Work available: it is sent with an ack request and the wire is removed
#   from the waiting set.
# * Queue empty but more work may come: abandoned work is recovered from
#   closed wires and this wire is added to the waiting set (#process_messages
#   retries those wires on each pass).
# * Queue empty and finished: the wire is sent NoMoreWork and removed from
#   the waiting set.
#
# @param wire [Object] the agent connection; must respond to +send_message+
def send_work(wire)
  wire.send_message take_work, :need_ack => true
  wires_waiting_for_work.delete wire
rescue NoWorkUnitsAvailableError
  put_abandonded_work_back_on_the_queue
  wires_waiting_for_work.add wire
rescue NoWorkUnitsRemainingError
  wire.send_message NoMoreWork
  wires_waiting_for_work.delete wire
end
|
#start ⇒ Object
71
72
73
74
75
76
77
78
|
# File 'lib/deep_test/central_command.rb', line 71
# Brings the service up: creates the switchboard, starts listening, publishes
# the listening port on the options object, and launches the message pump on
# a background thread.
#
# @return [Object] whatever the logger's +info+ call returns
def start
  @switchboard = Telegraph::Switchboard.new
  # Port 0 is requested; the port actually bound is read back from the operator.
  @operator = Telegraph::Operator.listen("0.0.0.0", 0, @switchboard)

  port = @operator.port
  @options.server_port = port

  @process_messages_thread = Thread.new { process_messages }
  DeepTest.logger.info { "Started DeepTest service on port #{port}" }
end
|
#stderr ⇒ Object
61
62
63
|
# File 'lib/deep_test/central_command.rb', line 61
# Exposes this process's standard error stream.
#
# @return [Object] the current value of the global $stderr
def stderr
$stderr
end
|
#stdout ⇒ Object
57
58
59
|
# File 'lib/deep_test/central_command.rb', line 57
# Exposes this process's standard output stream.
#
# @return [Object] the current value of the global $stdout
def stdout
$stdout
end
|
#stop ⇒ Object
152
153
154
155
156
157
|
# File 'lib/deep_test/central_command.rb', line 152
# Shuts the service down.
#
# The stop flag is raised first so that #process_messages exits (and swallows
# any error it hits) instead of propagating it; then the operator is shut
# down and the pump thread is joined. Metrics are saved last, and only when
# the options say they are being gathered.
#
# @return [Object, nil] the result of +data.save+, or nil when metrics are off
def stop
  @stop_process_messages = true
  operator.shutdown
  @process_messages_thread.join
  return unless @options.gathering_metrics?

  data.save @options.metrics_file
end
|
#take_result ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/deep_test/central_command.rb', line 22
# Blocks until a result is available and returns it (first in, first out).
#
# While the queue is empty the caller waits on the results condition
# variable, which is signalled by #write_result and on every pass of
# #process_messages. After each wake-up, if there is still nothing queued and
# the switchboard reports no live wires, waiting any longer is pointless.
#
# Emptiness is checked with +empty?+ rather than +any?+: +any?+ tests element
# truthiness, so a queued +nil+/+false+ result would never be returned.
#
# @return [Object] the oldest queued result
# @raise [NoAgentsRunningError] when woken with no results queued and no live wires
def take_result
  @results_mutex.synchronize do
    loop do
      return @results.shift unless @results.empty?

      @results_condvar.wait @results_mutex
      raise NoAgentsRunningError if @results.empty? && !@switchboard.any_live_wires?
    end
  end
end
|
#take_work ⇒ Object
35
36
37
38
39
40
41
42
43
44
|
# File 'lib/deep_test/central_command.rb', line 35
# Removes and returns the next work unit without blocking.
#
# An empty queue is reported as one of two errors, depending on whether
# #done_with_work has been called. Any other ThreadError is passed through.
#
# @return [Object] the next queued work unit
# @raise [NoWorkUnitsRemainingError] queue empty and no more work will be added
# @raise [NoWorkUnitsAvailableError] queue empty but more work may still arrive
def take_work
  @work_queue.pop(true)
rescue ThreadError => error
  # Only the non-blocking pop's "queue empty" condition is translated.
  raise unless error.message == "queue empty"

  raise(@done_with_work ? NoWorkUnitsRemainingError : NoWorkUnitsAvailableError)
end
|
#wires_waiting_for_work ⇒ Object
122
123
124
|
# File 'lib/deep_test/central_command.rb', line 122
# Set of wires that asked for work when none was available.
#
# Created on first access and reused afterwards. #send_work adds and removes
# wires, and #process_messages retries every member on each pass.
#
# @return [Set]
def wires_waiting_for_work
@wires_waiting_for_work ||= Set.new
end
|
#write_result(result) ⇒ Object
46
47
48
49
50
51
|
# File 'lib/deep_test/central_command.rb', line 46
# Queues a finished result and wakes one thread blocked in #take_result.
#
# Both steps happen under the results mutex, the same lock #take_result
# holds while it checks the queue and waits.
#
# @param result [Object] the result to hand to the consumer
def write_result(result)
  @results_mutex.synchronize do
    @results.push result
    @results_condvar.signal
  end
end
|
#write_work(work_unit) ⇒ Object
53
54
55
|
# File 'lib/deep_test/central_command.rb', line 53
# Adds a work unit to the end of the work queue, from which #take_work pops.
#
# @param work_unit [Object] the unit to enqueue
# @return [Object] the work queue itself
def write_work(work_unit)
  @work_queue << work_unit
end
|