Class: Fluent::HttpInput

Inherits:
Input
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_http.rb

Defined Under Namespace

Classes: Handler, KeepaliveManager

Instance Attribute Summary

Attributes included from Configurable

#config

Instance Method Summary collapse

Methods included from DetachProcessImpl

#on_detach_process

Methods included from PluginId

#plugin_id, #require_id

Methods included from Configurable

included

Constructor Details

#initialize ⇒ HttpInput

Returns a new instance of HttpInput.



28
29
30
31
# File 'lib/fluent/plugin/in_http.rb', line 28

# Builds a new HttpInput.
#
# 'webrick/httputils' is required here, at construction time, rather than
# at file load; the parent initializer is then invoked with no arguments.
# NOTE(review): presumably the WEBrick utilities are used by the request
# Handler defined elsewhere in this file — confirm.
def initialize
  require 'webrick/httputils'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/in_http.rb', line 38

# Configures the plugin from +conf+.
#
# All work is delegated to the parent implementation; this method adds no
# HTTP-specific handling of its own.
# NOTE(review): presumably the parent populates @bind, @port,
# @keepalive_timeout and @body_size_limit used by #start — confirm.
def configure(conf)
  super
end

#on_request(path_info, params) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/in_http.rb', line 105

# Turns one HTTP request into an emitted event.
#
# path_info - request path; the leading "/" is dropped and the remaining
#             segments are joined with "." to form the tag
#             (e.g. "/app/access" becomes "app.access").
# params    - request parameters. Either 'msgpack' or 'json' must carry the
#             record ('msgpack' wins when both are present). 'time' is
#             optional; a missing value, or one whose #to_i is 0, falls back
#             to Engine.now.
#
# Returns a three-element array [status line, headers hash, body string]:
# "400 Bad Request" when the request cannot be decoded, "500 Internal
# Server Error" when emitting fails, "200 OK" with an empty body otherwise.
def on_request(path_info, params)
  begin
    # Strip the leading slash, then convert path separators into tag dots.
    tag = path_info[1..-1].split('/').join('.')

    record =
      if (packed = params['msgpack'])
        MessagePack.unpack(packed)
      elsif (json_text = params['json'])
        JSON.parse(json_text)
      else
        raise "'json' or 'msgpack' parameter is required"
      end

    # nil.to_i and non-numeric strings both yield 0, which means "use now".
    time = params['time'].to_i
    time = Engine.now if time == 0
  rescue => bad_request
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "400 Bad Request\n#{bad_request}\n"]
  end

  # Any failure while emitting is reported to the client as a server error.
  begin
    Engine.emit(tag, time, record)
  rescue => emit_failure
    return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{emit_failure}\n"]
  end

  ["200 OK", {'Content-type'=>'text/plain'}, ""]
end

#run ⇒ Object



98
99
100
101
102
103
# File 'lib/fluent/plugin/in_http.rb', line 98

# Runs the event loop held in @loop; used as the body of the thread
# started in #start.
#
# An error escaping the loop is not re-raised: its message is written to
# $log at error level, followed by the backtrace.
def run
  begin
    @loop.run
  rescue => failure
    $log.error "unexpected error", :error=>failure.to_s
    $log.error_backtrace
  end
end

#shutdown ⇒ Object



91
92
93
94
95
96
# File 'lib/fluent/plugin/in_http.rb', line 91

# Tears the server down, in order: detach every watcher from the event
# loop, stop the loop, close the listening socket, then wait for the
# worker thread to finish.
#
# Returns whatever @thread.join returns.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  joined = @thread.join
  joined
end

#start ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_http.rb', line 73

# Starts the HTTP listener.
#
# A plain TCPServer is bound to @bind:@port first; everything else runs
# inside the detach_multi_process block: the parent #start, creation of the
# keepalive manager, wrapping of the already-bound socket in a
# Coolio::TCPServer, and the event loop that is driven by #run on a new
# thread.
# NOTE(review): presumably the socket is bound before detach_multi_process
# so that detached processes share the same listening socket — confirm
# against DetachMultiProcessMixin.
def start
  $log.debug "listening http on #{@bind}:#{@port}"
  # Bound outside the block; handed to Coolio::TCPServer below.
  lsock = TCPServer.new(@bind, @port)

  detach_multi_process do
    # Zero-argument super: invokes the parent class's #start.
    super
    @km = KeepaliveManager.new(@keepalive_timeout)
    #@lsock = Coolio::TCPServer.new(@bind, @port, Handler, @km, method(:on_request), @body_size_limit)
    # Reuses the existing socket (port argument is nil); the trailing
    # arguments (@km, the on_request callback, @body_size_limit) are
    # presumably forwarded to each Handler — TODO confirm.
    @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit)

    @loop = Coolio::Loop.new
    @loop.attach(@km)
    @loop.attach(@lsock)

    # The loop runs on its own thread; #shutdown joins it.
    @thread = Thread.new(&method(:run))
  end
end