Class: Fluent::Plugin::CloudwatchIngestInput::State

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_cloudwatch_ingest.rb

Defined Under Namespace

Classes: LockFailed

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(filepath, log) ⇒ State

Returns a new instance of State.



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 358

def initialize(filepath, log)
  @filepath = filepath
  @log = log
  @store = Hash.new { |h, k| h[k] = Hash.new { |x, y| x[y] = {} } }
  @new_store = Hash.new { |h, k| h[k] = Hash.new { |x, y| x[y] = {} } }

  if File.exist?(filepath)
    self.statefile = Pathname.new(@filepath).open('r+')
  else
    @log.warn("No state file #{statefile} Creating a new one.")
    begin
      self.statefile = Pathname.new(@filepath).open('w+')
      save
    rescue => boom
      @log.error("Unable to create new file #{statefile.path}: "\
                 "#{boom.inspect}")
    end
  end

  # Attempt to obtain an exclusive flock on the file and raise and
  # exception if we can't
  @log.info("Obtaining exclusive lock on state file #{statefile.path}")
  lockstatus = statefile.flock(File::LOCK_EX | File::LOCK_NB)
  raise CloudwatchIngestInput::State::LockFailed if lockstatus == false

  begin
    @store.merge!(Psych.safe_load(statefile.read))

    # Migrate old state file
    @store.each do |_group, streams|
      streams.update(streams) do |_name, stream|
        if stream.is_a? String
          return { 'token' => stream, 'timestamp' => Time.now.to_i }
        end
        return stream
      end
    end

    @log.info("Loaded #{@store.keys.size} groups from #{statefile.path}")
  rescue
    statefile.close
    raise
  end
end

Instance Attribute Details

#new_storeObject

Returns the value of attribute new_store.



356
357
358
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 356

def new_store
  @new_store
end

#statefileObject

Returns the value of attribute statefile.



356
357
358
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 356

def statefile
  @statefile
end

#storeObject

Returns the value of attribute store.



356
357
358
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 356

def store
  @store
end

Instance Method Details

#closeObject



411
412
413
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 411

def close
  statefile.close
end

#saveObject



403
404
405
406
407
408
409
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 403

def save
  statefile.rewind
  statefile.truncate(0)
  statefile.write(Psych.dump(@new_store))
  @log.info("Saved state to #{statefile.path}")
  statefile.rewind
end