Class: Fluent::Plugin::HttpForwardOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_http_forward.rb

Instance Method Summary collapse

Constructor Details

#initializeNilClass

Initialize new output plugin

Since:

  • 0.1.0



38
39
40
41
# File 'lib/fluent/plugin/out_http_forward.rb', line 38

def initialize
  super
  require 'http'
end

Instance Method Details

#configure(config) ⇒ NilClass

Initialize attributes and parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



46
47
48
49
50
# File 'lib/fluent/plugin/out_http_forward.rb', line 46

def configure(config)
  super

  configure_params(config)
end

#configure_params(config) ⇒ NilClass

Configure plugin parameters

Returns:

  • (NilClass)

Since:

  • 0.1.0



55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_http_forward.rb', line 55

def configure_params(config)
  unless config.key?("content_type")
    @content_type = case @format['@type']
      when 'json' then 'application/json'
      when 'msgpack' then 'application/x-msgpack'
      else nil
      end
  end
end

#execute_chunking(tag, es, enqueue: false) ⇒ NilClass

Enforce the usage of the MsgPack streaming method internally so as to ensure that we have a consistent buffering mechanism.

Returns:

  • (NilClass)

Since:

  • 0.1.0



110
111
112
# File 'lib/fluent/plugin/out_http_forward.rb', line 110

def execute_chunking(tag, es, enqueue: false)
  return handle_stream_with_standard_format(tag, es, enqueue: enqueue)
end

#format_body(tag, time, chunk) ⇒ String

Format the Body for a given request depending on the Serializing format.

Returns:

  • (String)

    the request body

Since:

  • 0.1.0



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/fluent/plugin/out_http_forward.rb', line 133

def format_body(tag, time, chunk)
  case @format["@type"]
  when "json"
    chunk.extend Fluent::PluginMixin::ChunkJsonStreamer
    serializer_proc = chunk.method(:to_json_stream)
  else
    serializer_proc = chunk.method(:to_msgpack_stream)
  end

  body = serializer_proc.call
end

#format_headers(tag, time, data) ⇒ Hash

Format the Headers for a given request.

Returns:

  • (Hash)

    the header hash

Since:

  • 0.1.0



125
126
127
# File 'lib/fluent/plugin/out_http_forward.rb', line 125

def format_headers(tag, time, data)
  @headers
end

#format_url(tag, time, data) ⇒ String

Format the URL for a given request, and optionally replace the tag placerholder in the URI.

Returns:

  • (String)

    the url

Since:

  • 0.1.0



118
119
120
# File 'lib/fluent/plugin/out_http_forward.rb', line 118

def format_url(tag, time, data)
  @url.sub(/%{tag}/, tag)
end

#shutdownNilClass

Tear down the plugin

Returns:

  • (NilClass)

Since:

  • 0.1.0



101
102
103
104
# File 'lib/fluent/plugin/out_http_forward.rb', line 101

def shutdown
  @connection.close
  super
end

#startNilClass

Prepare the plugin event loop

Returns:

  • (NilClass)

Since:

  • 0.1.0



68
69
70
71
# File 'lib/fluent/plugin/out_http_forward.rb', line 68

def start
  super
  start_connection
end

#start_connectionHTTP::Client

Prepare the HTTP client object which provides a baseline for future request objects.

Returns:

  • (HTTP::Client)

Since:

  • 0.1.0



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_http_forward.rb', line 77

def start_connection
  @connection = HTTP::Client.new(
    :headers => @headers
  )

  if @authentication['method']
    @connection = @connection.basic_auth(
      :user => @authentication['username'],
      :pass => @authentication['password']
    )
  end
     
  if @content_type
    @connection = @connection.headers(
      'Content-Type' => @content_type
    )
  end

  @connection
end

#try_write(chunk) ⇒ Nil

Process a chunk of records asynchronously, committing successful uploads and re-queueing failures.

Returns:

  • (Nil)

Since:

  • 0.1.0



176
177
178
# File 'lib/fluent/plugin/out_http_forward.rb', line 176

def try_write(chunk)
  write(chunk)
end

#upload(tag, time, content) ⇒ Response

Upload the request body to the remote end point

Returns:

  • (Response)

    the HTTP response object

Since:

  • 0.1.0



148
149
150
151
152
153
# File 'lib/fluent/plugin/out_http_forward.rb', line 148

def upload(tag, time, content)
  headers = format_headers(tag, nil, content)
  url = format_url(tag, nil, content)

  @connection.request(@verb, url, body: content, headers: headers)
end

#write(chunk) ⇒ Nil

Process a chunk of records synchronously, committing successful uploads and re-queueing failures.

Returns:

  • (Nil)

Since:

  • 0.1.0



159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/out_http_forward.rb', line 159

def write(chunk)
  tag = chunk..tag
  content = format_body(tag, nil, chunk)

  response = upload(tag, nil, content)

  if response.code <= 299
    commit_write(chunk.unique_id)
  else
    @log.warn "failed to flush buffer", code: response.code
  end
end