Class: Fluent::Plugin::TcpInput

Inherits:
Input
Defined in:
lib/fluent/plugin/in_tcp.rb

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary

Methods inherited from Input

#emit_records, #emit_size, #initialize, #metric_callback, #statistics

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Input

Instance Method Details

#configure(conf) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/fluent/plugin/in_tcp.rb', line 54

# Validates the configuration and prepares the state used by #start.
#
# Steps, in order:
# 1. Converts compat-style parser parameters and requires a <parse> section.
# 2. Delegates to the parent implementation via +super+.
# 3. Rejects an empty delimiter.
# 4. Copies @blocking_timeout into @_event_loop_blocking_timeout and lets
#    @source_host_key act as a fallback for @source_hostname_key.
# 5. When @security is set, builds @nodes: one IPAddr per configured client,
#    from either a resolved +host+ or a +network+ specification.
# 6. Creates the parser from the <parse> section.
#
# @param conf [Object] configuration element; must respond to
#   +elements('parse')+
# @return [Object] the parser returned by +parser_create+
# @raise [Fluent::ConfigError] if the <parse> section is missing, the
#   delimiter is empty, a client specifies both or neither of host/network,
#   a host cannot be resolved, or an address cannot be parsed
def configure(conf)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  raise Fluent::ConfigError, "<parse> section is required." unless parser_config

  super

  # A zero-length delimiter matches at every position without advancing, so
  # the receive loop in #start would never terminate. Fail fast instead.
  if @delimiter && @delimiter.empty?
    raise Fluent::ConfigError, "'delimiter' must not be empty"
  end

  @_event_loop_blocking_timeout = @blocking_timeout
  @source_hostname_key ||= @source_host_key if @source_host_key

  @nodes = nil
  if @security
    @nodes = []
    @security.clients.each do |client|
      if client.host && client.network
        raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
      end
      if !client.host && !client.network
        raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
      end

      source = nil
      if client.host
        begin
          source = IPSocket.getaddress(client.host)
        rescue SocketError
          raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
        end
      end

      source_addr = begin
                      IPAddr.new(source || client.network)
                    rescue ArgumentError
                      # Name the value that actually failed: on the host path
                      # client.network is nil, so blaming the network would
                      # print an empty, misleading value.
                      message = if source
                                  "host '#{client.host}' resolved to '#{source}', which is not a valid address"
                                else
                                  "network '#{client.network}' address format is invalid"
                                end
                      raise Fluent::ConfigError, message
                    end
      @nodes.push(source_addr)
    end
  end

  @parser = parser_create(conf: parser_config)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/fluent/plugin/in_tcp.rb', line 94

# Constant predicate: this plugin always answers true.
# NOTE(review): presumably consulted by Fluentd to decide whether the plugin
# may run under multiple workers — confirm against the base plugin class.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#start ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/plugin/in_tcp.rb', line 98

# Starts the input: calls the parent #start, logs the listening address and
# registers a data callback through +server_create+.
#
# The callback receives raw +data+ and the +conn+ it arrived on. It:
# * closes and ignores connections rejected by +check_client+;
# * appends the data to +conn.buffer+ and cuts every complete,
#   @delimiter-terminated message out of it, leaving a trailing partial
#   message in the buffer for the next call;
# * parses each message with @parser, logging a warning and skipping the
#   message when the parser yields no time or no record;
# * adds the peer address / host name to the record when
#   @source_address_key / @source_hostname_key are set.
#
# Two emit strategies exist:
# * single emit (@_extract_enabled and @_extract_tag_key both set): each
#   record is emitted on its own, tagged with the tag extracted from the
#   record, falling back to @tag;
# * batch emit (otherwise): all records parsed from one callback are
#   collected into a Fluent::MultiEventStream and emitted once under @tag.
#
# @return [Object] whatever +server_create+ returns
def start
  super

  log.info "listening tcp socket", bind: @bind, port: @port
  del_size = @delimiter.length
  resolve_name = !!@source_hostname_key

  if @_extract_enabled && @_extract_tag_key
    server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: resolve_name) do |data, conn|
      unless check_client(conn)
        conn.close
        next
      end

      conn.buffer << data
      buf = conn.buffer
      pos = 0
      while (i = buf.index(@delimiter, pos))
        msg = buf[pos...i]
        pos = i + del_size

        @parser.parse(msg) do |time, record|
          unless time && record
            log.warn "pattern not matched", message: msg
            next
          end

          # time is guaranteed non-nil past the guard above, so no fallback
          # timestamp is needed here.
          tag = extract_tag_from_record(record) || @tag
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
          router.emit(tag, time, record)
        end
      end
      # Drop only the consumed prefix; an incomplete trailing message stays.
      buf.slice!(0, pos) if pos > 0
    end
  else
    server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: resolve_name) do |data, conn|
      unless check_client(conn)
        conn.close
        next
      end

      conn.buffer << data
      buf = conn.buffer
      pos = 0
      es = Fluent::MultiEventStream.new
      while (i = buf.index(@delimiter, pos))
        msg = buf[pos...i]
        pos = i + del_size

        @parser.parse(msg) do |time, record|
          unless time && record
            log.warn "pattern not matched", message: msg
            next
          end

          # time is guaranteed non-nil past the guard above, so no fallback
          # timestamp is needed here.
          record[@source_address_key] = conn.remote_addr if @source_address_key
          record[@source_hostname_key] = conn.remote_host if @source_hostname_key
          es.add(time, record)
        end
      end
      # A chunk may complete no message at all, or only unparsable ones;
      # skip the emit instead of pushing an empty stream downstream.
      router.emit_stream(@tag, es) unless es.empty?
      # Drop only the consumed prefix; an incomplete trailing message stays.
      buf.slice!(0, pos) if pos > 0
    end
  end
end