Class: HybridPlatformsConductor::IoRouter
- Inherits:
-
Object
- Object
- HybridPlatformsConductor::IoRouter
- Defined in:
- lib/hybrid_platforms_conductor/io_router.rb
Overview
Simple router of IO and queue streams from som inputs to outputs, asynchronous
Class Method Summary collapse
-
.with_io_router(routes) ⇒ Object
Create an IO router and make sure it is freed when client code has finished.
Instance Method Summary collapse
-
#initialize(routes) ⇒ IoRouter
constructor
Constructor.
-
#start ⇒ Object
Start routing messages asynchronously.
-
#stop ⇒ Object
Stop routing messages asynchronously.
Constructor Details
#initialize(routes) ⇒ IoRouter
Constructor
- Parameters
-
routes (Hash<IO or Queue, Array<IO> >): List of destination IOs that should receive content per source IO.
25 26 27 28 |
# File 'lib/hybrid_platforms_conductor/io_router.rb', line 25 def initialize(routes) @routes = routes @reading_thread = nil end |
Class Method Details
.with_io_router(routes) ⇒ Object
Create an IO router and make sure it is freed when client code has finished
- Parameters
-
routes (Hash<IO or Queue, Array<IO> >): List of destination IOs that should receive content per source IO.
-
Proc: Client code
11 12 13 14 15 16 17 18 19 |
# File 'lib/hybrid_platforms_conductor/io_router.rb', line 11 def self.with_io_router(routes) io_router = IoRouter.new(routes) begin io_router.start yield ensure io_router.stop end end |
Instance Method Details
#start ⇒ Object
Start routing messages asynchronously
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/hybrid_platforms_conductor/io_router.rb', line 31 def start raise 'IO router is already started. Can\'t start it again.' unless @reading_thread.nil? @end_read = false # Create a thread to handle routes asynchronously @reading_thread = Thread.new do loop do need_to_stop = @end_read.clone data_found = false @routes.each do |src_io, dst_ios| if src_io.is_a?(Queue) queue_size = src_io.size if queue_size > 0 # There is data to be read from src_io data_found = true data_chunk_str = queue_size.times.map { src_io.pop }.join dst_ios.each do |dst_io| dst_io << data_chunk_str dst_io.flush if dst_io.respond_to?(:flush) end end else raise "Unknown type of source IO: #{src_io}" end end break if need_to_stop && !data_found sleep 0.1 end end end |
#stop ⇒ Object
Stop routing messages asynchronously
62 63 64 65 66 |
# File 'lib/hybrid_platforms_conductor/io_router.rb', line 62 def stop raise 'IO router is not started. Can\'t stop it.' if @reading_thread.nil? @end_read = true @reading_thread.join end |