Class: Fluent::CouchOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_couch.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CouchOutput

Returns a new instance of CouchOutput.



28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/out_couch.rb', line 28

# Builds the output plugin and loads the libraries it relies on.
#
# The gems are required here, when an instance is constructed, rather than
# at the top of the file.
def initialize
    super
    
    require 'msgpack'
    require 'jsonpath'
    # couchrest is required while the default internal encoding is UTF-8.
    # NOTE(review): presumably couchrest (or one of its dependencies) needs
    # UTF-8 at load time — confirm.
    Encoding.default_internal = 'UTF-8'
    require 'couchrest'
    # NOTE(review): this sets ASCII-8BIT unconditionally instead of restoring
    # whatever value was in effect before; presumably that matches what the
    # host process expects — confirm.
    Encoding.default_internal = 'ASCII-8BIT'
end

Instance Attribute Details

#db ⇒ Object (readonly)

for tests



3
4
5
# File 'lib/fluent/plugin/out_couch.rb', line 3

# Read-only accessor for the database handle kept in @db (exposed for tests).
#
# @return [Object, nil] the value currently stored in @db; nil if it has not
#   been assigned yet
def db; @db; end

Instance Method Details

#configure(conf) ⇒ Object



38
39
40
41
42
# File 'lib/fluent/plugin/out_couch.rb', line 38

# Applies the plugin configuration and obtains the database handle.
#
# Builds the server URL from @protocol, @host, @port and @database and hands
# it to CouchRest.database!, storing the result in @db.
#
# @param conf the configuration object, forwarded unchanged to the parent
#   implementation through the implicit-argument `super`
# @return [Object] the handle returned by CouchRest.database!
def configure(conf)
    super
    # Credentials are embedded in the URL only when both user and password are
    # set; otherwise `account` stays nil and interpolates as an empty string.
    account = "#{@user}:#{@password}@" if @user && @password
    @db = CouchRest.database!("#{@protocol}://#{account}#{@host}:#{@port}/#{@database}")
end

#format(tag, time, record) ⇒ Object



62
63
64
# File 'lib/fluent/plugin/out_couch.rb', line 62

# Serializes a single event record by calling its #to_msgpack.
#
# @param tag event tag (not used by this method)
# @param time event time (not used by this method)
# @param record the event record; must respond to #to_msgpack
# @return whatever record.to_msgpack returns
def format(tag, time, record)
    packed = record.to_msgpack
    packed
end

#shutdown ⇒ Object



58
59
60
# File 'lib/fluent/plugin/out_couch.rb', line 58

# Shuts the plugin down. There is no plugin-specific teardown here; the call
# is simply delegated to the parent implementation.
#
# @return whatever the parent #shutdown returns
def shutdown
    super()
end

#start ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_couch.rb', line 44

# Starts the plugin and collects the views listed in the configured design
# document.
#
# @views is reset to an empty array on every call. When @refresh_view_index
# is set, the document "_design/<@refresh_view_index>" is fetched from @db
# and one [design_name, view_name] pair is pushed onto @views per key of its
# 'views' entry.
#
# Any StandardError raised while loading the design document is logged and
# swallowed, so the plugin still starts (with whatever was collected so far).
def start
    super
    @views = []
    if @refresh_view_index
        begin
            # Only the view names are needed; the view definitions are ignored.
            @db.get("_design/#{@refresh_view_index}")['views'].each_key do |view_name|
                @views.push([@refresh_view_index, view_name])
            end
        rescue => e
            # Keep the original message but append the real cause, so that e.g.
            # a connection or authentication failure is not mistaken for a
            # missing design document.
            $log.error "design document not found! (#{e.class}: #{e.message})"
        end
    end
end

#update_docs(records) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_couch.rb', line 83

# Saves each record individually, carrying over the revision of any document
# that already exists under the same '_id'.
#
# For every record the current document is looked up with @db.get; if the
# lookup raises, the record is treated as having no existing document. When
# a document was found, its '_rev' is copied into the record before it is
# passed to @db.save_doc.
#
# @param records [Array<Hash>] records to store; each one is mutated in place
#   when a '_rev' is copied in
# @return [Array<Hash>, nil] nil when records is empty, otherwise records
def update_docs(records)
    return if records.empty?

    records.each do |entry|
        # Best-effort lookup: any StandardError means "no existing document".
        existing = begin
            @db.get(entry['_id'])
        rescue
            nil
        end
        entry['_rev'] = existing['_rev'] unless existing.nil?
        $log.debug entry
        @db.save_doc(entry)
    end
end

#update_view_index ⇒ Object



98
99
100
101
102
# File 'lib/fluent/plugin/out_couch.rb', line 98

# Queries every view recorded in @views once, with "limit" set to "0".
# NOTE(review): presumably the zero-row query exists only to make the server
# refresh the view index — confirm.
#
# @return [Array] @views
def update_view_index
    @views.each do |design_name, view_name|
        view_path = "#{design_name}/#{view_name}"
        @db.view(view_path, {"limit"=>"0"})
    end
end

#write(chunk) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/out_couch.rb', line 66

# Writes one buffered chunk to the database.
#
# Each record in the chunk gets an '_id' when one can be derived: first from
# the field named by @doc_key_field, and, if that yields nil and
# @doc_key_jsonpath is set, from the first JsonPath match on the record.
# Records without a derivable id are stored without '_id'.
#
# The collected records are then either saved one by one via #update_docs
# (when @update_docs is truthy) or sent in a single @db.bulk_save call.
# Finally #update_view_index is invoked.
#
# @param chunk buffer chunk; must respond to #msgpack_each and yield records
# @return whatever #update_view_index returns
def write(chunk)
    docs = []
    chunk.msgpack_each do |record|
        doc_id = record[@doc_key_field]
        if doc_id.nil? && !@doc_key_jsonpath.nil?
            doc_id = JsonPath.new(@doc_key_jsonpath).first(record)
        end
        record['_id'] = doc_id unless doc_id.nil?
        docs << record
    end
    if @update_docs
        update_docs(docs)
    else
        @db.bulk_save(docs)
    end
    update_view_index
end