Class: Fluent::RiakOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_riak.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RiakOutput

Returns a new instance of RiakOutput.



14
15
16
17
18
19
# File 'lib/fluent/plugin/out_riak.rb', line 14

# Builds the output plugin instance.
#
# Runs the parent initializer first, then loads the libraries this plugin
# depends on. The requires happen here (at instantiation) rather than at
# file load time, and are issued in the listed order.
def initialize
  super
  %w[riak msgpack uuidtools].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



21
22
23
24
25
26
27
28
29
# File 'lib/fluent/plugin/out_riak.rb', line 21

# Parses the comma-separated node list held in @nodes into the array of
# node hashes that is later passed to Riak::Client.new.
#
# Each entry has the form "host:port". Whitespace around an entry (and
# around its host/port parts) is ignored and blank entries are skipped, so
# "a:8087, b:8087" parses cleanly instead of producing a host with a
# leading space. An entry without a port omits :pb_port instead of storing
# 0 (which is what nil.to_i would yield).
# NOTE(review): omitting :pb_port relies on the Riak client falling back to
# its own default port -- confirm against the client version in use.
#
# @param conf the plugin configuration; handed to the parent via super
def configure(conf)
  super

  @nodes = @nodes.split(',').map(&:strip).reject(&:empty?).map { |entry|
    host, port = entry.split(':').map(&:strip)
    node = {:host => host}
    # Only record a port when one was actually given.
    node[:pb_port] = port.to_i unless port.nil? || port.empty?
    node
  }
  $log.info "riak nodes=#{@nodes}"
end

#format(tag, time, record) ⇒ Object



40
41
42
# File 'lib/fluent/plugin/out_riak.rb', line 40

# Serializes one event for buffering.
#
# The event is packed as a three-element array in the order
# [time, tag, record]; #write reads the same order back from the chunk.
#
# @param tag    the event tag
# @param time   the event time
# @param record the event record
# @return the result of calling to_msgpack on the [time, tag, record] array
def format(tag, time, record)
  event = [time, tag, record]
  event.to_msgpack
end

#start ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_riak.rb', line 31

# Opens the Riak connection and then hands control to the parent's start.
#
# Logs the buffer's chunk and queue limits at debug level, creates a
# Riak::Client for the nodes parsed in #configure using the "pbc" protocol,
# looks up the bucket named by @bucket_name, and resets @buf to an empty
# hash. The parent implementation is invoked last.
def start
  $log.debug " => #{@buffer.chunk_limit} #{@buffer.queue_limit} "

  client = Riak::Client.new(:nodes => @nodes, :protocol => "pbc")
  @conn = client
  @bucket = client.bucket(@bucket_name)
  @buf = {}

  super
end

#write(chunk) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_riak.rb', line 44

# Flushes one buffered chunk.
#
# Iterates over the chunk's [time, tag, record] entries, stores each
# entry's tag in the record under @tag_key, and collects records and tags
# into two parallel arrays (same index = same event). Each record is also
# logged at debug level. Both arrays are then handed to put_now.
#
# @param chunk the buffer chunk; must respond to msgpack_each
# @return whatever put_now returns
def write(chunk)
  $log.debug " <<<<<===========\n"

  tags = []
  records = []
  chunk.msgpack_each do |_time, tag, record|
    record[@tag_key] = tag
    tags.push(tag)
    records.push(record)
    $log.debug record
  end

  put_now(records, tags)
end