Class: Fluent::RethinkOutput

Inherits:
BufferedOutput
  • Object
Includes:
RethinkDB::Shortcuts
Defined in:
lib/fluent/plugin/out_rethink.rb

Instance Method Summary

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



# File 'lib/fluent/plugin/out_rethink.rb', line 17

def configure(conf)
  super

  @db    = conf['database']
  @host  = conf['host']
  @port  = conf['port']
  @table = conf['table']
end
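
The description above says an invalid configuration should raise Fluent::ConfigError, but the method as shown stores the four parameters without checking them. A minimal sketch of such a check follows, under stated assumptions: `validate_rethink_conf` and `REQUIRED_KEYS` are hypothetical names that are not part of the plugin, and the `Fluent::ConfigError` defined here is only a stand-in so the snippet runs without Fluentd installed.

```ruby
# Stand-in for Fluent::ConfigError so the sketch runs without Fluentd.
# Inside a real plugin the class is provided by Fluentd itself.
module Fluent
  ConfigError = Class.new(StandardError) unless const_defined?(:ConfigError, false)
end

# The four keys that #configure reads from conf.
REQUIRED_KEYS = %w[database host port table].freeze

# Hypothetical helper (not part of the plugin): returns the validated
# settings as a Hash, or raises Fluent::ConfigError.
def validate_rethink_conf(conf)
  missing = REQUIRED_KEYS.select { |key| conf[key].to_s.empty? }
  unless missing.empty?
    raise Fluent::ConfigError, "missing parameter(s): #{missing.join(', ')}"
  end

  begin
    # Configuration values usually arrive as strings, so convert the port.
    port = Integer(conf['port'].to_s, 10)
  rescue ArgumentError
    raise Fluent::ConfigError, "port must be an integer, got #{conf['port'].inspect}"
  end

  { db: conf['database'], host: conf['host'], port: port, table: conf['table'] }
end

# Example values are illustrative; 28015 is RethinkDB's default driver port.
settings = validate_rethink_conf(
  'database' => 'logs', 'host' => 'localhost',
  'port' => '28015', 'table' => 'events'
)
puts settings.inspect
```

Calling a helper like this from #configure after `super` would surface a missing `table` or a malformed `port` at startup rather than at the first flush.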

#format(tag, time, record) ⇒ Object



# File 'lib/fluent/plugin/out_rethink.rb', line 38

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#shutdown ⇒ Object



# File 'lib/fluent/plugin/out_rethink.rb', line 33

def shutdown
  super
  # Guard against #start having failed before a connection was opened.
  @conn.close if @conn
end

#start ⇒ Object



# File 'lib/fluent/plugin/out_rethink.rb', line 26

def start
  super
  @conn = r.connect(:host => @host,
                    :port => @port,
                    :db => @db)
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE: This method is called by an internal thread, not Fluentd’s main thread, so I/O waits here do not block other plugins.



# File 'lib/fluent/plugin/out_rethink.rb', line 49

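def write(chunk)
  records = []
  # Each event was serialized by #format as [tag, time, record].
  chunk.msgpack_each {|(tag,time,record)|
    record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key
    records << record
  }
  # Insert into the configured table rather than a hard-coded name.
  # Errors are left to propagate so that Fluentd can retry the chunk.
  r.table(@table).insert(records).run(@conn) unless records.empty?
end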
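
The collection step in #write can be tried without Fluentd or RethinkDB. The following is a minimal sketch under stated assumptions: `FakeChunk` is a hypothetical stand-in for a buffer chunk that yields `[tag, time, record]` triples the way `msgpack_each` does, and `collect_records` is a hypothetical helper that mirrors only the loop, with `time_key` playing the role of `@time_key` and `@include_time_key` together.

```ruby
# Hypothetical stand-in for a Fluentd buffer chunk. A real chunk decodes
# MessagePack data; this one simply yields pre-built [tag, time, record] arrays.
class FakeChunk
  def initialize(events)
    @events = events
  end

  def msgpack_each
    @events.each { |event| yield event }
  end
end

# Hypothetical helper mirroring the loop in #write: gathers every record
# in the chunk and, when time_key is given, stamps it with the event time.
def collect_records(chunk, time_key: nil)
  records = []
  chunk.msgpack_each do |(_tag, time, record)|
    record = record.dup # avoid mutating the caller's hash
    record[time_key] = Time.at(time || record[time_key]) if time_key
    records << record
  end
  records
end

# Illustrative events; the tag and field names are made up for the example.
chunk = FakeChunk.new([
  ['app.log', 0,  { 'msg' => 'hello' }],
  ['app.log', 60, { 'msg' => 'world' }]
])

records = collect_records(chunk, time_key: 'time')
puts records.length
puts records.first['msg']
```

In the plugin, the resulting array is what gets passed to `insert` in a single call, so one flush produces one batch write instead of one write per event.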