Class: Fluent::Plugin::TcpSocketOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::TcpSocketOutput
- Defined in:
- lib/fluent/plugin/out_tcp_socket.rb
Constant Summary collapse
- @@socket =
nil
- @@execution_thread =
nil
- @@queue =
Queue.new
Instance Method Summary collapse
- #get_client ⇒ Object
- #process(tag, es) ⇒ Object
- #start ⇒ Object
- #start_service_thread ⇒ Object
- #stop ⇒ Object
Instance Method Details
#get_client ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 62 def get_client if @@socket == nil @@socket = TCPSocket.open(hostname, port) log.info "Socket established for %s:%s" % [hostname, port] end return @@socket end |
#process(tag, es) ⇒ Object
43 44 45 46 47 |
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 43 def process(tag, es) es.each do |time, record| @@queue.enq("%s\r\n" % [record]) end end |
#start ⇒ Object
28 29 30 31 32 |
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 28 def start super log.info "TCP Plugin started with %s:%s" % [hostname, port] @@execution_thread = Thread.new { start_service_thread } end |
#start_service_thread ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 49 def start_service_thread log.info "TCP Plugin thread started for %s:%s" % [hostname, port] while record = @@queue.deq begin get_client.puts record rescue log.error "client error %s" %[$!] @@socket = nil @@queue.enq record end end end |
#stop ⇒ Object
34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/out_tcp_socket.rb', line 34 def stop if @@execution_thread != nil Thread.kill(@@execution_thread) log.info "TCP Plugin thread has been killed" end @@queue.close log.info "Queue is closed" end |