Class: Fluent::HoopOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
Mixin::PlainTextFormatter
Defined in:
lib/fluent/plugin/out_hoop.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HoopOutput

Returns a new instance of HoopOutput.



15
16
17
18
19
# File 'lib/fluent/plugin/out_hoop.rb', line 15

# Builds the plugin instance: defers to the parent initializer, then loads the
# standard libraries used by the other methods (Net::HTTP for the requests,
# Time.strptime for expanding the path).
def initialize
  super
  %w[net/http time].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/out_hoop.rb', line 21

# Validates and applies the plugin configuration.
#
# Before delegating to the parent class, conf['time_slice_format'] is derived
# from the finest strftime placeholder found in conf['path'] (%S first, then
# %M, then %H). When the path is absent or contains none of these placeholders,
# conf['time_slice_format'] is left untouched.
#
# After the parent has processed the configuration, @hoop_server is split into
# @host and @port, and @path is checked to be absolute.
#
# @param conf configuration object indexed by string keys ('path' is read here)
# @raise [Fluent::ConfigError] if hoop_server is not of the form
#   SERVER_NAME:PORT, or if the path does not start with '/'
def configure(conf)
  if (path_template = conf['path'])
    slice_format =
      if path_template.include?('%S')
        '%Y%m%d%H%M%S'
      elsif path_template.include?('%M')
        '%Y%m%d%H%M'
      elsif path_template.include?('%H')
        '%Y%m%d%H'
      end
    conf['time_slice_format'] = slice_format if slice_format
  end

  super

  # \z (not \Z) anchors at the true end of the string, so a value carrying a
  # trailing newline is rejected instead of being silently accepted.
  unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\z/ =~ @hoop_server
    raise Fluent::ConfigError, "Invalid config value on hoop_server: '#{@hoop_server}', needs SERVER_NAME:PORT"
  end
  @host = Regexp.last_match(1)
  @port = Regexp.last_match(2).to_i
  unless @path.start_with?('/')
    raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
  end
  @conn = nil
  @header = {'Content-Type' => 'application/octet-stream'}
end

#path_format(chunk_key) ⇒ Object

# Empty stub: accepts a tag, a time and a record, performs no work and returns nil.
def format(tag, time, record)
end



78
79
80
# File 'lib/fluent/plugin/out_hoop.rb', line 78

# Expands the strftime placeholders in @path using the time encoded in a chunk key.
#
# @param chunk_key [String] time slice key, parsed with @time_slice_format
# @return [String] @path formatted with the parsed time
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path)
end

#record_to_string(record) ⇒ Object



71
72
73
# File 'lib/fluent/plugin/out_hoop.rb', line 71

# Serializes a record by calling its #to_json.
#
# @param record object responding to #to_json
# @return whatever record.to_json returns
def record_to_string(record)
  serialized = record.to_json
  serialized
end

#send_data(path, data, retries = 0) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_hoop.rb', line 82

# Sends data to the server at @host:@port, appending to the given path.
#
# Request flow:
# 1. PUT path?op=append.
# 2. On 401, refresh @authorized_header from the Set-Cookie of
#    GET /?op=status&user.name=@username, then repeat the PUT.
# 3. On 404, POST path?op=create&overwrite=false.
# 4. On 500, sleep briefly and start over; give up after three retries.
# A final status other than 200/201 is logged as a warning; the response is
# returned either way.
#
# @param path [String] target path; the op query string is appended to it
# @param data [String] request body
# @param retries [Integer] number of retries already performed (used by the recursion)
# @return [Net::HTTPResponse] the last response received
# @raise [Fluent::ConfigError] if the authorization cookie cannot be refreshed
# @raise [StandardError] if the server still answers 500 after three retries
def send_data(path, data, retries=0)
  conn = Net::HTTP.start(@host, @port)
  begin
    conn.read_timeout = 5
    res = conn.request_put(path + "?op=append", data, @authorized_header)
    if res.code == '401'
      res = conn.request_get("/?op=status&user.name=#{@username}")
      if res.code.to_i < 300 && res['Set-Cookie']
        @authorized_header = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'}
      else
        $log.error "Failed to update authorized cookie, code: #{res.code}, message: #{res.body}"
        raise Fluent::ConfigError, "Failed to update authorized cookie, code: #{res.code}, message: #{res.body}"
      end
      res = conn.request_put(path + "?op=append", data, @authorized_header)
    end
    if res.code == '404'
      res = conn.request_post(path + "?op=create&overwrite=false", data, @authorized_header)
    end
    if res.code == '500'
      if retries >= 3
        raise StandardError, "failed to send_data with retry 3 times InternalServerError"
      end
      sleep 0.3 # yes, this is a magic number
      res = send_data(path, data, retries + 1)
    end
  ensure
    # Release the socket on every exit path, including when a request raises
    # (failed cookie refresh, exhausted retries, timeouts).
    conn.finish if conn.started?
  end
  if res.code != '200' && res.code != '201'
    $log.warn "failed to write data to path: #{path}, code: #{res.code} #{res.message}"
  end
  res
end

#shutdown ⇒ Object



67
68
69
# File 'lib/fluent/plugin/out_hoop.rb', line 67

# Shuts the plugin down by deferring entirely to the parent class; no
# plugin-specific work is done here.
def shutdown
  super()
end

#start ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_hoop.rb', line 46

# Starts the plugin and obtains the authorization cookie.
#
# After the parent's start, issues GET /?op=status&user.name=@username against
# @host:@port. On a status below 300 with a Set-Cookie header, the first
# cookie segment is stored in @authorized_header. Any failure, including a
# failure to connect, is logged and re-raised.
#
# @raise [Fluent::ConfigError] if the status request does not yield a cookie
# @raise [StandardError] whatever the connection attempt or request raises
def start
  super

  # net/http has a reconnect feature; see test_out_hoop_reconnect.rb
  conn = nil
  begin
    # Connect inside the begin block so that connection failures are also
    # reported through the "failed to connect" log line below.
    conn = Net::HTTP.start(@host, @port)
    res = conn.request_get("/?op=status&user.name=#{@username}")
    if res.code.to_i < 300 && res['Set-Cookie']
      @authorized_header = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'}
    else
      $log.error "initalize request failed, code: #{res.code}, message: #{res.body}"
      raise Fluent::ConfigError, "initalize request failed, code: #{res.code}, message: #{res.body}"
    end
  rescue
    $log.error "failed to connect hoop server: #{@host} port #{@port}"
    raise
  ensure
    # Release the socket on every exit path; conn is nil when connecting failed.
    conn.finish if conn && conn.started?
  end
  $log.info "connected hoop server: #{@host} port #{@port}"
end

#write(chunk) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_hoop.rb', line 113

# Writes one buffered chunk to the server.
#
# The destination is computed from the chunk's key via path_format, and the
# chunk contents are handed to send_data. A failure in send_data is logged and
# re-raised; a failure in path_format is not intercepted here.
#
# @param chunk object responding to #key and #read
# @return [String] the destination path the chunk was sent to
def write(chunk)
  destination = path_format(chunk.key)
  begin
    send_data(destination, chunk.read)
  rescue StandardError
    $log.error "failed to communicate server, #{@host} port #{@port}, path: #{destination}"
    raise
  end
  destination
end