Class: Fluent::AzuretablesOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/out_azuretables.rb

Constant Summary collapse

ENTITY_SIZE_LIMIT =

1MB

1024 * 1024
BATCHWRITE_ENTITY_LIMIT =
100
BATCHWRITE_SIZE_LIMIT =

4MB

4 * 1024 * 1024

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_azuretables.rb', line 31

def configure(conf)
  super
  unless @partition_keys.nil?
    @partition_key_array = @partition_keys.split(',')
  end

  unless @row_keys.nil?
    @row_key_array = @row_keys.split(',')
  end

end

#format(tag, time, record) ⇒ Object

create entity from event record



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/out_azuretables.rb', line 76

def format(tag, time, record)
  partition_keys = []
  row_keys = []
  record.each_pair do |name, val|
    if @partition_key_array && @partition_key_array.include?(name)
      partition_keys << val
      record.delete(name)
    elsif @row_key_array && @row_key_array.include?(name)
      row_keys << val
      record.delete(name)
    end
  end
  if @add_time_to_partition_key
    partition_keys << Time.now.strftime("%Y%m%d")
  end
  if @add_time_to_row_key
    row_keys << Time.now.getutc.to_i
  end
  if @add_uuid_to_row_key
    row_keys << SecureRandom.uuid
  end

  entity = Hash.new
  entity['partition_key'] = partition_keys.join(@key_delimiter)
  entity['row_key'] = row_keys.join(@key_delimiter)
  entity['entity_values'] = record
  entity.to_msgpack
end

#format_key(record, keys, key_delimiter) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_azuretables.rb', line 105

def format_key(record, keys, key_delimiter)
  ret = []
  record.each_pair do |name, val|
    ret << val if keys.include?(name)
  end
  ret.join(key_delimiter)
end

#insert_entities(partition_key, entities) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/fluent/plugin/out_azuretables.rb', line 134

def insert_entities(partition_key, entities)
  begin
    batch = Azure::Storage::Table::Batch.new(@table, partition_key) do
      entities.each do |entity|
        insert entity['row_key'], entity['entity_values']
      end
    end
    return @table_client.execute_batch(batch)
  rescue Exception => e
    log.fatal "UnknownError: '#{e}'"
    log.debug partition_key
    log.debug entities.inspect
  end
end

#shutdownObject

This method is called when shutting down.



71
72
73
# File 'lib/fluent/plugin/out_azuretables.rb', line 71

def shutdown
  super
end

#startObject

connect azure table storage service



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_azuretables.rb', line 44

def start
  super
  unless @account_name.nil? || @access_key.nil?
    @table_client = Azure::Storage::Table::TableService.create(storage_account_name: @account_name,storage_access_key:@access_key)
  end

  begin
    @table_client.create_table(@table) if @create_table_if_not_exists && !table_exists?(@table)
  rescue Exception => e
    log.error e
    exit!
  end
end

#table_exists?(table_name) ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_azuretables.rb', line 58

def table_exists?(table_name)
  begin
    @table_client.get_table(table_name)
    true
  rescue Azure::Core::Http::HTTPError => e
    false
  rescue Exception => e
    log.fatal "UnknownError: '#{e}'"
    exit!
  end
end

#write(chunk) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/out_azuretables.rb', line 113

def write(chunk)
  batch_size = 0
  group_entities = Hash.new
  chunk.msgpack_each do |entity|
    partition_key = entity['partition_key']
    group_entities[partition_key] = [] unless group_entities.has_key?(partition_key)
    group_entities[partition_key] << entity
    batch_size += entity.to_json.length
    if group_entities[partition_key].size >= BATCHWRITE_ENTITY_LIMIT || batch_size >= BATCHWRITE_SIZE_LIMIT
      insert_entities(partition_key, group_entities[partition_key])
      group_entities[partition_key] = []
      batch_size = 0
    end
  end
  unless group_entities.empty?
    group_entities.each_pair do |partition_key, entities|
      insert_entities(partition_key, entities)
    end
  end
end