Class: Fluent::Logger::FluentLogger

Inherits:
LoggerBase show all
Extended by:
Finalizable
Defined in:
lib/fluent/logger/fluent_logger.rb,
lib/fluent/logger/fluent_logger/cui.rb

Defined Under Namespace

Modules: CUI, Finalizable

Constant Summary collapse

# Default byte cap (8 MiB) assigned to @limit by the constructor when no
# :buffer_limit option is supplied.
BUFFER_LIMIT =
8*1024*1024
# NOTE(review): the reconnect code that reads the three values below is not
# shown on this page; presumably initial wait, growth factor and wait ceiling
# for a reconnect back-off -- confirm against the retry logic.
RECONNECT_WAIT =
0.5
RECONNECT_WAIT_INCR_RATE =
1.5
RECONNECT_WAIT_MAX =
60
# Default for the :log_reconnect_error_threshold option.
# Starts from RECONNECT_WAIT_MAX / RECONNECT_WAIT (120.0) and divides by
# RECONNECT_WAIT_INCR_RATE once per iteration; the first iteration whose
# running value is already below the rate breaks out with i + 1.
# With the values above this evaluates to 13. If no iteration within 1..100
# breaks, inject returns the last quotient instead of a count.
RECONNECT_WAIT_MAX_COUNT =
(1..100).inject(RECONNECT_WAIT_MAX / RECONNECT_WAIT) {|r,i|
  break i + 1 if r < RECONNECT_WAIT_INCR_RATE
  r / RECONNECT_WAIT_INCR_RATE
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Finalizable

finalizer, new

Methods inherited from LoggerBase

open, #post

Constructor Details

#initialize(tag_prefix, *args) ⇒ FluentLogger

Returns a new instance of FluentLogger.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/logger/fluent_logger.rb', line 58

# Sets up a logger bound to a fluentd endpoint and attempts a first connection.
#
# Accepted call shapes for +args+:
#   * a Hash of options, merged over the defaults
#   * the legacy positional form (host, port), where host is a String or Symbol
#
# Recognised options: :host (default 'localhost'), :port (default 24224),
# :buffer_limit (default BUFFER_LIMIT), :log_reconnect_error_threshold
# (default RECONNECT_WAIT_MAX_COUNT), :logger, and :debug (only consulted when
# no :logger is given).
#
# A failure of the initial connect! is not raised to the caller: it is handed
# to set_last_error and reported through @logger.
#
# @param tag_prefix [Object] stored as @tag_prefix
# @param args [Array] options Hash, or legacy (host, port)
def initialize(tag_prefix, *args)
  super()

  options = { :host => 'localhost', :port => 24224 }

  leading, legacy_port = args
  case leading
  when Hash
    options.update(leading)
  when String, Symbol
    # backward compatible: positional (host, port)
    options[:host] = leading
    options[:port] = legacy_port if legacy_port
  end

  @tag_prefix = tag_prefix
  @host, @port = options.values_at(:host, :port)

  @mon = Monitor.new
  @pending = nil
  @connect_error_history = []
  @last_error = {}

  @limit = options[:buffer_limit] || BUFFER_LIMIT
  @log_reconnect_error_threshold =
    options[:log_reconnect_error_threshold] || RECONNECT_WAIT_MAX_COUNT

  # A caller-supplied logger is used untouched; only the fallback logger has
  # its level derived from :debug.
  @logger = options[:logger]
  unless @logger
    @logger = ::Logger.new(STDERR)
    @logger.level = options[:debug] ? ::Logger::DEBUG : ::Logger::INFO
  end

  begin
    connect!
  rescue => e
    set_last_error(e)
    @logger.error "Failed to connect fluentd: #{$!}"
    @logger.error "Connection will be retried."
  end
end

Instance Attribute Details

#last_error ⇒ Object (readonly)

Returns the value of attribute last_error.



109
110
111
# File 'lib/fluent/logger/fluent_logger.rb', line 109

# Reader for @last_error.
# The constructor initialises it to an empty Hash; failures are handed to
# set_last_error (not shown here), which presumably updates it -- confirm.
def last_error
  @last_error
end

#limit ⇒ Object

Returns the value of attribute limit.



108
109
110
# File 'lib/fluent/logger/fluent_logger.rb', line 108

# Reader for @limit, which the constructor sets from the :buffer_limit option
# (falling back to BUFFER_LIMIT). #batch_post_with_time compares it against
# payload sizes in bytes.
def limit
  @limit
end

#log_reconnect_error_threshold ⇒ Object

Returns the value of attribute log_reconnect_error_threshold.



108
109
110
# File 'lib/fluent/logger/fluent_logger.rb', line 108

# Reader for @log_reconnect_error_threshold, which the constructor sets from
# the :log_reconnect_error_threshold option (falling back to
# RECONNECT_WAIT_MAX_COUNT).
# NOTE(review): the code that consults this threshold is not shown here.
def log_reconnect_error_threshold
  @log_reconnect_error_threshold
end

#logger ⇒ Object

Returns the value of attribute logger.



108
109
110
# File 'lib/fluent/logger/fluent_logger.rb', line 108

# Reader for @logger: either the object passed as the :logger option or, when
# none was given, a ::Logger writing to STDERR created by the constructor.
def logger
  @logger
end

Instance Method Details

#batch_post_with_time(messages) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/logger/fluent_logger.rb', line 120

# Sends several events using as few raw writes as possible.
#
# Every element of +messages+ is splatted into
# prepare_post_with_time_arguments, so each one must be an Array in
# #post_with_time argument order: [tag, map, time]. The result is passed to
# prepare_msg, and the returned payload strings are concatenated into a
# buffer. Before appending a payload that would push the buffer past @limit
# bytes, the buffered data is flushed with raw_write.
#
# Edge cases:
# * A single payload larger than @limit is still sent, on its own; an empty
#   buffer is never flushed just because the next payload is oversized.
# * Payloads for which prepare_msg returned a falsy value are skipped. The
#   original notes describe +false+ as that helper's error signal (the helper
#   itself is not shown here -- confirm), so one bad message does not abort
#   the rest of the batch.
#
# NOTE(review): questions left open by the original author -- whether +time+
# should be coerced with to_i before sending, and what should happen when
# @pending is already set on entry (checking it would need synchronization).
#
# @param messages [Array<Array>] list of [tag, map, time] triples
# @return [Object, nil] the result of the final raw_write, or nil when
#   nothing remained to be written
def batch_post_with_time(messages)
  # Reorder since #post_with_time takes a different order than #write
  payloads = messages.map do |msg|
    prepare_msg prepare_post_with_time_arguments(*msg)
  end

  payload = ""

  payloads.each do |data|
    # Falsy payload: conversion failed for this message, skip it.
    next unless data

    # Flush only when there is something buffered; otherwise an oversized
    # first payload would trigger a pointless empty write.
    if !payload.empty? && (payload.bytesize + data.bytesize) > @limit
      raw_write payload
      payload = ""
    end

    payload << data
  end

  raw_write payload unless payload.empty?
end

#close ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/fluent/logger/fluent_logger.rb', line 151

# Flushes any pending data, closes the connection and clears connection state.
#
# Runs entirely under @mon. If @pending is set it is handed to send_data once;
# a failure there is passed to set_last_error and logged, not raised. The
# connection is then closed when connect? reports one.
#
# @con and @pending are reset in an +ensure+ clause, so the object never keeps
# a reference to a connection whose close raised; the exception from
# @con.close itself still propagates to the caller.
#
# @return [self]
def close
  @mon.synchronize do
    begin
      if @pending
        begin
          send_data(@pending)
        rescue => e
          set_last_error(e)
          @logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{e}")
        end
      end
      @con.close if connect?
    ensure
      # Always drop the handle and the buffer, even when close failed.
      @con = nil
      @pending = nil
    end
  end
  self
end

#connect? ⇒ Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/fluent/logger/fluent_logger.rb', line 168

# Tells whether a connection object is currently held.
#
# @return [Boolean] true when @con is truthy, false when it is nil or false
def connect?
  @con ? true : false
end

#finalize ⇒ Object



172
173
174
# File 'lib/fluent/logger/fluent_logger.rb', line 172

# Delegates to #close and returns its result.
# NOTE(review): presumably the hook invoked by the Finalizable machinery this
# class is extended with -- that module is not shown here; confirm.
def finalize
  close
end

#post_with_time(tag, map, time) ⇒ Object



115
116
117
118
# File 'lib/fluent/logger/fluent_logger.rb', line 115

# Logs the event at debug level, then queues it through #write.
#
# The debug message is built lazily inside the logger block. Any StandardError
# raised while building it (for example from map.to_json) is swallowed and the
# block yields nil, so debug output can never prevent the event being written.
#
# @param tag [Object] event tag
# @param map [Object] event record
# @param time [Object] event time
# @return [Object] whatever #write returns
def post_with_time(tag, map, time)
  @logger.debug do
    begin
      "event: #{tag} #{map.to_json}"
    rescue
      nil
    end
  end
  write(prepare_post_with_time_arguments(tag, map, time))
end