Class: LogCourier::ClientFactoryZmq
- Inherits:
-
Object
- Object
- LogCourier::ClientFactoryZmq
- Defined in:
- lib/log-courier/server_zmq.rb
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#send_queue ⇒ Object
readonly
Returns the value of attribute send_queue.
Instance Method Summary collapse
- #deliver(source, data, &block) ⇒ Object
-
#initialize(options, send_queue) ⇒ ClientFactoryZmq
constructor
A new instance of ClientFactoryZmq.
- #shutdown ⇒ Object
Constructor Details
#initialize(options, send_queue) ⇒ ClientFactoryZmq
Returns a new instance of ClientFactoryZmq.
211 212 213 214 215 216 217 218 219 |
# File 'lib/log-courier/server_zmq.rb', line 211

# Returns a new instance of ClientFactoryZmq.
#
# @param options [Hash] configuration; its :logger entry is kept as the
#   logger (other methods check it for nil before logging)
# @param send_queue [Object] stored as-is and exposed through #send_queue
def initialize(options, send_queue)
  @options = options
  @logger = @options[:logger]
  @send_queue = send_queue
  # Nested lookup table walked by #deliver, one level per source identity part
  @index = {}
  # Threads started for clients, keyed by the thread object itself
  @client_threads = {}
  # Held by #deliver and #shutdown while they touch the two tables above
  @mutex = Mutex.new
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
208 209 210 |
# File 'lib/log-courier/server_zmq.rb', line 208

# Returns the value of attribute options.
#
# @return [Object] whatever was passed as +options+ to the constructor
def options
  @options
end
#send_queue ⇒ Object (readonly)
Returns the value of attribute send_queue.
209 210 211 |
# File 'lib/log-courier/server_zmq.rb', line 209

# Returns the value of attribute send_queue.
#
# @return [Object] whatever was passed as +send_queue+ to the constructor
def send_queue
  @send_queue
end
Instance Method Details
#deliver(source, data, &block) ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/log-courier/server_zmq.rb', line 237

# Hands +data+ to the client responsible for +source+, creating that client
# and its thread the first time the source is seen.
#
# Each source gets its own thread so that each thread can respond with
# partial ACKs if we hit a slow down. If everything were processed in a
# single thread, only a single client could be answered with partial ACKs.
#
# @param source [Array<String>] identity parts of the sender; each part is
#   used as a key in the nested index and hex-encoded for logging
# @param data [Object] payload pushed onto the client's queue
# @param block forwarded to the client's +run+ inside the new thread
# @return [nil]
def deliver(source, data, &block)
  @mutex.synchronize do
    # Walk the nested index one level per identity part, creating missing
    # levels on the way; the node reached belongs to this exact source.
    node = source.inject(@index) do |level, identity|
      level[identity] = {} unless level.key?(identity)
      level[identity]
    end

    # The '' key holds the client/thread pair for the source itself
    unless node.key?('')
      # Lowercase two-digit hex of every byte of every identity part
      source_str = source.map do |part|
        part.each_byte.map { |byte| format('%02x', byte) }
      end.join

      @logger.info('New source', :source => source_str) unless @logger.nil?

      # Create the client and associated thread
      client = ClientZmq.new(self, source, source_str) do
        try_drop source, source_str
      end

      thread = Thread.new { client.run(&block) }

      @client_threads[thread] = thread

      node[''] = {
        'client' => client,
        'thread' => thread,
      }
    end

    # Throw the data on the client's queue; if there is not enough room
    # (timeout) the message is dropped.
    # NOTE(review): the 0 is presumably a zero timeout - confirm against ClientZmq#push
    begin
      node['']['client'].push data, 0
    rescue LogCourier::TimeoutError
      # TODO: Log a warning about this?
    end
  end

  nil
end
#shutdown ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/log-courier/server_zmq.rb', line 221

# Raises ShutdownSignal in every tracked client thread and waits for each
# of them to finish.
#
# @return [nil]
def shutdown
  # Swap the thread table for an empty one while holding the lock, to stop
  # other threads from try_drop collisions; signalling and joining then
  # work on the detached table without holding the lock.
  stopping = @mutex.synchronize do
    detached = @client_threads
    @client_threads = {}
    detached
  end

  # Signal every thread first, then wait, so one slow thread does not
  # delay the signal reaching the others.
  stopping.each_value { |thr| thr.raise ShutdownSignal }
  stopping.each_value(&:join)
  nil
end