Class: Fluent::Logger::FluentLogger

Inherits:
LoggerBase show all
Extended by:
Finalizable
Defined in:
lib/fluent/logger/fluent_logger.rb,
lib/fluent/logger/fluent_logger/cui.rb

Defined Under Namespace

Modules: CUI, Finalizable

Constant Summary collapse

BUFFER_LIMIT =
8*1024*1024
RECONNECT_WAIT =
0.5
RECONNECT_WAIT_INCR_RATE =
1.5
RECONNECT_WAIT_MAX =
60
RECONNECT_WAIT_MAX_COUNT =
(1..100).inject(RECONNECT_WAIT_MAX / RECONNECT_WAIT) {|r,i|
  break i + 1 if r < RECONNECT_WAIT_INCR_RATE
  r / RECONNECT_WAIT_INCR_RATE
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Finalizable

finalizer, new

Methods inherited from LoggerBase

open, #post

Constructor Details

#initialize(tag_prefix, *args) ⇒ FluentLogger

Returns a new instance of FluentLogger.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/logger/fluent_logger.rb', line 58

def initialize(tag_prefix, *args)
  super()

  options = {
    :host => 'localhost',
    :port => 24224
  }

  case args.first
  when String, Symbol
    # backward compatible
    options[:host] = args[0]
    options[:port] = args[1] if args[1]
  when Hash
    options.update args.first
  end

  @tag_prefix = tag_prefix
  @host = options[:host]
  @port = options[:port]

  @mon = Monitor.new
  @pending = nil
  @connect_error_history = []

  @limit = options[:buffer_limit] || BUFFER_LIMIT
  @log_reconnect_error_threshold = options[:log_reconnect_error_threshold] ||  RECONNECT_WAIT_MAX_COUNT

  if logger = options[:logger]
    @logger = logger
  else
    @logger = ::Logger.new(STDERR)
    if options[:debug]
      @logger.level = ::Logger::DEBUG
    else
      @logger.level = ::Logger::INFO
    end
  end

  @last_error = {}

  begin
    connect!
  rescue => e
    set_last_error(e)
    @logger.error "Failed to connect fluentd: #{$!}"
    @logger.error "Connection will be retried."
  end
end

Instance Attribute Details

#last_errorObject (readonly)

Returns the value of attribute last_error.



109
110
111
# File 'lib/fluent/logger/fluent_logger.rb', line 109

def last_error
  @last_error
end

#limitObject

Returns the value of attribute limit.



108
109
110
# File 'lib/fluent/logger/fluent_logger.rb', line 108

def limit
  @limit
end

#log_reconnect_error_thresholdObject

Returns the value of attribute log_reconnect_error_threshold.



108
109
110
# File 'lib/fluent/logger/fluent_logger.rb', line 108

def log_reconnect_error_threshold
  @log_reconnect_error_threshold
end

#loggerObject

Returns the value of attribute logger.



108
109
110
# File 'lib/fluent/logger/fluent_logger.rb', line 108

def logger
  @logger
end

Instance Method Details

#batch_post_with_time(messages) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/logger/fluent_logger.rb', line 121

def batch_post_with_time(messages)
  # convert each message
  # batch send
  # if batch > byte max, send in multiple requests
  #
  # how to handle errors? immediate fail batch, or skip bad messageS?

  # what if pending is not null here? if we check needs to be syncronized

  # Reorder since #post_with_time takes a different order than #write
  # NOTE Should we mangle the time? It should be .to_i here
  payloads = messages.map do |msg| 
    proper = msg.values_at(0, 2, 1)
    proper[1] = proper[1].to_i unless proper[1].is_a?(Fixnum) # Convert time
    prepare_msg proper
  end

  # Check for 'false' payloads, those are errors, and last message will be set.

  payload = ""

  payloads.each do |data|
    if payload.bytesize + data.bytesize > @limit
      raw_write payload
      payload = ""
    end

    payload << data
  end

  raw_write payload if payload.bytesize > 0
end

#closeObject



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/logger/fluent_logger.rb', line 154

def close
  @mon.synchronize {
    if @pending
      begin
        send_data(@pending)
      rescue => e
        set_last_error(e)
        @logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
      end
    end
    @con.close if connect?
    @con = nil
    @pending = nil
  }
  self
end

#connect?Boolean

Returns:

  • (Boolean)


171
172
173
# File 'lib/fluent/logger/fluent_logger.rb', line 171

def connect?
  !!@con
end

#finalizeObject



175
176
177
# File 'lib/fluent/logger/fluent_logger.rb', line 175

def finalize
  close
end

#post_with_time(tag, map, time) ⇒ Object



115
116
117
118
119
# File 'lib/fluent/logger/fluent_logger.rb', line 115

def post_with_time(tag, map, time)
  @logger.debug { "event: #{tag} #{map.to_json}" rescue nil }
  tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
  write [tag, time.to_i, map]
end