Class: Pwrake::NBIO::MultiReader
Overview
Instance Attribute Summary collapse
-
#default_queue ⇒ Object
Returns the value of attribute default_queue.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Attributes inherited from Reader
#check_timeout, #host, #io, #waiter
Instance Method Summary collapse
- #[](ch) ⇒ Object
- #call ⇒ Object
- #error(e) ⇒ Object
-
#get_line(ch = nil) ⇒ Object
Call this method from a Fiber context.
- #halt ⇒ Object
-
#initialize(selector, io, n_chan = 0) ⇒ MultiReader
constructor
A new instance of MultiReader.
- #new_queue ⇒ Object
Methods inherited from Reader
#_read, #eof?, #read, #read_line_nonblock, #read_until, #readln, #select_io
Constructor Details
#initialize(selector, io, n_chan = 0) ⇒ MultiReader
Returns a new instance of MultiReader.
365 366 367 368 369 370 371 |
# File 'lib/pwrake/nbio.rb', line 365
# Sets up a reader with +n_chan+ numbered channel queues and one default
# queue, and switches timeout checking on.
#
# @param selector forwarded unchanged to the Reader constructor
# @param io forwarded unchanged to the Reader constructor
# @param n_chan [Integer] number of channel queues created up front
def initialize(selector, io, n_chan = 0)
  super(selector, io)
  @n_chan = n_chan
  # One queue per channel, stored at the index equal to its channel number.
  @queue = @n_chan.times.map { FiberReaderQueue.new(self) }
  @default_queue = FiberReaderQueue.new(self)
  @check_timeout = true
end
Instance Attribute Details
#default_queue ⇒ Object
Returns the value of attribute default_queue.
373 374 375 |
# File 'lib/pwrake/nbio.rb', line 373
# Reader for the default queue.
#
# @return [Object] the current value of @default_queue
def default_queue
  @default_queue
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
372 373 374 |
# File 'lib/pwrake/nbio.rb', line 372
# Reader for the collection of channel queues.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
Instance Method Details
#[](ch) ⇒ Object
375 376 377 |
# File 'lib/pwrake/nbio.rb', line 375
# Looks up a channel queue by indexing into @queue.
#
# @param ch index passed straight to @queue[]
# @return [Object, nil] whatever @queue[ch] yields
def [](ch)
  @queue[ch]
end
#call ⇒ Object
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 |
# File 'lib/pwrake/nbio.rb', line 395
# Reads every line currently available and routes each one to a queue.
#
# A line of the form "<digits>:<text>" goes to the channel queue at that
# index, with the prefix removed. Any other line goes to the default queue.
#
# @raise [RuntimeError] when the addressed channel queue does not exist, or
#   when an unprefixed line arrives and there is no default queue
def call
  while (line = read_line_nonblock)
    tagged = /^(\d+):(.*)$/.match(line)
    if tagged
      ch = tagged[1]
      target = @queue[ch.to_i]
      raise "No queue ##{ch}, received: #{line}" unless target

      target.enq(tagged[2])
    elsif @default_queue
      @default_queue.enq(line)
    else
      raise "No default_queue, received: #{line}"
    end
  end
rescue EOFError
  # End of input: shut the queues down.
  halt
rescue IO::WaitReadable
  # Deliberately ignored: no more data can be read right now.
  # NOTE(review): presumably the selector invokes #call again later - confirm.
end
#error(e) ⇒ Object
415 416 417 418 419 |
# File 'lib/pwrake/nbio.rb', line 415
# Marks the reader as closed and enqueues the error object on every channel
# queue and on the default queue.
#
# @param e the error object to enqueue
def error(e)
  @closed = true
  @queue.each { |q| q.enq(e) }
  # #call treats a missing default queue as a valid state, so do the same
  # here instead of failing with NoMethodError while reporting an error.
  @default_queue.enq(e) if @default_queue
end
#get_line(ch = nil) ⇒ Object
Call this method from a Fiber context.
387 388 389 390 391 392 393 |
# File 'lib/pwrake/nbio.rb', line 387
# Dequeues one item; the generated docs say to call this from a Fiber context.
#
# The default queue is used when +ch+ is nil/false, and also when no channel
# queues exist at all (in which case +ch+ is ignored).
#
# @param ch index of the channel queue to read from, or nil
# @return whatever the selected queue's #deq returns
def get_line(ch = nil)
  return @default_queue.deq if !ch || @queue.empty?

  @queue[ch].deq
end
#halt ⇒ Object
421 422 423 424 425 |
# File 'lib/pwrake/nbio.rb', line 421
# Halts every channel queue and the default queue, logging a debug message
# first when a Log constant is defined.
def halt
  Log.debug("Handler.halt") if defined? Log
  @queue.each { |q| q.halt }
  # #call treats a missing default queue as a valid state, so do the same
  # here instead of failing with NoMethodError during shutdown.
  @default_queue.halt if @default_queue
end
#new_queue ⇒ Object
379 380 381 382 383 384 |
# File 'lib/pwrake/nbio.rb', line 379
# Appends a fresh channel queue and bumps the channel counter.
#
# @return [Array] a two-element array: the channel number given to the new
#   queue (the counter value before the increment) and the queue itself
def new_queue
  channel = @n_chan
  fresh = FiberReaderQueue.new(self)
  @queue << fresh
  @n_chan += 1
  [channel, fresh]
end