Class: Fluent::HoopOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::HoopOutput
- Includes:
- Mixin::PlainTextFormatter
- Defined in:
- lib/fluent/plugin/out_hoop.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ HoopOutput
constructor
A new instance of HoopOutput.
-
#path_format(chunk_key) ⇒ Object
def format(tag, time, record) end.
- #record_to_string(record) ⇒ Object
- #send_data(path, data, retries = 0) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ HoopOutput
Returns a new instance of HoopOutput.
15 16 17 18 19 |
# File 'lib/fluent/plugin/out_hoop.rb', line 15
#
# Sets up the plugin instance and loads the standard libraries the other
# methods rely on: "net/http" (Net::HTTP, used by #start and #send_data)
# and "time" (Time.strptime, used by #path_format).
def initialize
  super
  # Loaded at construction time rather than at file load time.
  %w[net/http time].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/out_hoop.rb', line 21
#
# Validates the plugin configuration and derives the connection settings.
#
# Before delegating to the parent class, the finest strftime token found in
# conf['path'] (%S, then %M, then %H) selects the matching
# 'time_slice_format'; a path with none of these tokens leaves the setting
# untouched.
#
# @param conf [#[], #[]=] configuration element; 'path' is read and
#   'time_slice_format' may be written
# @raise [Fluent::ConfigError] if @hoop_server is not SERVER_NAME:PORT or
#   @path does not begin with '/'
def configure(conf)
  path_template = conf['path']
  if path_template
    # Ordered from finest to coarsest granularity; the first hit wins.
    granularity = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ].find { |token, _slice_format| path_template.index(token) }
    conf['time_slice_format'] = granularity.last if granularity
  end

  super

  server = /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/.match(@hoop_server)
  if server.nil?
    raise Fluent::ConfigError, "Invalid config value on hoop_server: '#{@hoop_server}', needs SERVER_NAME:PORT"
  end
  @host = server[1]
  @port = server[2].to_i

  unless @path.start_with?('/')
    raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
  end

  @conn = nil
  @header = {'Content-Type' => 'application/octet-stream'}
end
#path_format(chunk_key) ⇒ Object
def format(tag, time, record) end
78 79 80 |
# File 'lib/fluent/plugin/out_hoop.rb', line 78
#
# Turns a chunk key into a destination path: the key is parsed with
# @time_slice_format and the resulting time is rendered through the
# strftime template stored in @path.
#
# @param chunk_key [String] time-slice key to parse
# @return [String] @path with its strftime tokens expanded
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path)
end
#record_to_string(record) ⇒ Object
71 72 73 |
# File 'lib/fluent/plugin/out_hoop.rb', line 71
#
# Serializes one record by calling #to_json on it.
#
# @param record [#to_json] the record to serialize
# @return [String] whatever record.to_json returns
def record_to_string(record)
  serialized = record.to_json
  serialized
end
#send_data(path, data, retries = 0) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/fluent/plugin/out_hoop.rb', line 82
#
# Appends +data+ to +path+ on the hoop server, creating the file when the
# append reports 404 and refreshing the auth cookie when it reports 401.
# A 500 response is retried (after a short pause) up to three times.
#
# The connection opened here is always closed before returning or raising.
#
# @param path [String] absolute path on the server
# @param data [String] request body to append
# @param retries [Integer] number of retries already performed (internal)
# @return [Object] the final HTTP response
# @raise [Fluent::ConfigError] if the auth cookie cannot be refreshed
# @raise [StandardError] if the server keeps answering 500 after 3 retries
def send_data(path, data, retries = 0)
  conn = Net::HTTP.start(@host, @port)
  begin
    conn.read_timeout = 5
    res = conn.request_put(path + "?op=append", data, @authorized_header)

    if res.code == '401'
      # Cookie expired or missing: ask for a fresh one, then repeat the append.
      res = conn.request_get("/?op=status&user.name=#{@username}")
      if res.code.to_i < 300 && res['Set-Cookie']
        @authorized_header = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'}
      else
        $log.error "Failed to update authorized cookie, code: #{res.code}, message: #{res.body}"
        raise Fluent::ConfigError, "Failed to update authorized cookie, code: #{res.code}, message: #{res.body}"
      end
      res = conn.request_put(path + "?op=append", data, @authorized_header)
    end

    if res.code == '404'
      # Nothing to append to yet: create the file with this payload.
      res = conn.request_post(path + "?op=create&overwrite=false", data, @authorized_header)
    end

    if res.code == '500'
      if retries >= 3
        raise StandardError, "failed to send_data with retry 3 times InternalServerError"
      end
      sleep 0.3 # yes, this is a magic number
      # The retry opens its own connection; this one is closed by the ensure below.
      res = send_data(path, data, retries + 1)
    end

    if res.code != '200' && res.code != '201'
      $log.warn "failed to write data to path: #{path}, code: #{res.code} #{res.message}"
    end
    res
  ensure
    # Close the connection on every exit path, including exceptions.
    conn.finish if conn.started?
  end
end
#shutdown ⇒ Object
67 68 69 |
# File 'lib/fluent/plugin/out_hoop.rb', line 67
#
# Stops the plugin. Nothing is released here; the call is passed straight
# to the parent class and its result is returned.
def shutdown
  super()
end
#start ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_hoop.rb', line 46
#
# Starts the plugin and performs an initial status request against the hoop
# server to obtain the auth cookie, which is stored in @authorized_header
# for later requests.
#
# Opening the connection happens inside the rescued section so that a
# refused or unreachable server is reported through the same
# "failed to connect" log line, and the connection is closed on every exit
# path.
#
# @raise [Fluent::ConfigError] if the status request fails or returns no
#   Set-Cookie header
# @raise [StandardError] any connection error, re-raised after logging
def start
  super

  conn = nil
  begin
    # okey, net/http has reconnect feature. see test_out_hoop_reconnect.rb
    conn = Net::HTTP.start(@host, @port)
    res = conn.request_get("/?op=status&user.name=#{@username}")
    if res.code.to_i < 300 && res['Set-Cookie']
      # Only the first "name=value" pair of the Set-Cookie header is kept.
      @authorized_header = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'}
    else
      $log.error "initalize request failed, code: #{res.code}, message: #{res.body}"
      raise Fluent::ConfigError, "initalize request failed, code: #{res.code}, message: #{res.body}"
    end
  rescue
    $log.error "failed to connect hoop server: #{@host} port #{@port}"
    raise
  ensure
    # conn is nil when Net::HTTP.start itself failed.
    conn.finish if conn && conn.started?
  end
  $log.info "connected hoop server: #{@host} port #{@port}"
end
#write(chunk) ⇒ Object
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_hoop.rb', line 113
#
# Sends the contents of one buffered chunk to the path derived from the
# chunk's key. Any error raised while sending is logged with the server and
# path, then re-raised unchanged.
#
# @param chunk [#key, #read] buffered chunk to flush
# @return [String] the path the chunk was written to
def write(chunk)
  destination = path_format(chunk.key)
  begin
    payload = chunk.read
    send_data(destination, payload)
  rescue
    $log.error "failed to communicate server, #{@host} port #{@port}, path: #{destination}"
    raise
  end
  destination
end