Class: Fluent::SQSOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sqs.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



26
27
28
# File 'lib/fluent/plugin/out_sqs.rb', line 26

def configure(conf)
    super
end

#format(tag, time, record) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_sqs.rb', line 50

def format(tag, time, record)
    if @include_tag then
        record[@tag_property_name] = tag
    end

    record.to_msgpack
end

#shutdownObject



46
47
48
# File 'lib/fluent/plugin/out_sqs.rb', line 46

def shutdown
    super
end

#startObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/out_sqs.rb', line 30

def start
    super

    AWS.config(
        :access_key_id => @aws_key_id,
        :secret_access_key => @aws_sec_key)

    @sqs = AWS::SQS.new(
        :sqs_endpoint => @sqs_endpoint)
    if @create_queue then
        @queue = @sqs.queues.create(@queue_name)
    else
        @queue = @sqs.queues.named(@queue_name)
    end
end

#write(chunk) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/out_sqs.rb', line 58

def write(chunk)
    records = []
    chunk.msgpack_each {|record|
        if ObjectSpace.memsize_of(record) < @max_size
            records << { :message_body => Yajl.dump(record), :delay_seconds => @delay_seconds }
        else
            log.info "Could not send log to SQS: the size of log exceeded max_size"
        end
    }
    until records.length <= 0 do
        @queue.batch_send(records.slice!(0..9))
    end
end