Class: Fluent::HttpInput
Defined Under Namespace
Classes: Handler, KeepaliveManager
Instance Attribute Summary
Attributes included from Configurable
#config
Instance Method Summary
collapse
#on_detach_process
Methods included from PluginId
#plugin_id, #require_id
included
Constructor Details
Returns a new instance of HttpInput.
28
29
30
31
|
# File 'lib/fluent/plugin/in_http.rb', line 28
# Builds a new HttpInput.
#
# Loads 'webrick/httputils' when the instance is constructed (rather than at
# file load time) and then runs the superclass initializer.
# NOTE(review): presumably the WEBrick helpers are used by the request
# handler for HTTP parsing — confirm against Handler.
def initialize
require 'webrick/httputils'
super
end
|
Instance Method Details
38
39
40
|
# File 'lib/fluent/plugin/in_http.rb', line 38
# Applies the plugin configuration.
#
# Adds no behavior of its own here: bare `super` forwards `conf` unchanged
# to the superclass implementation.
#
# @param conf the configuration object handed to the superclass
def configure(conf)
super
end
|
#on_request(path_info, params) ⇒ Object
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/fluent/plugin/in_http.rb', line 105
# Handles one HTTP request by turning it into an emitted event.
#
# The tag is derived from the request path: the first character (the leading
# '/') is dropped and the remaining '/'-separated segments are joined with
# '.'. The record is read from the 'msgpack' parameter when present,
# otherwise from the 'json' parameter.
#
# @param path_info the request path, e.g. "/foo/bar" (becomes tag "foo.bar")
# @param params    request parameters, looked up by the string keys
#                  'msgpack', 'json' and 'time'
# @return [Array] a three-element response: status line, header hash, body.
#   "400 Bad Request" when the request cannot be decoded, "500 Internal
#   Server Error" when emitting fails, "200 OK" otherwise.
def on_request(path_info, params)
  begin
    # Drop the leading '/' and convert the path segments into a dotted tag.
    path = path_info[1..-1]
    tag = path.split('/').join('.')

    # 'msgpack' takes priority over 'json' when both are supplied.
    if (msgpack = params['msgpack'])
      record = MessagePack.unpack(msgpack)
    elsif (js = params['json'])
      record = JSON.parse(js)
    else
      raise "'json' or 'msgpack' parameter is required"
    end

    # A missing 'time' (nil.to_i) or one that converts to 0 falls back to
    # the engine's current time.
    time = params['time'].to_i
    time = Engine.now if time == 0
  rescue => e
    # Anything that goes wrong while decoding the request is reported as a
    # client error, with the exception message in the body.
    return ["400 Bad Request", {'Content-type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
  end

  begin
    Engine.emit(tag, time, record)
  rescue => e
    return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"]
  end

  ["200 OK", {'Content-type'=>'text/plain'}, ""]
end
|
#run ⇒ Object
98
99
100
101
102
103
|
# File 'lib/fluent/plugin/in_http.rb', line 98
# Thread entry point: runs the event loop held in @loop.
#
# A StandardError raised out of the loop is not propagated; it is reported
# through $log.error (with the exception's string form under :error) and
# $log.error_backtrace.
def run
  begin
    @loop.run
  rescue => e
    $log.error "unexpected error", :error=>e.to_s
    $log.error_backtrace
  end
end
|
#shutdown ⇒ Object
91
92
93
94
95
96
|
# File 'lib/fluent/plugin/in_http.rb', line 91
# Tears the input down.
#
# In order: detaches every watcher currently registered on @loop, stops the
# loop, closes @lsock, and finally joins @thread.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  @thread.join
end
|
#start ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/fluent/plugin/in_http.rb', line 73
# Binds the listening socket and starts serving HTTP on a background thread.
#
# The plain TCPServer is created before detach_multi_process is entered; the
# keepalive manager, Coolio server, event loop and thread are all created
# inside that block.
# NOTE(review): detach_multi_process is defined outside this view —
# presumably it may run the block in a detached child process; confirm.
def start
$log.debug "listening http on #{@bind}:#{@port}"
# Bind up front, outside the detach block.
lsock = TCPServer.new(@bind, @port)
detach_multi_process do
# Run the superclass start from inside the detach block.
super
@km = KeepaliveManager.new(@keepalive_timeout)
# Wrap the already-bound socket. The trailing arguments (keepalive manager,
# on_request callback, body size limit) are presumably forwarded to each
# Handler — confirm against Coolio::TCPServer.
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit)
@loop = Coolio::Loop.new
# Both the keepalive manager and the server are attached to the same loop.
@loop.attach(@km)
@loop.attach(@lsock)
# The loop itself is driven by #run on its own thread.
@thread = Thread.new(&method(:run))
end
end
|