Class: RabbitCage::ClientConnection

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/rabbitcage/client_connection.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(host, port) ⇒ Object



3
4
5
6
7
8
# File 'lib/rabbitcage/client_connection.rb', line 3

# Starts the EventMachine listener for client connections and logs how the
# process can be stopped.
#
# The server handle returned by EM.start_server is stored in the global
# $server. The return value is whatever the last LOGGER.info call returns.
#
# @param host [String] address to listen on
# @param port [Integer] port to listen on
def self.start(host, port)
  $server = EM.start_server(host, port, self)

  banner = [
    "Listening on #{host}:#{port}",
    "Send QUIT to quit after waiting for all connections to finish.",
    "Send TERM or INT to quit after waiting for up to 10 seconds for connections to finish."
  ]
  banner.map { |message| LOGGER.info(message) }.last
end

Instance Method Details

#generate_log_line(payload, command) ⇒ Object



91
92
93
# File 'lib/rabbitcage/client_connection.rb', line 91

# Builds the one-line access-log entry for a filtered frame.
#
# The payload's class name is printed without its first 16 characters
# (the length of "AMQP::Protocol::"). Payloads that do not respond to
# #queue or #exchange are reported with the literal text 'nil'.
#
# @param payload [Object] frame payload that was filtered
# @param command [Object] filter verdict, interpolated as-is
# @return [String] the formatted log line
def generate_log_line(payload, command)
  short_name = payload.class.to_s[16..-1]

  queue_name =
    if payload.respond_to?(:queue)
      payload.queue
    else
      'nil'
    end

  exchange_name =
    if payload.respond_to?(:exchange)
      payload.exchange
    else
      'nil'
    end

  "#{peer} #{command} #{short_name}\tuser:#{@user} vh:#{@vhost} q:#{queue_name} ex:#{exchange_name}"
end

#handle_data(data) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rabbitcage/client_connection.rb', line 33

# Inspects a chunk of client traffic frame by frame, runs every frame through
# #filter, and forwards the chunk upstream when no frame was denied.
#
# The first denied frame is answered with a Channel::Close (reply code 403)
# on that frame's channel; the whole chunk is then dropped and the method
# returns immediately.
#
# Chunks that cannot be delivered because no upstream connection exists yet
# are kept in @buffer and flushed, oldest first, once a connection is
# available. Calling the method without an argument (as the retry timer set
# up in try_server_connect does) skips filtering and only retries the
# connection and the flush. Previously that call raised ArgumentError and the
# pending chunk was lost.
#
# @param data [String, nil] raw bytes received from the client, or nil to
#   retry delivery of previously buffered chunks
def handle_data(data = nil)
  @timer.cancel if @timer
  @buffer ||= []

  if data
    # Parse a copy; presumably Frame.parse consumes its argument, and the
    # untouched original is what gets forwarded upstream.
    data2 = data.dup

    while (frame = AMQP::Frame.parse(data2))
      payload = frame.payload
      LOGGER.debug "Got frame: " + payload.inspect

      # Remember vhost and user name for log lines and refusal messages.
      case payload
      when AMQP::Protocol::Connection::Open
        @vhost = payload.virtual_host
      when AMQP::Protocol::Connection::StartOk
        # NOTE(review): assumes byte 10 of the response is the length of the
        # user name that starts at byte 11 - confirm against the auth mechanism.
        length = payload.response[10].unpack('c').first
        @user = payload.response[11..10 + length]
      end

      command = self.filter(payload)
      if command == :deny
        LOGGER.warn generate_log_line(payload, command) if payload.class != AMQP::Protocol::Basic::Get

        # Payloads without #queue / #exchange raise here; fall back to a
        # generic description in that case.
        target = begin
          payload.queue || payload.exchange
        rescue StandardError
          'the server'
        end

        # NOTE(review): class_id 50 / method_id 10 are sent regardless of
        # which frame was refused - confirm this is intended.
        resp = AMQP::Protocol::Channel::Close.new(
          :reply_code => 403,
          :reply_text => "ACCESS_REFUSED - access to '#{target}' in vhost '#{@vhost}' refused for user '#{@user}' by rabbitcage",
          :method_id => 10,
          :class_id => 50
        ).to_frame
        resp.channel = frame.channel
        self.send_data resp.to_s
        return
      else
        LOGGER.info generate_log_line(payload, command) if payload.class != AMQP::Protocol::Basic::Get
      end
    end

    # Every frame was allowed: queue the chunk for delivery.
    @buffer << data
  end

  if @server_side || try_server_connect(RabbitCage.rabbit_host, RabbitCage.rabbit_port)
    @server_side.send_data(@buffer.shift) until @buffer.empty?
  end
end

#peer ⇒ Object



17
18
19
20
21
22
23
# File 'lib/rabbitcage/client_connection.rb', line 17

# Returns the remote endpoint of this connection as "ip:port".
#
# The value is computed once from get_peername and cached in @peer.
#
# @return [String] the peer address in "ip:port" form
def peer
  return @peer if @peer

  remote_port, remote_ip = Socket.unpack_sockaddr_in(get_peername)
  @peer = "#{remote_ip}:#{remote_port}"
end

#post_init ⇒ Object



10
11
12
13
14
15
# File 'lib/rabbitcage/client_connection.rb', line 10

# Connection-accepted hook: logs the peer, resets the per-connection state
# (@buffer and the connect retry counter @tries) and bumps the RabbitCage
# connection counter.
#
# The peer is logged first, so a failure while resolving it leaves the state
# untouched.
def post_init
  LOGGER.info("Accepted #{peer}")
  @buffer, @tries = [], 0
  RabbitCage.incr
end

#receive_data(data) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/rabbitcage/client_connection.rb', line 25

# Incoming-data hook: hands the bytes to handle_data.
#
# Any StandardError raised while handling closes the client connection, then
# logs the error class and message at error level and the backtrace at debug
# level.
#
# @param data [String] bytes received from the client
def receive_data(data)
  begin
    handle_data(data)
  rescue StandardError => failure
    close_connection
    LOGGER.error("#{failure.class} - #{failure.message}")
    trace = failure.backtrace.join("\n")
    LOGGER.debug("\n#{trace}")
  end
end

#try_server_connect(host, port) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/rabbitcage/client_connection.rb', line 67

# Attempts to open the upstream connection for this client.
#
# On success the connection is stored in @server_side and true is returned.
# On any StandardError @server_side is cleared and false is returned; while
# fewer than ten attempts have been made, a 0.1 second timer is armed that
# re-enters handle_data (invoked with no arguments), replacing any timer
# that was already pending.
#
# @param host [String] upstream host
# @param port [Integer] upstream port
# @return [Boolean] whether the upstream connection is available
def try_server_connect(host, port)
  begin
    @server_side = ServerConnection.request(host, port, self)
    LOGGER.info("Successful connection to #{host}:#{port}.")
    return true
  rescue StandardError
    @server_side = nil
  end

  # Retry budget exhausted: report and give up without arming a timer.
  unless @tries < 10
    LOGGER.error("Failed after ten connection attempts.")
    return false
  end

  @tries += 1
  LOGGER.error("Failed on server connect attempt #{@tries}. Trying again...")
  @timer.cancel if @timer
  @timer = EventMachine::Timer.new(0.1) { self.handle_data }
  false
end

#unbind ⇒ Object



86
87
88
89
# File 'lib/rabbitcage/client_connection.rb', line 86

# Connection-closed hook: tears down everything tied to this client.
#
# Cancels a pending connect-retry timer (it would otherwise fire for a client
# that no longer exists), closes the upstream connection once its pending
# writes are flushed, and decrements the RabbitCage connection counter.
def unbind
  @timer.cancel if @timer
  @server_side.close_connection_after_writing if @server_side
  RabbitCage.decr
end