Class: Fluent::RiakOutput
- Inherits:
- BufferedOutput
- Inheritance chain: Object → BufferedOutput → Fluent::RiakOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_riak.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ RiakOutput (constructor): A new instance of RiakOutput.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ RiakOutput
Returns a new instance of RiakOutput.
14 15 16 17 18 19 |
# File 'lib/fluent/plugin/out_riak.rb', line 14
#
# Builds the output plugin instance. After running the parent constructor,
# the libraries this plugin relies on are loaded here rather than at file
# load time, so they are only required once an instance is created.
def initialize
  super

  %w[riak msgpack uuidtools].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
21 22 23 24 25 26 27 28 29 |
# File 'lib/fluent/plugin/out_riak.rb', line 21
#
# Applies the plugin configuration and turns the comma-separated node list
# held in @nodes (e.g. "10.0.0.1:8087,10.0.0.2:8087") into an array of
# node hashes of the form {:host => ..., :pb_port => ...}.
#
# Whitespace around each entry is ignored and empty entries are skipped, so
# "a:1, b:2" and "a:1,b:2" are equivalent.
#
# @param conf the configuration object; it is handed to the parent
#   implementation unchanged via `super`.
# @return whatever `$log.info` returns (the method's last expression).
def configure(conf)
  super

  entries = @nodes.split(',').map(&:strip).reject(&:empty?)
  @nodes = entries.map do |entry|
    host, port = entry.split(':')
    node = { :host => host }
    # An entry without an explicit port used to become :pb_port => 0
    # (nil.to_i). Leave the key out instead so the port is not forced to an
    # invalid value.
    # NOTE(review): this relies on the Riak client falling back to its own
    # default protocol-buffers port when :pb_port is absent - confirm.
    node[:pb_port] = port.to_i unless port.nil? || port.empty?
    node
  end

  $log.info "riak nodes=#{@nodes}"
end
#format(tag, time, record) ⇒ Object
40 41 42 |
# File 'lib/fluent/plugin/out_riak.rb', line 40
#
# Serializes one event for buffering.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the MessagePack encoding of the triple [time, tag, record]
#   (note the order: time first, then tag).
def format(tag, time, record)
  event = [time, tag, record]
  event.to_msgpack
end
#start ⇒ Object
31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/out_riak.rb', line 31
#
# Prepares the plugin for writing: logs the buffer's chunk and queue limits,
# creates the Riak client for the configured nodes using the "pbc" protocol,
# looks up the target bucket by @bucket_name, resets @buf to an empty hash,
# and finally hands control to the parent implementation.
def start
  chunk_limit = @buffer.chunk_limit
  queue_limit = @buffer.queue_limit
  $log.debug " => #{chunk_limit} #{queue_limit} "

  client = Riak::Client.new(:nodes => @nodes, :protocol => "pbc")
  @conn = client
  @bucket = client.bucket(@bucket_name)
  @buf = {}

  super
end
#write(chunk) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/out_riak.rb', line 44
#
# Flushes one buffer chunk. Every event in the chunk has its tag stored
# into the record under @tag_key (the record is modified in place), the
# records and their tags are collected in order, and both lists are handed
# to put_now.
#
# @param chunk the buffer chunk; it must respond to `msgpack_each`,
#   yielding (time, tag, record) for each event.
# @return whatever `put_now` returns.
def write(chunk)
  $log.debug " <<<<<===========\n"

  records = []
  # NOTE(review): the name of this accumulator was missing from the listing
  # (`records = [] = []`, `<< tag`, `put_now(records, )`); `tags` is restored
  # from how the value is used - confirm against the original file.
  tags = []
  chunk.msgpack_each do |_time, tag, record|
    record[@tag_key] = tag
    tags << tag
    records << record
    $log.debug record
  end

  put_now(records, tags)
end