Class: Fluent::RethinkOutput
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::RethinkOutput
- Includes:
- RethinkDB::Shortcuts
- Defined in:
- lib/fluent/plugin/out_rethink.rb
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #format(tag, time, record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
  This method is called every flush interval.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
    # File 'lib/fluent/plugin/out_rethink.rb', line 17
    def configure(conf)
      super
      @db = conf['database']
      @host = conf['host']
      @port = conf['port']
      @table = conf['table']
    end
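The description above says an invalid configuration should raise Fluent::ConfigError, but the listing reads the four keys without checking them. The following is a minimal sketch of what such a check could look like, not the plugin's own code. `validate_rethink_conf` and `REQUIRED_KEYS` are hypothetical names, and `ConfigError` is a local stand-in for Fluent::ConfigError so the example runs without Fluentd loaded.

```ruby
# Stand-in for Fluent::ConfigError (assumption: lets the sketch run standalone).
class ConfigError < StandardError; end

# The four keys that #configure reads from conf.
REQUIRED_KEYS = %w[database host port table].freeze

# Hypothetical helper: returns a copy of conf with an Integer port,
# or raises ConfigError when a key is missing or the port is not numeric.
def validate_rethink_conf(conf)
  missing = REQUIRED_KEYS.reject { |key| conf[key] && !conf[key].to_s.empty? }
  unless missing.empty?
    raise ConfigError, "missing required parameter(s): #{missing.join(', ')}"
  end

  # Integer(..., exception: false) returns nil instead of raising (Ruby 2.6+).
  port = Integer(conf['port'], exception: false)
  raise ConfigError, "port must be an integer, got #{conf['port'].inspect}" if port.nil?

  conf.merge('port' => port)
end

conf = validate_rethink_conf(
  'database' => 'logs', 'host' => 'localhost', 'port' => '28015', 'table' => 'events'
)
```

Inside the plugin, a check like this would run after `super` in #configure and raise Fluent::ConfigError instead of the stand-in class.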
#format(tag, time, record) ⇒ Object
    # File 'lib/fluent/plugin/out_rethink.rb', line 38
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end
#shutdown ⇒ Object
    # File 'lib/fluent/plugin/out_rethink.rb', line 33
    def shutdown
      super
      @conn.close
    end
#start ⇒ Object
    # File 'lib/fluent/plugin/out_rethink.rb', line 26
    def start
      super
      @conn = r.connect(:host => @host, :port => @port, :db => @db)
    end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd’s main thread, so I/O waits do not affect other plugins.
    # File 'lib/fluent/plugin/out_rethink.rb', line 49
    def write(chunk)
      records = []
      chunk.msgpack_each {|(tag,time,record)|
        record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key
        records << record
      }
      begin
        r.table("LOP").insert(records).run(@conn) unless records.empty?
      rescue
      end
    end
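The record-building loop in #write can be tried without Fluentd or RethinkDB by feeding it plain `[tag, time, record]` triples, which is the shape #format packs. The sketch below is an illustration under assumptions, not the plugin's implementation: `build_records` is a hypothetical helper, the `'time'` default for the time key is assumed, and unlike the listing it copies each record and skips the timestamp when none is available.

```ruby
# Hypothetical helper (not part of the plugin): converts [tag, time, record]
# triples into the array of records that #write would insert.
def build_records(events, time_key: 'time', include_time_key: true)
  events.map do |(_tag, time, record)|
    record = record.dup # avoid mutating the caller's hash
    seconds = time || record[time_key]
    # Only stamp the record when a timestamp is actually available.
    record[time_key] = Time.at(seconds) if include_time_key && seconds
    record
  end
end

events = [
  ['app.access', 1_700_000_000, { 'path' => '/' }],
  ['app.access', nil, { 'path' => '/health' }]
]
records = build_records(events)

# With a live connection, the insert from the listing would follow, e.g.
#   r.table(table).insert(records).run(conn) unless records.empty?
```

Note that the listing inserts into a table literally named "LOP" rather than the configured `@table`, and its bare `rescue` discards any insert error. A caller that wants Fluentd to retry the chunk would need to let the exception propagate.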