Class: Knj::Process_meta::Proxy_obj::Buffered_caller

Inherits:
Object
  • Object
show all
Defined in:
lib/knj/process_meta.rb

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Buffered_caller

Returns a new instance of Buffered_caller.



466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/knj/process_meta.rb', line 466

# Sets up an empty call buffer and its bookkeeping.
#
# @param args [Hash] options read here:
#   :count_to - buffered-call count used as the flush threshold (1000 when absent or falsy)
#   :debug    - when truthy, stored in @debug (which gates the STDOUT trace output)
#   :async    - when truthy, a thread list is created in @threads
#   Other keys are kept in @args for the remaining methods.
def initialize(args)
  @args = args
  @buffer = []
  @mutex = Mutex.new
  @mutex_write = Mutex.new
  @count = 0

  # @debug is deliberately left unset unless a truthy flag was given.
  debug_flag = @args[:debug]
  @debug = debug_flag if debug_flag

  @count_to = @args[:count_to] || 1000

  # Hard ceiling for the buffer: twice the flush threshold.
  @buffer_max = @count_to * 2
  @threads = [] if @args[:async]
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_name, *args) ⇒ Object



484
485
486
487
488
489
490
# File 'lib/knj/process_meta.rb', line 484

# Routes calls to the single proxied method (named by @args[:method_name])
# into the buffer via _pm_call; every other unknown method is rejected.
#
# @param method_name [Symbol, String] name of the method that was called
# @param args [Array] arguments of that call, buffered unchanged
# @return whatever _pm_call returns
# @raise [NoMethodError] when method_name is not the proxied method
def method_missing(method_name, *args)
  if method_name.to_s == @args[:method_name].to_s
    self._pm_call(*args)
  else
    raise NoMethodError, "No such method: '#{method_name}'."
  end
end

# Keeps respond_to? / method in agreement with method_missing: the proxied
# method name is reported as supported, everything else defers to super.
#
# @param method_name [Symbol, String] name being queried
# @param include_private [Boolean] forwarded to super
# @return [Boolean]
def respond_to_missing?(method_name, include_private = false)
  # Guard on @args so the query never raises on an instance whose state has
  # not been assigned yet (e.g. one obtained through allocate).
  (@args && method_name.to_s == @args[:method_name].to_s) || super
end

Instance Method Details

#_pm_call(*args) ⇒ Object

Raises:

  • (@raise_error)


492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/knj/process_meta.rb', line 492

# Queues one set of call arguments and triggers a flush once @count_to calls
# have accumulated and no write is currently running.
#
# When the buffer has reached @buffer_max the caller is held back until there
# is room again. The wait happens outside @mutex: the buffer is only ever
# emptied by _pm_flush, which takes @mutex itself, so sleeping while holding
# the lock (as this method used to) could never see the buffer shrink and
# spun forever. If no write is in flight the caller drains the buffer itself
# instead of waiting for a flush nobody else would start.
#
# @param args [Array] arguments of the buffered call
# @return [nil]
# @raise [Exception] the error stored in @raise_error, if any
def _pm_call(*args)
  loop do
    # Checked on every pass so a stored failure also ends the wait.
    raise @raise_error if @raise_error

    queued = @mutex.synchronize do
      if @count >= @count_to && @buffer.length >= @buffer_max
        false
      else
        STDOUT.print "Adding to buffer #{@buffer.length}...\n" if @debug
        @buffer << args
        @count += 1
        true
      end
    end
    break if queued

    if @writing
      STDOUT.print "Waiting for write to complete...\n" if @debug
      sleep 0.1
    else
      # Buffer is full and nothing is writing: make room ourselves.
      self._pm_flush
    end
  end

  self._pm_flush if @count >= @count_to && !@writing
  nil
end

#_pm_close ⇒ Object

Raises:

  • (@raise_error)


554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
# File 'lib/knj/process_meta.rb', line 554

# Flushes whatever is still buffered and, in async mode (@args[:async]),
# waits for every thread in @threads before dropping it from the list.
#
# @return [nil]
# @raise [Exception] the error stored in @raise_error, if any
def _pm_close
  self._pm_flush

  if @args[:async]
    # Collect each thread only after its join returned, then remove them all.
    finished = @threads.each_with_object([]) do |worker, done|
      worker.join
      done << worker
    end

    @threads -= finished
  end

  raise @raise_error if @raise_error
end

#_pm_flush(*args) ⇒ Object

Raises:

  • (@raise_error)


510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
# File 'lib/knj/process_meta.rb', line 510

# Takes the current buffer (resetting @buffer and @count under @mutex) and
# hands it to _pm_flush_real, either inline or, when @args[:async] is set,
# on a new thread that is recorded in @threads.
#
# In async mode any StandardError raised by the write is stored in
# @raise_error so the next _pm_call / _pm_flush / _pm_close re-raises it.
# The rescue has to live inside the thread body: a rescue around Thread.new
# only sees failures to start the thread, never failures of the write.
#
# @param args [Array] accepted for interface compatibility; not used
# @return [Object, nil] result of _pm_flush_real in sync mode, nil in async mode
# @raise [Exception] the error stored in @raise_error, if any
def _pm_flush(*args)
  raise @raise_error if @raise_error

  buffer = nil
  @mutex.synchronize do
    buffer = @buffer
    @buffer = []
    @count = 0
  end

  return self._pm_flush_real(buffer) unless @args[:async]

  begin
    @threads << Thread.new do
      begin
        self._pm_flush_real(buffer)
      rescue => e
        # Remember the failed write for the producer side to re-raise.
        @raise_error = e
      end
    end
  rescue => e
    # The thread itself could not be started.
    @raise_error = e
  end

  nil
end

#_pm_flush_real(buffer) ⇒ Object



535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
# File 'lib/knj/process_meta.rb', line 535

# Sends one batch of buffered calls through @args[:process_meta].call_object.
# Writes are serialised by @mutex_write, and @writing is true for exactly the
# duration of the call (it is reset even when the call raises).
#
# @param buffer [Array] the buffered argument lists, passed on as "args"
# @return [Object] whatever call_object returns
def _pm_flush_real(buffer)
  @mutex_write.synchronize do
    STDOUT.print "Writing...\n" if @debug

    begin
      @writing = true
      target = @args[:process_meta]

      # The block's last value becomes the method's return value.
      target.call_object(
        "var_name" => @args[:name],
        "method_name" => @args[:method_name],
        "args" => buffer,
        "buffered" => true,
        "capture_return" => false
      )
    ensure
      @writing = false
    end
  end
end