Class: ASIR::Channel
- Inherits: Object
  - Object
  - ASIR::Channel
- Includes: Initialization, RetryBehavior
- Defined in: lib/asir/channel.rb
Overview
Generic I/O Channel abstraction. Maintains a separate stream for each Thread and for each forked child process.
Constant Summary
- ON_ERROR =
    lambda do | channel, exc, action, stream |
      channel.close rescue nil
      raise exc
    end
- ON_CLOSE =
    lambda do | channel, stream |
      stream.close rescue nil if stream
    end
- ON_RETRY =
    lambda do | channel, exc, action |
    end
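The default handlers define the callback contract: an error handler receives (channel, exc, action, stream) and a close handler receives (channel, stream). The following is a minimal sketch of how the two defaults behave when called directly. The lambdas are copied from the constants above so the example runs without ASIR loaded, and `FakeChannel` is a hypothetical stand-in, not part of the library.

```ruby
require 'stringio'

# Copies of the documented defaults, so the example runs without ASIR loaded.
on_error = lambda do |channel, exc, action, stream|
  channel.close rescue nil
  raise exc
end
on_close = lambda do |channel, stream|
  stream.close rescue nil if stream
end

# Hypothetical fake channel that records whether #close was called.
FakeChannel = Struct.new(:closed) do
  def close
    self.closed = true
  end
end

io = StringIO.new
on_close.call(nil, io)    # closes the stream
on_close.call(nil, nil)   # no stream: does nothing

chan = FakeChannel.new(false)
raised = begin
  on_error.call(chan, IOError.new("boom"), :connect, nil)
  nil
rescue IOError => e
  e.message
end
```

The default error handler therefore closes the channel on a best-effort basis and then re-raises the original exception to the caller.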
Instance Attribute Summary
- #on_close ⇒ Object
  Returns the value of attribute on_close.
- #on_connect ⇒ Object
  Returns the value of attribute on_connect.
- #on_error ⇒ Object
  Returns the value of attribute on_error.
- #on_retry ⇒ Object
  Returns the value of attribute on_retry.
Attributes included from RetryBehavior
#try_max, #try_sleep, #try_sleep_increment, #try_sleep_max
Instance Method Summary
- #_stream ⇒ Object
  Returns the stream for this Channel, or nil.
- #_streams ⇒ Object
  Returns a Thread-specific mapping, unique to this process id.
- #close ⇒ Object
  Invokes @on_close.call(self, stream).
- #connect! ⇒ Object
  Invokes @on_connect.call(self).
- #handle_error!(exc, action, stream) ⇒ Object
  Dispatches exception and arguments if @on_error is defined.
- #initialize(opts = nil) ⇒ Channel (constructor)
  A new instance of Channel.
- #method_missing(sel, *args, &blk) ⇒ Object
  Delegate to actual stream.
- #stream ⇒ Object
  Returns IO stream for current Thread.
- #stream=(x) ⇒ Object
  Sets the stream for this Channel, or nil.
- #with_stream! ⇒ Object
  Yield #stream to block.
Methods included from RetryBehavior
Constructor Details
#initialize(opts = nil) ⇒ Channel
Returns a new instance of Channel.
# File 'lib/asir/channel.rb', line 23
def initialize opts = nil
  @mutex = Mutex.new
  @on_close = ON_CLOSE
  @on_error = ON_ERROR
  # @on_retry = ON_RETRY
  self.try_max = 10
  self.try_sleep = 0.1
  self.try_sleep_increment = 0.1
  self.try_sleep_max = 10
  super
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method.
#method_missing(sel, *args, &blk) ⇒ Object
Delegate to actual stream.
# File 'lib/asir/channel.rb', line 94
def method_missing sel, *args, &blk
  with_stream! do | obj |
    obj.__send__(sel, *args, &blk)
  end
end
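In effect, any message the Channel does not define itself is forwarded to the underlying stream. The following is a minimal, self-contained sketch of the same pattern. `DelegatingChannel` is a hypothetical stand-in rather than ASIR::Channel, and the `respond_to_missing?` method is an addition that the source does not show.

```ruby
require 'stringio'

# Hypothetical stand-in for illustration; not the ASIR implementation.
class DelegatingChannel
  def initialize(stream)
    @stream = stream
  end

  # Yield the underlying stream to the block.
  def with_stream!
    yield @stream
  end

  # Forward unknown messages to the stream, as Channel#method_missing does.
  def method_missing(sel, *args, &blk)
    with_stream! { |obj| obj.__send__(sel, *args, &blk) }
  end

  # Not shown in the source; added so respond_to? agrees with the delegation.
  def respond_to_missing?(sel, include_private = false)
    @stream.respond_to?(sel, include_private) || super
  end
end

io = StringIO.new
chan = DelegatingChannel.new(io)
chan.write("hello")   # DelegatingChannel has no #write, so it is forwarded to io
```

Because the forwarding goes through with_stream!, errors raised by the delegated call follow the same error-handling path as any other use of the stream.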
Instance Attribute Details
#on_close ⇒ Object
Returns the value of attribute on_close.
# File 'lib/asir/channel.rb', line 11
def on_close
  @on_close
end
#on_connect ⇒ Object
Returns the value of attribute on_connect.
# File 'lib/asir/channel.rb', line 11
def on_connect
  @on_connect
end
#on_error ⇒ Object
Returns the value of attribute on_error.
# File 'lib/asir/channel.rb', line 11
def on_error
  @on_error
end
#on_retry ⇒ Object
Returns the value of attribute on_retry.
# File 'lib/asir/channel.rb', line 11
def on_retry
  @on_retry
end
Instance Method Details
#_stream ⇒ Object
Returns the stream for this Channel, or nil.
# File 'lib/asir/channel.rb', line 125
def _stream
  _streams[self]
end
#_streams ⇒ Object
Returns a Thread-specific mapping, unique to this process id. Maps from Channel objects to actual stream.
# File 'lib/asir/channel.rb', line 112
def _streams
  @mutex.synchronize do
    streams = Thread.current[:'ASIR::Channel._streams'] ||= { }
    if @owning_process != $$ || # child process?
        @owning_process > $$ # PIDs wrapped around?
      @owning_process = $$
      streams.clear
    end
    streams
  end
end
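The mapping is stored on the current Thread and cleared whenever the recorded process id no longer matches the running process, so a forked child does not reuse its parent's streams. A minimal sketch of that idea follows. `StreamRegistry` and its key name are hypothetical, and the sketch keeps only the "process id changed" check from the listing above.

```ruby
# Hypothetical helper for illustration; mirrors the shape of Channel#_streams.
class StreamRegistry
  KEY = :'example.streams'  # assumed key name, not the one ASIR uses

  def initialize
    @mutex = Mutex.new
    @owning_process = nil
  end

  # Returns the current Thread's map, emptied if the process id has changed.
  def streams
    @mutex.synchronize do
      map = Thread.current[KEY] ||= {}
      if @owning_process != Process.pid
        @owning_process = Process.pid
        map.clear
      end
      map
    end
  end
end

registry = StreamRegistry.new
registry.streams[:chan] = :main_stream

# A different Thread sees its own, initially empty, map.
seen_in_thread = Thread.new { registry.streams[:chan] }.value
seen_in_main = registry.streams[:chan]
```

Here the second Thread finds no entry for `:chan`, while the main Thread still sees the value it stored.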
#close ⇒ Object
Invokes @on_close.call(self, stream). On Exception, invokes @on_error.call(self, exc, :close, stream).
# File 'lib/asir/channel.rb', line 69
def close
  if stream = _stream
    self.stream = nil
    @on_close.call(self, stream) if @on_close
  end
rescue *Error::Unrecoverable.modules
  raise
rescue ::Exception => exc
  handle_error!(exc, :close, stream)
end
#connect! ⇒ Object
Invokes @on_connect.call(self). On Exception, invokes @on_error.call(self, exc, :connect, nil).
# File 'lib/asir/channel.rb', line 44
def connect!
  n_try = nil
  with_retry do | action, data |
    case action
    when :try
      n_try = data
      @on_connect.call(self)
    when :retry #, exc
      exc = data
      case exc
      when *Error::Unrecoverable.modules
        raise exc
      end
      $stderr.puts "RETRY: #{n_try}: ERROR : #{data.inspect}"
      @on_retry.call(self, exc, :connect) if @on_retry
    when :failed
      exc = data
      $stderr.puts "FAILED: #{n_try}: ERROR : #{data.inspect}"
      handle_error!(exc, :connect, nil)
    end
  end
end
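The block passed to with_retry is called with an action and a datum: :try with the attempt number, :retry with the exception that caused the retry, and :failed with the last exception once attempts are exhausted. RetryBehavior#with_retry itself is not shown on this page, so the driver below is a hypothetical sketch inferred only from those three actions and from the try_max, try_sleep, try_sleep_increment and try_sleep_max attributes; the real implementation may differ.

```ruby
# Hypothetical retry driver, inferred from the actions #connect! handles.
# ASIR's real RetryBehavior#with_retry is not shown here and may differ.
def with_retry(try_max:, try_sleep: 0, try_sleep_increment: 0, try_sleep_max: nil)
  n_try = 0
  delay = try_sleep
  begin
    n_try += 1
    yield :try, n_try
  rescue StandardError => exc
    if n_try < try_max
      yield :retry, exc
      sleep delay if delay > 0
      delay += try_sleep_increment
      delay = [delay, try_sleep_max].min if try_sleep_max
      retry
    end
    yield :failed, exc
  end
end

attempts = []
result = with_retry(try_max: 3) do |action, data|
  case action
  when :try
    attempts << data
    raise IOError, "not ready" if data < 3   # fail the first two attempts
    :connected
  when :retry
    # a real handler might log here, as #connect! does on $stderr
  when :failed
    raise data
  end
end
```

With the constructor defaults shown earlier (try_max of 10, try_sleep of 0.1, try_sleep_increment of 0.1, try_sleep_max of 10), a driver of this shape would wait slightly longer before each successive attempt.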
#handle_error!(exc, action, stream) ⇒ Object
Dispatches the exception and arguments to @on_error if it is defined. Otherwise, re-raises the exception.
# File 'lib/asir/channel.rb', line 102
def handle_error! exc, action, stream
  if @on_error
    @on_error.call(self, exc, action, stream)
  else
    raise exc
  end
end
#stream ⇒ Object
Returns the IO stream for the current Thread. If no stream exists yet, calls #connect! to create one and caches the result.
# File 'lib/asir/channel.rb', line 37
def stream
  _streams[self] ||= connect!
end
#stream=(x) ⇒ Object
Sets the stream for this Channel. Assigning nil removes the current stream from the mapping.
# File 'lib/asir/channel.rb', line 130
def stream= x
  if x == nil
    _streams.delete(self)
  else
    _streams[self] = x
  end
end
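Taken together, #stream and #stream= give lazy connection with explicit reset: the first read connects, later reads reuse the cached stream, and assigning nil forces the next read to connect again. A minimal sketch of that behavior follows. `LazyChannel` is a hypothetical stand-in that keeps one plain Hash instead of the Thread- and process-specific mapping used by ASIR::Channel, and it omits the retry logic in #connect!.

```ruby
require 'stringio'

# Hypothetical stand-in for illustration; not the ASIR implementation.
class LazyChannel
  attr_accessor :on_connect

  def initialize
    @streams = {}
  end

  def connect!
    @on_connect.call(self)
  end

  # Connect on first use, then reuse the cached stream.
  def stream
    @streams[self] ||= connect!
  end

  # Assigning nil forgets the stream, so the next #stream call reconnects.
  def stream=(x)
    if x.nil?
      @streams.delete(self)
    else
      @streams[self] = x
    end
  end
end

connects = 0
chan = LazyChannel.new
chan.on_connect = lambda { |channel| connects += 1; StringIO.new }

first = chan.stream
second = chan.stream   # cached: on_connect is not called again
chan.stream = nil
third = chan.stream    # reconnects
```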
#with_stream! ⇒ Object
Yield #stream to block. On Exception, invokes @on_error.call(self, exc, :with_stream, stream).
# File 'lib/asir/channel.rb', line 82
def with_stream!
  x = stream
  begin
    yield x
  rescue *Error::Unrecoverable.modules
    raise
  rescue ::Exception => exc
    handle_error!(exc, :with_stream, x)
  end
end
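An exception raised inside the block is therefore routed to the error handler with the action :with_stream, and is re-raised only when no handler is set (or when the handler itself raises, as the default ON_ERROR does). The sketch below illustrates that flow with a hypothetical `GuardedChannel` stand-in. For simplicity it rescues StandardError, whereas the source rescues ::Exception after letting Error::Unrecoverable modules pass through.

```ruby
# Hypothetical stand-in for illustration; not the ASIR implementation.
class GuardedChannel
  attr_accessor :on_error

  def initialize(stream)
    @stream = stream
  end

  def handle_error!(exc, action, stream)
    if @on_error
      @on_error.call(self, exc, action, stream)
    else
      raise exc
    end
  end

  def with_stream!
    x = @stream
    begin
      yield x
    rescue StandardError => exc   # the source rescues ::Exception
      handle_error!(exc, :with_stream, x)
    end
  end
end

seen = []
chan = GuardedChannel.new(:fake_stream)
chan.on_error = lambda do |channel, exc, action, stream|
  seen << [exc.class, action, stream]
  :handled
end
result = chan.with_stream! { |_s| raise IOError, "write failed" }

# Without a handler, the exception propagates to the caller.
chan.on_error = nil
reraised = begin
  chan.with_stream! { |_s| raise IOError, "write failed" }
  false
rescue IOError
  true
end
```

A handler that returns normally, as in the first call, turns its return value into the result of with_stream!.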