Class: Fluent::HBaseOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::HBaseOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_hbase.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ HBaseOutput
constructor
A new instance of HBaseOutput.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ HBaseOutput
Returns a new instance of HBaseOutput.
6 7 8 9 |
# File 'lib/fluent/plugin/out_hbase.rb', line 6 def initialize super require 'massive_record' end |
Instance Method Details
#configure(conf) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_hbase.rb', line 30 def configure(conf) super @fields_to_columns = @fields_to_columns_mapping.split(",").map { |src_to_dst| src_to_dst.split("=>") } @mapping = Hash[*@fields_to_columns.flatten] end |
#format(tag, time, record) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/out_hbase.rb', line 68 def format(tag, time, record) row_values = {} row_values[@tag_column_name] = tag unless @tag_column_name.nil? row_values[@time_column_name] = time unless @time_column_name.nil? @fields_to_columns.each {|field,column| next if field.nil? or column.nil? components = field.split(".") value = record for c in components value = value[c] break if value.nil? end row_values[column] = value } row_values.to_msgpack end |
#start ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/out_hbase.rb', line 39 def start super @conn = MassiveRecord::Wrapper::Connection.new(:host => @hbase_host, :port => @hbase_port) @table = MassiveRecord::Wrapper::Table.new(@conn, @hbase_table.intern) unless @table.exists? columns = ([@tag_column_name, @time_column_name] + @mapping.values).reject(&:nil?) column_families = columns.map {|column_family_with_column| column_family, column = column_family_with_column.split(":") if column.nil? or column_family.nil? raise <<MESSAGE Unexpected format for column name: #{column_family_with_column} Each destination column in the 'record_to_columns_mapping' option must be specified in the format of \"column_family:column\". Are you sure you included ':' in column names? MESSAGE end column_family.intern } column_families.uniq! @table.create_column_families(column_families) @table.save end end |
#write(chunk) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/fluent/plugin/out_hbase.rb', line 92 def write(chunk) chunk.msgpack_each {|row_values| event = {} row_values.each {|column_family_and_column, value| column_family, column = column_family_and_column.split(":") (event[column_family.intern] ||= {}).update({column => value}) } row = MassiveRecord::Wrapper::Row.new row.id = SecureRandom.uuid row.values = event row.table = @table row.save } end |