Class: Fluent::Riak2Output
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::Riak2Output
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_riak2.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ Riak2Output (constructor): A new instance of Riak2Output.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ Riak2Output
Returns a new instance of Riak2Output.
16 17 18 19 20 21 |
# File 'lib/fluent/plugin/out_riak2.rb', line 16
# Builds the output plugin. The gems it depends on are required here, when an
# instance is constructed, rather than when the file itself is loaded.
def initialize
  super
  %w[riak msgpack uuidtools].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/out_riak2.rb', line 23
# Turns the comma-separated "host:port" list held in @nodes into an array of
# node hashes ({:host => ..., :pb_port => ...}) and logs the result.
#
# Whitespace around entries and around host/port is ignored, and empty entries
# (for example from a trailing comma) are skipped. When an entry carries no
# port, :pb_port is left out of its hash instead of being set to 0, so the
# Riak client can apply its own default port.
#
# @param conf the plugin configuration, forwarded unchanged to super
def configure(conf)
  super

  @nodes = @nodes.split(',').map(&:strip).reject(&:empty?).map do |entry|
    host, port = entry.split(':', 2).map(&:strip)
    node = { :host => host }
    # nil.to_i / "".to_i would silently produce port 0, which is never valid.
    node[:pb_port] = port.to_i unless port.nil? || port.empty?
    node
  end

  $log.info "riak nodes=#{@nodes}"
end
#format(tag, time, record) ⇒ Object
63 64 65 |
# File 'lib/fluent/plugin/out_riak2.rb', line 63
# Serializes a single event as a MessagePack-encoded [time, tag, record] array.
# Note the stored order is time first, then tag, then record.
def format(tag, time, record)
  event = [time, tag, record]
  event.to_msgpack
end
#start ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/out_riak2.rb', line 33
# Creates the Riak client and the target bucket handle, resets @buf, and, when
# a metadata bucket type is configured, records this plugin's
# "<bucket_type> <bucket_name>" pair in a metadata map before calling super.
#
# NOTE(review): in the extracted listing the local names bucket_type,
# bucket_name and bucket_key were missing (leaving an assignment with no
# left-hand side and empty call arguments). They are reconstructed here from
# the accompanying comments -- confirm against lib/fluent/plugin/out_riak2.rb.
def start
  $log.debug " => #{@buffer.chunk_limit} #{@buffer.queue_limit} "
  @client = Riak::Client.new(:nodes => @nodes)
  @bucket = @client.bucket(@bucket_name)
  @buf = {}

  # $log.debug "riak2_metadata_bucket_type => #{@riak2_metadata_bucket_type}"
  # $log.debug "bucket_type => #{@bucket_type}"

  unless @riak2_metadata_bucket_type.empty?
    # Here we are storing our bucket type and bucket name in a metadata map.
    # This allows clients to query that map to see a list of all fluentd buckets.
    # bucket_type/name/key returns a metadata map.
    bucket_type = @riak2_metadata_bucket_type       # config defined bucket type
    bucket_name = "fluent-plugin-riak2-metadata"    # bucket name
    bucket_key = "fluent-plugin-riak2-metadata-key" # root level key for our metadata map

    # Our metadata map has a kv where:
    #   1. key is set_of_logfile_buckets_key
    #   2. value is a set of strings; each string represents the bucket type
    #      and name for a single logfile
    set_of_logfile_buckets_key = "all_buckets" # inner key for our set of all logfile bucket type/name

    mdbucket = @client.bucket(bucket_name)
    # This mutates the shared Riak::Crdt::DEFAULT_BUCKET_TYPES constant, so the
    # map created below (and any later map) uses the configured bucket type.
    Riak::Crdt::DEFAULT_BUCKET_TYPES[:map] = bucket_type
    map = Riak::Crdt::Map.new(mdbucket, bucket_key)
    map.sets[set_of_logfile_buckets_key].add "#{@bucket_type} #{@bucket_name}"
  end

  super
end
#write(chunk) ⇒ Object
67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/out_riak2.rb', line 67
# Unpacks every event in the chunk, stores each event's tag in its record under
# @tag_key, logs the record at debug level, and hands the collected records to
# put_now in a single call.
#
# @param chunk object responding to msgpack_each, yielding (time, tag, record)
def write(chunk)
  $log.debug " <<<<<===========\n"

  entries = []
  chunk.msgpack_each do |_time, event_tag, event|
    event[@tag_key] = event_tag
    entries.push(event)
    $log.debug event
  end

  put_now(entries)
end