Class: Fluent::Plugin::TcpSocketOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_tcp_socket.rb

Constant Summary collapse

@@socket =
nil
@@execution_thread =
nil
@@queue =
Queue.new

Instance Method Summary collapse

Instance Method Details

#get_client ⇒ Object



62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 62

# Returns the class-wide TCP socket, opening it on first use.
#
# The connection is cached in the class variable @@socket; when that is
# nil a new TCPSocket is opened to hostname:port and an info line is logged.
#
# @return the cached (or freshly opened) socket
def get_client
  # Fast path: a connection is already cached.
  return @@socket unless @@socket.nil?

  @@socket = TCPSocket.open(hostname, port)
  # Keep String#% here: a bare `format` call could resolve to a plugin
  # method of the same name instead of Kernel#format.
  message = "Socket established for %s:%s" % [hostname, port]
  log.info message
  @@socket
end

#process(tag, es) ⇒ Object



43
44
45
46
47
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 43

# Queues every record of the event stream for the sender thread.
#
# Each record is rendered with "%s" and terminated with CRLF before being
# pushed onto the class-wide @@queue. The tag and the per-event time are
# not used.
#
# @param tag event tag (unused here)
# @param es  event stream yielding (time, record) pairs via #each
def process(tag, es)
  es.each do |_time, record|
    line = "%s\r\n" % [record]
    @@queue << line
  end
end

#start ⇒ Object



28
29
30
31
32
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 28

# Starts the plugin: runs the parent's start, logs the configured target,
# and spawns the background thread that runs start_service_thread.
#
# The thread handle is kept in the class variable @@execution_thread so
# that stop can kill it later.
def start
  super
  banner = "TCP Plugin started with %s:%s" % [hostname, port]
  log.info(banner)
  @@execution_thread = Thread.new do
    start_service_thread
  end
end

#start_service_thread ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 49

# Body of the sender thread: drains @@queue and writes each record to the
# TCP connection returned by get_client.
#
# The loop ends when Queue#deq returns nil, which happens once the queue
# has been closed and is empty. On any StandardError while connecting or
# writing, the error is logged, the current socket is closed and dropped so
# the next iteration reconnects, and the record is put back on the queue.
#
# NOTE(review): the failed record is re-queued at the tail, so it may be
# delivered after records that were queued later, and there is no delay
# between retries while the peer is unreachable.
def start_service_thread
  log.info "TCP Plugin thread started for %s:%s" % [hostname, port]
  while (record = @@queue.deq)
    begin
      get_client.puts record
    rescue StandardError => e
      log.error "client error %s" % [e]
      # Close the broken connection before forgetting it; previously the
      # socket was only dereferenced, leaking its descriptor until GC.
      begin
        @@socket.close unless @@socket.nil?
      rescue StandardError
        # The socket is already unusable; a failure to close it must not
        # prevent the reconnect below.
      end
      @@socket = nil
      @@queue.enq record
    end
  end
end

#stop ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 34

# Stops the plugin: kills the sender thread, closes the cached socket,
# closes the queue, and then runs the parent's stop.
#
# NOTE(review): @@queue is a class variable and is not re-created in the
# code shown here; after Queue#close any later enq raises ClosedQueueError,
# so confirm the plugin is never restarted within the same process.
def stop
  unless @@execution_thread.nil?
    @@execution_thread.kill
    log.info "TCP Plugin thread has been killed"
  end

  # Release the connection opened by get_client. Without this the socket
  # stayed open after stop and the stale handle stayed cached.
  begin
    @@socket.close unless @@socket.nil?
  rescue StandardError => e
    log.error "socket close error %s" % [e]
  end
  @@socket = nil

  @@queue.close
  log.info "Queue is closed"

  # start calls super, so the matching teardown hook of the parent class
  # must run as well.
  super
end