Class: RedisHA::Router::Connection

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
Logger
Defined in:
lib/redis_ha/router/connection.rb

Instance Method Summary collapse

Constructor Details

#initialize(debug_on = false) ⇒ Connection

Returns a new instance of Connection.



11
12
13
14
# File 'lib/redis_ha/router/connection.rb', line 11

# Sets up per-connection state.
#
# @param debug_on [Boolean] stored in @debug
def initialize(debug_on = false)
  @debug = debug_on
  # Upstream connections keyed by the name passed to #upstream.
  @upstreams = {}
end

Instance Method Details

#connected(name) ⇒ Object



63
64
65
66
# File 'lib/redis_ha/router/connection.rb', line 63

# Logs the event and forwards +name+ to the handler registered with
# #on_connect, if one was registered.
#
# @param name [Object] value handed to the on_connect handler
# @return [Object, nil] the handler's result, or nil when no handler is set
def connected(name)
  logger.debug [:connected]
  return unless @on_connect

  @on_connect.call(name)
end

#on_connect(&block) ⇒ Object



9
# File 'lib/redis_ha/router/connection.rb', line 9

# Stores the given block in @on_connect (nil when no block is given).
#
# @return [Proc, nil] the stored block
def on_connect(&block)
  @on_connect = block
end

#on_data(&block) ⇒ Object



6
# File 'lib/redis_ha/router/connection.rb', line 6

# Stores the given block in @on_data (nil when no block is given).
#
# @return [Proc, nil] the stored block
def on_data(&block)
  @on_data = block
end

#on_finish(&block) ⇒ Object



8
# File 'lib/redis_ha/router/connection.rb', line 8

# Stores the given block in @on_finish (nil when no block is given).
#
# @return [Proc, nil] the stored block
def on_finish(&block)
  @on_finish = block
end

#on_response(&block) ⇒ Object



7
# File 'lib/redis_ha/router/connection.rb', line 7

# Stores the given block in @on_response (nil when no block is given).
#
# @return [Proc, nil] the stored block
def on_response(&block)
  @on_response = block
end

#peer ⇒ Object



50
51
52
53
54
55
# File 'lib/redis_ha/router/connection.rb', line 50

# Returns the peer address as the reversed result of
# Socket.unpack_sockaddr_in, i.e. [host, port].
#
# A non-nil result is cached in @peer; while get_peername returns nil the
# lookup is repeated on every call.
#
# @return [Array, nil] [host, port], or nil when get_peername returns nil
def peer
  return @peer if @peer

  sockaddr = get_peername
  @peer =
    if sockaddr
      Socket.unpack_sockaddr_in(sockaddr).reverse
    end
end

#receive_data(data) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/redis_ha/router/connection.rb', line 16

# Logs the incoming data, runs it through the on_data handler (if any) and
# relays the handler's result to the upstreams.
#
# Nothing is relayed when no handler is registered, or when the handler
# returns nil or :async.
#
# @param data [Object] payload passed to the on_data handler
# @return [Object, nil] result of #relay_to_upstreams, or nil when nothing is relayed
def receive_data(data)
  logger.debug [:connection, data]
  outcome = @on_data ? @on_data.call(data) : nil

  return if outcome == :async || outcome.nil?

  relay_to_upstreams(outcome)
end

#relay_from_upstream(name, data) ⇒ Object



57
58
59
60
61
# File 'lib/redis_ha/router/connection.rb', line 57

# Logs data received from an upstream, optionally transforms it with the
# on_response handler, and sends the result with send_data.
#
# When an on_response handler is registered its return value replaces the
# data; a nil result suppresses the send.
#
# @param name [Object] upstream identifier, passed to the handler and logged
# @param data [Object] payload received from the upstream
# @return [Object, nil] result of send_data, or nil when nothing is sent
def relay_from_upstream(name, data)
  # The tag previously read :relay_from_upsteam (typo), which made these
  # entries invisible to log searches for "upstream".
  logger.debug [:relay_from_upstream, name, data]
  data = @on_response.call(name, data) if @on_response
  send_data data unless data.nil?
end

#relay_to_upstreams(processed) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/redis_ha/router/connection.rb', line 24

# Sends data to upstream connections.
#
# +processed+ is either a bare payload, which goes to every non-nil entry
# in @upstreams, or an Array of [payload, names], which goes only to the
# named upstreams. Names with no (or a nil) entry are skipped.
# A nil payload results in no send_data calls.
#
# @param processed [Object, Array] payload, or [payload, upstream_names]
# @return [Array] the upstream connections that were selected
def relay_to_upstreams(processed)
  payload, targets =
    if processed.is_a?(Array)
      body, names = *processed
      [body, names.map { |key| @upstreams[key] }.compact]
    else
      [processed, @upstreams.values.compact]
    end

  targets.each do |conn|
    conn.send_data payload unless payload.nil?
  end
end

#unbind ⇒ Object



68
69
70
71
72
73
74
# File 'lib/redis_ha/router/connection.rb', line 68

# Logs the event and calls close_connection on every non-nil entry in
# @upstreams.
#
# NOTE(review): presumably EventMachine's unbind callback for this
# connection — confirm against the EventMachine::Connection docs.
def unbind
  logger.debug [:unbind, :connection]

  live = @upstreams.values.compact
  live.each(&:close_connection)
end

#unbind_backend(name) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/redis_ha/router/connection.rb', line 76

# Marks the upstream +name+ as gone and decides whether to close this
# connection.
#
# The decision comes from the on_finish handler's return value (:close when
# no handler is registered):
#   :close     - always close
#   :keep      - never close
#   otherwise  - close only when no non-nil upstream remains
#
# @param name [Object] key of the upstream in @upstreams
# @return [Object, nil] result of close_connection_after_writing, or nil
def unbind_backend(name)
  logger.debug [:unbind_backend, name]
  @upstreams[name] = nil

  verdict = @on_finish ? @on_finish.call(name) : :close

  closing =
    case verdict
    when :close then true
    when :keep  then false
    else @upstreams.values.compact.empty?
    end

  close_connection_after_writing if closing
end

#upstream(name, options) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/redis_ha/router/connection.rb', line 38

# Opens an upstream connection via EventMachine.bind_connect and registers
# it in @upstreams under +name+.
#
# The new connection gets its name and plexer (this object) assigned.
# options[:relay_server] makes the upstream proxy its incoming data to this
# connection; options[:relay_client] makes this connection proxy its
# incoming data to the upstream. Both use a 10240 buffer size argument.
#
# @param name [Object] key used to store the upstream
# @param options [Hash] passed to build_upstream_signature; also read for
#   :relay_server and :relay_client
# @return [Object] the connection returned by EventMachine.bind_connect
def upstream(name, options)
  bufsize = 10240
  signature = build_upstream_signature(options)

  backend = EventMachine.bind_connect(*signature) do |conn|
    conn.name = name
    conn.plexer = self
    conn.proxy_incoming_to(self, bufsize) if options[:relay_server]
  end

  proxy_incoming_to(backend, bufsize) if options[:relay_client]

  @upstreams[name] = backend
end