Class: Fluent::HBaseOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_hbase.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HBaseOutput

Returns a new instance of HBaseOutput.



6
7
8
9
# File 'lib/fluent/plugin/out_hbase.rb', line 6

# Builds the plugin instance: runs the parent constructor first, then loads
# the massive_record library at construction time rather than at file load.
def initialize
  super
  require('massive_record')
end

Instance Method Details

#configure(conf) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/fluent/plugin/out_hbase.rb', line 30

# Parses @fields_to_columns_mapping into the lookup structures used by the
# other methods of this plugin.
#
# The value is a comma-separated list of "field=>column" entries. After this
# call:
# * @fields_to_columns holds the [field, column] pairs in declaration order
# * @mapping holds the same pairs as a Hash (field => column)
#
# Whitespace around fields and columns is ignored, and blank entries (for
# example from a doubled comma) are skipped.
#
# @param conf the configuration object; it is handed to the parent
#   implementation via super
# @raise [ArgumentError] if an entry is not of the form "field=>column"
def configure(conf)
  super

  @fields_to_columns = []
  @fields_to_columns_mapping.split(",").each do |entry|
    # Blank entries contributed nothing to the mapping before either.
    next if entry.strip.empty?

    field, column, *extra = entry.split("=>").map { |part| part.strip }

    # Building the Hash from a flattened token list would either fail with an
    # opaque "odd number of arguments" error or, worse, silently pair up
    # tokens that belong to different entries. Reject such entries up front.
    if field.nil? || column.nil? || column.empty? || !extra.empty?
      raise ArgumentError,
            "Invalid fields_to_columns_mapping entry #{entry.inspect}: expected \"field=>column\""
    end

    @fields_to_columns << [field, column]
  end

  @mapping = Hash[@fields_to_columns]
end

#format(tag, time, record) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_hbase.rb', line 68

# Serializes one event into a row payload keyed by destination column name.
#
# The tag and time are stored under @tag_column_name / @time_column_name when
# those are set. Every configured [field, column] pair then contributes the
# record value found at the dot-separated field path; the value is nil when
# the path cannot be resolved.
#
# @param tag the event tag
# @param time the event time
# @param record the event record (expected to be a Hash, possibly nested)
# @return the result of calling to_msgpack on the row Hash
def format(tag, time, record)
  row_values = {}

  row_values[@tag_column_name] = tag unless @tag_column_name.nil?
  row_values[@time_column_name] = time unless @time_column_name.nil?

  @fields_to_columns.each do |field, column|
    # Incomplete pairs carry no usable source or destination; skip them.
    next if field.nil? || column.nil?

    # Walk the path one key at a time, descending into Hashes only. Indexing
    # a String with a key would yield a substring, and indexing an Array with
    # a String key raises TypeError, so any other intermediate value means
    # the path does not exist.
    value = field.split(".").inject(record) do |current, key|
      break nil unless current.is_a?(Hash)
      current[key]
    end

    row_values[column] = value
  end

  row_values.to_msgpack
end

#start ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/out_hbase.rb', line 39

# Opens the HBase connection and makes sure the destination table exists.
#
# @conn and @table are (re)assigned on every call. When the table does not
# exist yet, it is created with one column family per distinct family found
# in @tag_column_name, @time_column_name and the values of @mapping.
#
# @raise [RuntimeError] if one of those column names is not of the form
#   "column_family:column"
def start
  super

  @conn = MassiveRecord::Wrapper::Connection.new(:host => @hbase_host, :port => @hbase_port)
  @table = MassiveRecord::Wrapper::Table.new(@conn, @hbase_table.intern)

  return if @table.exists?

  # The tag/time columns are optional, so drop the unset ones.
  columns = ([@tag_column_name, @time_column_name] + @mapping.values).compact
  column_families = columns.map do |column_family_with_column|
    column_family, column = column_family_with_column.split(":")

    # An empty family (e.g. ":col") is as unusable as a missing one.
    if column.nil? || column_family.nil? || column_family.empty?
      raise <<MESSAGE
Unexpected format for column name: #{column_family_with_column}
Each destination column in the 'fields_to_columns_mapping' option
must be specified in the format of "column_family:column".
Are you sure you included ':' in column names?
MESSAGE
    end

    column_family.intern
  end.uniq

  @table.create_column_families(column_families)
  @table.save
end

#write(chunk) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/out_hbase.rb', line 92

# Saves every row of a buffered chunk to @table.
#
# Each row yielded by the chunk is a Hash keyed by "column_family:column".
# Its entries are regrouped per column family (symbol key) and saved as a new
# MassiveRecord row whose id is a freshly generated UUID.
#
# @param chunk the buffer chunk; it must respond to msgpack_each
def write(chunk)
  chunk.msgpack_each do |row_values|
    values_by_family = {}

    row_values.each do |qualified_name, value|
      family, qualifier = qualified_name.split(":")
      family_key = family.to_sym

      values_by_family[family_key] ||= {}
      values_by_family[family_key][qualifier] = value
    end

    row = MassiveRecord::Wrapper::Row.new
    row.id = SecureRandom.uuid
    row.values = values_by_family
    row.table = @table
    row.save
  end
end