Class: Fluent::CouchbaseOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_couchbase.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


22
23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_couchbase.rb', line 22

# Validates the plugin settings once the parent class has handled +conf+.
#
# The readers are consulted in the order hostname, port, pool, bucket, ttl;
# the first one that returns nil or false aborts configuration.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @raise [ConfigError] when one of the required settings is missing
def configure(conf)
  super

  # Each required setting paired with the exact message raised when it is absent.
  required_settings = [
    [:hostname, "'hostname' is required by Couchbase output (ex: localhost)"],
    [:port,     "'port' is required by Couchbase output (ex: 8091)"],
    [:pool,     "'pool' is required by Couchbase output (ex: default)"],
    [:bucket,   "'bucket' is required by Couchbase output (ex: default)"],
    [:ttl,      "'ttl' is required by Couchbase output (ex: 0)"]
  ]

  # find stops at the first falsy reader, so later readers are never called.
  _, message = required_settings.find { |setting, _| !public_send(setting) }
  raise ConfigError, message if message
end

#connection ⇒ Object



18
19
20
# File 'lib/fluent/plugin/out_couchbase.rb', line 18

# Returns the plugin's connection, creating it on first use.
#
# The value returned by get_connection is cached in @connection and reused by
# later calls for as long as the cached value is truthy.
#
# @return [Object] whatever get_connection returned
def connection
  return @connection if @connection

  @connection = get_connection(hostname, port, pool, bucket)
end

#format(tag, time, record) ⇒ Object



42
43
44
# File 'lib/fluent/plugin/out_couchbase.rb', line 42

# Serializes a single event for buffering.
#
# Only +record+ is used; +tag+ and +time+ are accepted but ignored.
#
# @param tag ignored by this implementation
# @param time ignored by this implementation
# @param record [#to_msgpack] the event payload
# @return [Object] the result of +record.to_msgpack+
def format(tag, time, record)
  packed = record.to_msgpack
  packed
end

#shutdown ⇒ Object



38
39
40
# File 'lib/fluent/plugin/out_couchbase.rb', line 38

# Shuts the plugin down by deferring entirely to the parent implementation;
# no plugin-specific cleanup is performed here.
#
# @return [Object] whatever the parent's shutdown returns
def shutdown
  super()
end

#start ⇒ Object



33
34
35
36
# File 'lib/fluent/plugin/out_couchbase.rb', line 33

# Starts the plugin: runs the parent's start logic, then calls +connection+.
#
# @return [Object] the value returned by +connection+
def start
  super()

  # Called right after the parent has started, rather than waiting for the
  # first #write to ask for the connection.
  connection
end

#write(chunk) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_couchbase.rb', line 46

# Persists every record of a buffered chunk through +connection+.
#
# For each record yielded by +chunk.msgpack_each+:
# * when +include_ttl+ is truthy, the ttl is also stored inside the document
#   under the +:ttl+ key;
# * the record's 'key' entry is removed and used as the document key, so the
#   stored document no longer contains it (nil is used when it is absent);
# * the record is assigned through +connection[key, options] = record+ with
#   +:ttl+ as the only option.
#
# @param chunk [#msgpack_each] buffered chunk yielding one record per event
# @return [Object] whatever +chunk.msgpack_each+ returns
def write(chunk)
  chunk.msgpack_each do |record|
    # store ttl in the document itself?
    record[:ttl] = ttl if include_ttl

    # persist
    # The options hash is wrapped in explicit braces: a brace-less
    # `:ttl => ...` inside an index assignment is parsed as keyword arguments,
    # which Ruby 3.4+ rejects with a SyntaxError. On older Rubies both
    # spellings pass the same positional Hash to #[]=.
    connection[record.delete('key'), { :ttl => ttl }] = record
  end
end