Class: Writer

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/fluent/command/cat.rb

Defined Under Namespace

Classes: TimerThread

Constant Summary collapse

RetryLimitError =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initialize(tag, connector, time_as_integer: false, retry_limit: 5) ⇒ Writer

Returns a new instance of Writer.



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/command/cat.rb', line 137

def initialize(tag, connector, time_as_integer: false, retry_limit: 5)
  @tag = tag
  @connector = connector
  @socket = false

  @socket_time = Time.now.to_i
  @socket_ttl = 10  # TODO
  @error_history = []

  @pending = []
  @pending_limit = 1024  # TODO
  @retry_wait = 1
  @retry_limit = retry_limit
  @time_as_integer = time_as_integer

  super()
end

Instance Method Details

#closeObject



196
197
198
199
# File 'lib/fluent/command/cat.rb', line 196

def close
  @socket.close
  @socket = nil
end

#on_timerObject



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/fluent/command/cat.rb', line 177

def on_timer
  now = Time.now.to_i

  synchronize {
    unless @pending.empty?
      # flush pending records
      if write_impl(@pending)
        # write succeeded
        @pending.clear
      end
    end

    if @socket && @socket_time + @socket_ttl < now
      # socket is not used @socket_ttl seconds
      close
    end
  }
end

#shutdownObject



207
208
209
# File 'lib/fluent/command/cat.rb', line 207

def shutdown
  @timer.shutdown
end

#startObject



201
202
203
204
205
# File 'lib/fluent/command/cat.rb', line 201

def start
  @timer = TimerThread.new(self)
  @timer.start
  self
end

#write(record) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/fluent/command/cat.rb', line 155

def write(record)
  if record.class != Hash
    raise ArgumentError, "Input must be a map (got #{record.class})"
  end

  time = Fluent::EventTime.now
  time = time.to_i if @time_as_integer
  entry = [time, record]
  synchronize {
    unless write_impl([entry])
      # write failed
      @pending.push(entry)

      while @pending.size > @pending_limit
        # exceeds pending limit; trash oldest record
        time, record = @pending.shift
        abort_message(time, record)
      end
    end
  }
end