Class: Emissary::Operator
Defined Under Namespace
Modules: AMQP
Constant Summary
collapse
- DEFAULT_STATUS_INTERVAL =
3600
- DEFAULT_MAX_WORKERS =
50
- MAX_WORKER_TTL =
60
Emissary::OperatorStatistics::RX_COUNT_MUTEX, Emissary::OperatorStatistics::TX_COUNT_MUTEX
Instance Attribute Summary collapse
#rx_count, #tx_count
Class Method Summary
collapse
Instance Method Summary
collapse
#increment_rx_count, #increment_tx_count
Constructor Details
#initialize(config, *args) ⇒ Operator
Returns a new instance of Operator.
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/emissary/operator.rb', line 87
def initialize(config, *args)
@config = config
@workers = (args[0][:max_workers] || DEFAULT_MAX_WORKERS rescue DEFAULT_MAX_WORKERS)
@agents = WorkQueue.new(@workers, nil, MAX_WORKER_TTL)
@publisher = WorkQueue.new(@workers, nil, MAX_WORKER_TTL)
@timer = nil
@stats = WorkQueue.new(1, nil, MAX_WORKER_TTL)
@rx_count = 0
@tx_count = 0
@shutting_down = false
@connected = false
end
|
Instance Attribute Details
Returns the value of attribute config.
64
65
66
|
# File 'lib/emissary/operator.rb', line 64
def config
@config
end
|
#shutting_down ⇒ Object
Returns the value of attribute shutting_down.
64
65
66
|
# File 'lib/emissary/operator.rb', line 64
def shutting_down
@shutting_down
end
|
#signature ⇒ Object
Returns the value of attribute signature.
64
65
66
|
# File 'lib/emissary/operator.rb', line 64
def signature
@signature
end
|
Class Method Details
.new(config, *args) ⇒ Object
Override .new so subclasses don’t have to call super and can ignore connection-specific arguments
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/emissary/operator.rb', line 69
def self.new(config, *args)
allocate.instance_eval do
@signature = config[:signature]
initialize(config, *args)
post_init
@signature ||= Digest::MD5.hexdigest(config.to_s)
self
end
end
|
Instance Method Details
#acknowledge(message) ⇒ Object
121
122
|
# File 'lib/emissary/operator.rb', line 121
def acknowledge message
end
|
131
132
133
|
# File 'lib/emissary/operator.rb', line 131
def close
raise NotImplementedError, 'The close method must be defined by the operator module'
end
|
109
110
111
|
# File 'lib/emissary/operator.rb', line 109
def connect
raise NotImplementedError, 'The connect method must be defined by the operator module'
end
|
#connected? ⇒ Boolean
104
|
# File 'lib/emissary/operator.rb', line 104
def connected?() @connected; end
|
#disconnect ⇒ Object
143
144
145
146
|
# File 'lib/emissary/operator.rb', line 143
def disconnect
close
@connected = false
end
|
#enabled?(what) ⇒ Boolean
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/emissary/operator.rb', line 171
def enabled? what
unless [ :startup, :shutdown, :stats ].include? what.to_sym
Emissary.logger.debug "Testing '#{what}' - it's disabled. Not a valid option."
return false
end
unless config[what]
Emissary.logger.debug "Testing '#{what}' - it's disabled. Missing from configuration."
return false
end
if (config[:disable]||[]).include? what.to_s
Emissary.logger.debug "Testing '#{what}' - it's disabled. Listed in 'disable' configuration option."
return false
end
Emissary.logger.debug "Testing '#{what}' - it's enabled.."
return true
end
|
#notify(type) ⇒ Object
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
# File 'lib/emissary/operator.rb', line 241
def notify type
return unless enabled? type and EM.reactor_running?
message = Emissary::Message.new(:data => { :agent => :emissary, :method => type })
case type
when :startup, :shutdown
message.recipient = config[type]
when :stats
message.agent = :stats
message.method = :gather
end
Emissary.logger.notice "Running #{type.to_s.capitalize} Notifier"
receive message
end
|
#post_init ⇒ Object
106
107
|
# File 'lib/emissary/operator.rb', line 106
def post_init
end
|
#receive(message) ⇒ Object
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# File 'lib/emissary/operator.rb', line 199
def receive message
@agents.enqueue_b {
begin
raise message.errors.first unless message.errors.empty? or not message.errors.first.is_a? Exception
Emissary.logger.debug " ---> [DISPATCHER] Dispatching new message ... "
Emissary.dispatch(message, config, self).activate
received message
rescue ::Emissary::Error::InvalidMessageFormat => e
Emissary.logger.warning e.message
rejected message, :requeue => true
rescue Exception => e
Emissary.logger.error "AgentThread Error: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
send message.error(e)
rejected message, :requeue => true
else
increment_rx_count
end
Emissary.logger.debug " ---> [DISPATCHER] tasks/workers: #{@agents.cur_tasks}/#{@agents.cur_threads}"
}
end
|
#received(message) ⇒ Object
191
192
193
|
# File 'lib/emissary/operator.rb', line 191
def received message
acknowledge message
end
|
#reject(message, requeue = true) ⇒ Object
124
125
|
# File 'lib/emissary/operator.rb', line 124
def reject message, requeue = true
end
|
#rejected(message, opts = { :requeue => true }) ⇒ Object
195
196
197
|
# File 'lib/emissary/operator.rb', line 195
def rejected message, opts = { :requeue => true }
reject message, opts
end
|
135
136
137
138
139
140
141
|
# File 'lib/emissary/operator.rb', line 135
def run
@connected = !!connect
subscribe
schedule_statistics_gatherer
notify :startup
connected?
end
|
#schedule_statistics_gatherer ⇒ Object
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
# File 'lib/emissary/operator.rb', line 257
def schedule_statistics_gatherer
stats_interval = enabled?(:stats) && config[:stats][:interval] ? config[:stats][:interval].to_i : DEFAULT_STATUS_INTERVAL
@timer = EM.add_periodic_timer(stats_interval) do
rx = rx_count; tx = tx_count
rx_throughput = sprintf "%0.4f", (rx.to_f / stats_interval.to_f)
tx_throughput = sprintf "%0.4f", (tx.to_f / stats_interval.to_f)
Emissary.logger.notice "[statistics] publisher tasks/workers: #{@publisher.cur_tasks}/#{@publisher.cur_threads}"
Emissary.logger.notice "[statistics] dispatcher tasks/workers: #{@agents.cur_tasks}/#{@agents.cur_threads}"
Emissary.logger.notice "[statistics] #{tx} in #{stats_interval} seconds - tx rate: #{tx_throughput}/sec"
Emissary.logger.notice "[statistics] #{rx} in #{stats_interval} seconds - rx rate: #{rx_throughput}/sec"
notify :stats
end
end
|
#send(message) ⇒ Object
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
# File 'lib/emissary/operator.rb', line 222
def send message
@publisher.enqueue_b {
Emissary.logger.debug " ---> [PUBLISHER] Sending new message ... "
begin
unless message.will_loop?
Emissary.logger.debug "[PUBLISHER] -- Sending message..."
send_data message
increment_tx_count
else
Emissary.logger.notice "Not sending message destined for myself - would loop."
end
rescue Exception => e
Emissary.logger.error "PublisherThread Error: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
@shutting_down = true
end
Emissary.logger.debug " ---> [PUBLISHER] tasks/workers: #{@publisher.cur_tasks}/#{@publisher.cur_threads}"
}
end
|
#send_data ⇒ Object
127
128
129
|
# File 'lib/emissary/operator.rb', line 127
def send_data
raise NotImplementedError, 'The send_data method must be defined by the operator module'
end
|
#shutdown! ⇒ Object
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/emissary/operator.rb', line 150
def shutdown!
unless shutting_down?
@shutting_down = true
Emissary.logger.info "Cancelling periodic timer for statistics gatherer..."
@timer.cancel
Emissary.logger.notice "Shutting down..."
notify :shutdown
Emissary.logger.info "Shutting down agent workqueue..."
@agents.join
Emissary.logger.info "Shutting down publisher workqueue..."
@publisher.join
Emissary.logger.info "Disconnecting..."
disconnect
end
end
|
#shutting_down? ⇒ Boolean
148
|
# File 'lib/emissary/operator.rb', line 148
def shutting_down?() @shutting_down; end
|
#subscribe ⇒ Object
113
114
115
|
# File 'lib/emissary/operator.rb', line 113
def subscribe
raise NotImplementedError, 'The subscrie method must be defined by the operator module'
end
|
#unsubscribe ⇒ Object
117
118
119
|
# File 'lib/emissary/operator.rb', line 117
def unsubscribe
raise NotImplementedError, 'The unsubscribe method must be defined by the operator module'
end
|