Class: RedisHA::Router::Connection
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- RedisHA::Router::Connection
- Includes:
- Logger
- Defined in:
- lib/redis_ha/router/connection.rb
Instance Method Summary collapse
- #connected(name) ⇒ Object
-
#initialize(debug_on = false) ⇒ Connection
constructor
A new instance of Connection.
- #on_connect(&block) ⇒ Object
- #on_data(&block) ⇒ Object
- #on_finish(&block) ⇒ Object
- #on_response(&block) ⇒ Object
- #peer ⇒ Object
- #receive_data(data) ⇒ Object
- #relay_from_upstream(name, data) ⇒ Object
- #relay_to_upstreams(processed) ⇒ Object
- #unbind ⇒ Object
- #unbind_backend(name) ⇒ Object
- #upstream(name, options) ⇒ Object
Constructor Details
#initialize(debug_on = false) ⇒ Connection
Returns a new instance of Connection.
11 12 13 14 |
# File 'lib/redis_ha/router/connection.rb', line 11 def initialize(debug_on=false) @debug = debug_on @upstreams = {} end |
Instance Method Details
#connected(name) ⇒ Object
63 64 65 66 |
# File 'lib/redis_ha/router/connection.rb', line 63 def connected name logger.debug [:connected] @on_connect.call(name) if @on_connect end |
#on_connect(&block) ⇒ Object
9 |
# File 'lib/redis_ha/router/connection.rb', line 9 def on_connect(&block); @on_connect = block; end |
#on_data(&block) ⇒ Object
6 |
# File 'lib/redis_ha/router/connection.rb', line 6 def on_data(&block); @on_data = block; end |
#on_finish(&block) ⇒ Object
8 |
# File 'lib/redis_ha/router/connection.rb', line 8 def on_finish(&block); @on_finish = block; end |
#on_response(&block) ⇒ Object
7 |
# File 'lib/redis_ha/router/connection.rb', line 7 def on_response(&block); @on_response = block; end |
#peer ⇒ Object
50 51 52 53 54 55 |
# File 'lib/redis_ha/router/connection.rb', line 50 def peer @peer ||= begin peername = get_peername peername ? Socket.unpack_sockaddr_in(peername).reverse : nil end end |
#receive_data(data) ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/redis_ha/router/connection.rb', line 16 def receive_data data logger.debug [:connection, data] processed = @on_data.call(data) if @on_data return if processed == :async or processed.nil? relay_to_upstreams(processed) end |
#relay_from_upstream(name, data) ⇒ Object
57 58 59 60 61 |
# File 'lib/redis_ha/router/connection.rb', line 57 def relay_from_upstream name, data logger.debug [:relay_from_upsteam, name, data] data = @on_response.call(name, data) if @on_response send_data data unless data.nil? end |
#relay_to_upstreams(processed) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/redis_ha/router/connection.rb', line 24 def relay_to_upstreams processed if processed.is_a? Array data, servers = *processed servers = servers.collect {|s| @upstreams[s]}.compact else data = processed servers ||= @upstreams.values.compact end servers.each do |s| s.send_data data unless data.nil? end end |
#unbind ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/redis_ha/router/connection.rb', line 68 def unbind logger.debug [:unbind, :connection] @upstreams.values.compact.each do |serv| serv.close_connection end end |
#unbind_backend(name) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/redis_ha/router/connection.rb', line 76 def unbind_backend name logger.debug [:unbind_backend, name] @upstreams[name] = nil close = :close if @on_finish close = @on_finish.call(name) end if (@upstreams.values.compact.size.zero? && close != :keep) || close == :close close_connection_after_writing end end |
#upstream(name, options) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/redis_ha/router/connection.rb', line 38 def upstream name, serv = EventMachine::bind_connect(*build_upstream_signature()) do |c| c.name = name c.plexer = self c.proxy_incoming_to(self, 10240) if [:relay_server] end self.proxy_incoming_to(serv, 10240) if [:relay_client] @upstreams[name] = serv end |