Class: Fluent::Riak2Output

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_riak2.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Riak2Output

Returns a new instance of Riak2Output.



16
17
18
19
20
21
# File 'lib/fluent/plugin/out_riak2.rb', line 16

# Builds the output plugin instance.
#
# After deferring to the parent constructor, loads the gems this plugin
# relies on. The requires stay inside the constructor so the libraries are
# only loaded once an instance is actually created.
def initialize
  super
  %w[riak msgpack uuidtools].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_riak2.rb', line 23

# Parses the comma-separated +@nodes+ setting into the list of node hashes
# that is later handed to the Riak client.
#
# Each entry has the form "host:port". Whitespace around entries and around
# the host/port parts is ignored, blank entries (e.g. from a trailing comma)
# are skipped, and an entry without a port falls back to 8087 -- Riak's
# default protocol-buffers port -- rather than the port 0 that +nil.to_i+
# would otherwise produce.
#
# @param conf [Object] plugin configuration, forwarded unchanged to +super+
# @return [Object] the result of the final log call
def configure(conf)
  super

  entries = @nodes.split(',').map(&:strip).reject(&:empty?)
  @nodes = entries.map do |entry|
    host, port = entry.split(':').map(&:strip)
    pb_port = (port.nil? || port.empty?) ? 8087 : port.to_i
    {:host => host, :pb_port => pb_port}
  end
  $log.info "riak nodes=#{@nodes}"
end

#format(tag, time, record) ⇒ Object



63
64
65
# File 'lib/fluent/plugin/out_riak2.rb', line 63

# Serializes one event for the buffer.
#
# The event is packed as a three-element array in the order
# [time, tag, record] and encoded with MessagePack.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] the MessagePack encoding of the event array
def format(tag, time, record)
  event = [time, tag, record]
  event.to_msgpack
end

#start ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_riak2.rb', line 33

# Connects to Riak and, when a metadata bucket type is configured, records
# this output's bucket in a shared metadata map.
#
# Steps performed:
# 1. logs the buffer's chunk and queue limits,
# 2. creates the Riak client from +@nodes+ and looks up the bucket named
#    +@bucket_name+,
# 3. resets +@buf+ to an empty hash,
# 4. if +@riak2_metadata_bucket_type+ is set, adds
#    "<bucket_type> <bucket_name>" to the "all_buckets" set of the metadata
#    map, so clients can query that map for a list of all fluentd buckets,
# 5. defers to +super+.
#
# @return [Object] whatever +super+ returns
def start
  $log.debug " => #{@buffer.chunk_limit} #{@buffer.queue_limit} "
  @client = Riak::Client.new(:nodes => @nodes)
  @bucket = @client.bucket(@bucket_name)
  @buf = {}

  # to_s lets an unset (nil) metadata bucket type behave like an empty one.
  unless @riak2_metadata_bucket_type.to_s.empty?
    # The metadata map is addressed by bucket type / bucket name / key.
    metadata_bucket_type = @riak2_metadata_bucket_type        # config defined bucket type
    metadata_bucket_name = "fluent-plugin-riak2-metadata"     # bucket name
    metadata_key         = "fluent-plugin-riak2-metadata-key" # root level key for our metadata map

    # The metadata map holds one entry:
    # 1. its key is set_of_logfile_buckets_key
    # 2. its value is a set of strings, each naming the bucket type and
    #    bucket name of a single logfile
    set_of_logfile_buckets_key = "all_buckets"

    mdbucket = @client.bucket(metadata_bucket_name)
    # Map.new below is given no bucket type, so the type is supplied by
    # overriding the library-wide default for maps.
    Riak::Crdt::DEFAULT_BUCKET_TYPES[:map] = metadata_bucket_type
    map = Riak::Crdt::Map.new(mdbucket, metadata_key)
    map.sets[set_of_logfile_buckets_key].add "#{@bucket_type} #{@bucket_name}"
  end
  super
end

#write(chunk) ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_riak2.rb', line 67

# Flushes one buffered chunk to Riak.
#
# Every event in the chunk is decoded, its tag is stored in the record under
# +@tag_key+, and the record is logged at debug level. The collected records
# are then passed to +put_now+ in a single call.
#
# @param chunk [Object] buffer chunk that responds to +msgpack_each+
# @return [Object] whatever +put_now+ returns
def write(chunk)
  $log.debug " <<<<<===========\n"
  batch = []
  chunk.msgpack_each do |_time, event_tag, event|
    event[@tag_key] = event_tag
    batch.push(event)
    $log.debug event
  end
  put_now(batch)
end