Class: Pwrake::NBIO::MultiReader

Inherits:
Reader < Object
Defined in:
lib/pwrake/nbio.rb

Overview


Instance Attribute Summary collapse

Attributes inherited from Reader

#check_timeout, #host, #io, #waiter

Instance Method Summary collapse

Methods inherited from Reader

#_read, #eof?, #read, #read_line_nonblock, #read_until, #readln, #select_io

Constructor Details

#initialize(selector, io, n_chan = 0) ⇒ MultiReader

Returns a new instance of MultiReader.



365
366
367
368
369
370
371
# File 'lib/pwrake/nbio.rb', line 365

# Builds a reader that fans lines from a single IO out to several queues.
#
# @param selector passed through unchanged to the Reader constructor
# @param io       passed through unchanged to the Reader constructor
# @param n_chan [Integer] number of numbered channel queues to create up front
def initialize(selector, io, n_chan=0)
  super(selector, io)
  @n_chan = n_chan
  # One queue per numbered channel; the index in @queue is the channel number.
  @queue = @n_chan.times.map { FiberReaderQueue.new(self) }
  # Separate queue for lines that carry no channel prefix (see #call).
  @default_queue = FiberReaderQueue.new(self)
  @check_timeout = true
end

Instance Attribute Details

#default_queue ⇒ Object

Returns the value of attribute default_queue.



373
374
375
# File 'lib/pwrake/nbio.rb', line 373

# Reader for @default_queue, the queue that #call fills with lines that do
# not start with a "<digits>:" channel prefix.
#
# @return [Object, nil] the current default queue; #call checks this value
#   for nil before using it, so nil appears to be a permitted value
def default_queue
  @default_queue
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



372
373
374
# File 'lib/pwrake/nbio.rb', line 372

# Reader for @queue, the array of per-channel queues. The array index is the
# channel number used by #[], #get_line and #call.
#
# @return [Array] the channel queues, in channel-number order
def queue
  @queue
end

Instance Method Details

#[](ch) ⇒ Object



375
376
377
# File 'lib/pwrake/nbio.rb', line 375

# Looks up the queue registered for a channel.
#
# @param ch [Integer] channel number (index into the channel-queue array)
# @return [Object, nil] the queue for +ch+, or nil when the array has no
#   element at that index
def [](ch)
  @queue[ch]
end

#call ⇒ Object



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/pwrake/nbio.rb', line 395

# Drains every line currently available from the IO and routes each one.
#
# A line of the form "<digits>:<payload>" is delivered (payload only) to the
# channel queue with that number; any other line is delivered whole to the
# default queue.
#
# @raise [RuntimeError] if a line names a channel that has no queue, or if an
#   unprefixed line arrives while there is no default queue
def call
  while (text = read_line_nonblock)
    tagged = /^(\d+):(.*)$/.match(text)
    if tagged
      channel = tagged[1]
      target = @queue[channel.to_i]
      raise "No queue ##{channel}, received: #{text}" unless target
      target.enq(tagged[2])
    elsif @default_queue
      @default_queue.enq(text)
    else
      raise "No default_queue, received: #{text}"
    end
  end
rescue EOFError
  # End of stream: tell every queue to halt.
  halt
rescue IO::WaitReadable
  # Deliberately ignored: nothing more can be read without blocking, so just
  # return (presumably the selector invokes #call again later -- confirm).
end

#error(e) ⇒ Object



415
416
417
418
419
# File 'lib/pwrake/nbio.rb', line 415

# Marks the reader as closed and hands the error object to every queue.
#
# The default queue is skipped when it is nil. #call already treats a nil
# default queue as a legal state, so reporting an error must not itself fail
# with NoMethodError in that state.
#
# @param e [Object] the error to enqueue (enqueued as-is, not raised here)
def error(e)
  @closed = true
  @queue.each { |q| q.enq(e) }
  @default_queue.enq(e) if @default_queue
end

#get_line(ch = nil) ⇒ Object

Call this method from a Fiber context.



387
388
389
390
391
392
393
# File 'lib/pwrake/nbio.rb', line 387

# Dequeues one item from a channel queue or from the default queue.
#
# The default queue is used when no channel is given, and also when no
# channel queues exist at all.
#
# @param ch [Integer, nil] channel number, or nil for the default queue
# @return [Object] whatever the selected queue's #deq returns
def get_line(ch=nil)
  return @default_queue.deq if !ch || @queue.empty?

  @queue[ch].deq
end

#halt ⇒ Object



421
422
423
424
425
# File 'lib/pwrake/nbio.rb', line 421

# Halts every channel queue and then the default queue.
#
# #call invokes this when the IO reaches EOF. The default queue is skipped
# when it is nil: #call already accepts a nil default queue, and failing here
# with NoMethodError would hide the EOF from the caller.
#
# @return [Object, nil] the result of halting the default queue, or nil when
#   there is no default queue
def halt
  Log.debug("Handler.halt") if defined? Log
  @queue.each(&:halt)
  @default_queue.halt if @default_queue
end

#new_queue ⇒ Object



379
380
381
382
383
384
# File 'lib/pwrake/nbio.rb', line 379

# Appends a new channel queue and assigns it the next channel number.
#
# @return [Array] a two-element array: the channel number given to the new
#   queue, followed by the queue itself
def new_queue
  channel = @n_chan
  fresh = FiberReaderQueue.new(self)
  @queue.push(fresh)
  @n_chan = channel + 1
  [channel, fresh]
end