Class: Fairy::OnMemorySortedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Direct Known Subclasses

SortedQueue1

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond) ⇒ OnMemorySortedQueue

Returns a new instance of OnMemorySortedQueue.



1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
# File 'lib/fairy/share/port.rb', line 1543

def initialize(policy, queue_mon = Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy

  @pool_threshold = policy[:pool_threshold]
  @pool_threshold ||= CONF.SORTEDQUEUE_POOL_THRESHOLD

  @push_queue = []
  @pop_queue = nil

  @queue_mutex = queue_mon
  @queue_cv = queue_cv

  @sort_by = policy[:sort_by]
  @sort_by ||= CONF.SORTEDQUEUE_SORTBY   

  if @sort_by.kind_of?(String)
	@sort_by = eval("proc{#{@sort_by}}")
  end
end

Instance Method Details

#popObject



1585
1586
1587
1588
1589
1590
# File 'lib/fairy/share/port.rb', line 1585

def pop
  @queue_mon.synchronize do
	@queue_cv.wait_while{@pop_queue.nil?}
	@pop_queue.shift
  end
end

#pop_allObject



1592
1593
1594
1595
1596
1597
# File 'lib/fairy/share/port.rb', line 1592

def pop_all
  @queue_mon.synchronize do
	@queue_cv.wait_while{@pop_queue.nil?}
	@pop_queue.shift(@pool_threshold)
  end
end

#push(e) ⇒ Object



1563
1564
1565
1566
1567
1568
1569
1570
1571
# File 'lib/fairy/share/port.rb', line 1563

def push(e)
  @queue_mon.synchronize do
	@push_queue.push e
	if e == :END_OF_STREAM
	  @push_queue.pop
	  push_on_eos
	end
  end
end

#push_on_eosObject



1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
# File 'lib/fairy/share/port.rb', line 1573

def push_on_eos
  begin
	@pop_queue = @push_queue.sort_by{|e| @sort_by.call(e)}
	@pop_queue.push :END_OF_STREAM
	@push_queue.clear
	@push_queue = nil
  rescue
	Log::debug_exception
  end
  @queue_cv.broadcast
end