Class: Fluent::Plugin::CloudwatchIngestInput::State
- Inherits:
-
Object
- Object
- Fluent::Plugin::CloudwatchIngestInput::State
- Defined in:
- lib/fluent/plugin/in_cloudwatch_ingest.rb
Defined Under Namespace
Classes: LockFailed
Instance Attribute Summary collapse
-
#new_store ⇒ Object
Returns the value of attribute new_store.
-
#statefile ⇒ Object
Returns the value of attribute statefile.
-
#store ⇒ Object
Returns the value of attribute store.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(filepath, log) ⇒ State
constructor
A new instance of State.
- #save ⇒ Object
Constructor Details
#initialize(filepath, log) ⇒ State
Returns a new instance of State.
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 |
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 408
#
# Opens the state file at +filepath+ (creating it when it does not exist),
# takes an exclusive, non-blocking flock on it and merges the saved state
# into @store. Entries saved in the legacy format (a bare token String per
# stream) are converted to { 'token' => ..., 'timestamp' => ... } hashes.
#
# @param filepath [String] path of the state file
# @param log [#info, #warn, #error] logger used for progress and errors
# @raise [CloudwatchIngestInput::State::LockFailed] if the exclusive lock
#   cannot be obtained
# @raise [StandardError] if the state file cannot be created, read or parsed
def initialize(filepath, log)
  @filepath = filepath
  @log = log
  # Two-level auto-vivifying hashes: store[group][stream] => {}
  @store = Hash.new { |h, k| h[k] = Hash.new { |x, y| x[y] = {} } }
  @new_store = Hash.new { |h, k| h[k] = Hash.new { |x, y| x[y] = {} } }

  if File.exist?(filepath)
    self.statefile = Pathname.new(@filepath).open('r+')
  else
    # statefile has not been assigned yet, so report the path itself.
    @log.warn("No state file #{@filepath} Creating a new one.")
    begin
      self.statefile = Pathname.new(@filepath).open('w+')
      save
    rescue StandardError => boom
      # statefile may still be nil when open failed, so log @filepath.
      @log.error("Unable to create new file #{@filepath}: "\
                 "#{boom.inspect}")
      # Without a usable state file nothing below can work; release the
      # handle (if any) and surface the original error.
      statefile.close if statefile
      raise
    end
  end

  # Attempt to obtain an exclusive flock on the file and raise an
  # exception if we can't
  @log.info("Obtaining exclusive lock on state file #{statefile.path}")
  unless statefile.flock(File::LOCK_EX | File::LOCK_NB)
    # Do not leak the open handle when another process holds the lock.
    statefile.close
    raise CloudwatchIngestInput::State::LockFailed
  end

  begin
    loaded = Psych.safe_load(statefile.read)
    # An empty file parses to nil/false; treat that as "no saved state".
    @store.merge!(loaded) if loaded

    # Migrate old state file
    @store.each_value do |streams|
      streams.update(streams) do |_name, stream|
        # The block value becomes the new entry. A `return` here would
        # exit #initialize itself and abandon the migration.
        if stream.is_a?(String)
          { 'token' => stream, 'timestamp' => Time.now.to_i }
        else
          stream
        end
      end
    end

    @log.info("Loaded #{@store.keys.size} groups from #{statefile.path}")
  rescue StandardError
    statefile.close
    raise
  end
end
Instance Attribute Details
#new_store ⇒ Object
Returns the value of attribute new_store.
406 407 408 |
# Reader for @new_store, the nested Hash that #initialize creates empty and
# that #save serializes to the state file.
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 406 def new_store @new_store end |
#statefile ⇒ Object
Returns the value of attribute statefile.
406 407 408 |
# Reader for @statefile, the open state file handle that #initialize assigns
# and that #save and #close operate on.
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 406 def statefile @statefile end |
#store ⇒ Object
Returns the value of attribute store.
406 407 408 |
# Reader for @store, the nested Hash into which #initialize merges the state
# parsed from the state file.
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 406 def store @store end |
Instance Method Details
#close ⇒ Object
461 462 463 |
# Closes the state file handle returned by #statefile.
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 461 def close statefile.close end |
#save ⇒ Object
453 454 455 456 457 458 459 |
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 453
#
# Replaces the entire contents of the state file with the YAML dump of
# @new_store, logs the save, and leaves the file positioned at its start.
#
# @return [Integer] the result of the final IO#rewind
def save
  statefile.tap do |io|
    # Start from an empty file so no stale bytes survive a shorter dump.
    io.rewind
    io.truncate(0)
    io.write(Psych.dump(@new_store))
  end
  @log.info("Saved state to #{statefile.path}")
  statefile.rewind
end