Class: LogStash::Outputs::LogstashAzureBlobOutput::FileRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/blob/file_repository.rb

Overview

Helper class nested under LogstashAzureBlobOutput. It manages the temporary directory that holds the temporary files, keeping one file factory per prefix key.

Defined Under Namespace

Classes: FactoryInitializer, PrefixedValue

Constant Summary collapse

DEFAULT_STATE_SWEEPER_INTERVAL_SECS =
60
DEFAULT_STALE_TIME_SECS =
15 * 60

Instance Method Summary collapse

Constructor Details

#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository

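Initializes the repository: creates the prefix-to-factory map, stores the sweeper interval, builds the factory initializer from the tags, encoding, temporary directory and stale time, and starts the stale sweeper.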



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/logstash/outputs/blob/file_repository.rb', line 66

# Builds the repository state and immediately starts the background sweeper.
#
# @param tags forwarded unchanged to FactoryInitializer
# @param encoding forwarded unchanged to FactoryInitializer
# @param temporary_directory forwarded unchanged to FactoryInitializer
# @param stale_time forwarded unchanged to FactoryInitializer
#   (defaults to DEFAULT_STALE_TIME_SECS)
# @param sweeper_interval execution interval handed to the stale sweeper
#   (defaults to DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STALE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  @sweeper_interval = sweeper_interval
  @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

  # Factories are stored per prefix: the path needs to contain the prefix so
  # that, when Logstash is started again after a crash, the remote structure
  # is kept.
  @prefixed_factories = ConcurrentHashMap.new

  start_stale_sweeper
end

Instance Method Details

#each_files ⇒ Object

Yields the current file of every stored factory to the given block, holding that factory's lock while the block runs.



86
87
88
89
90
# File 'lib/logstash/outputs/blob/file_repository.rb', line 86

# Walks every stored prefixed value and yields the current file of its
# factory; the yield happens inside that value's `with_lock` block.
#
# @yield [file] the object returned by `factory.current`
# @return whatever `each` on the map's `elements` returns
def each_files
  locked_entries = @prefixed_factories.elements

  locked_entries.each do |prefixed_value|
    prefixed_value.with_lock do |factory|
      yield factory.current
    end
  end
end

#get_factory(prefix_key) ⇒ Object

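Yields the file factory registered for prefix_key (creating it through the factory initializer if absent) while holding its lock.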



93
94
95
# File 'lib/logstash/outputs/blob/file_repository.rb', line 93

# Looks up the prefixed value for +prefix_key+, letting the map create it via
# the factory initializer when it is absent, then yields the factory from
# inside that value's `with_lock` block.
#
# @param prefix_key the key under which the factory is stored
# @yield [factory] the factory handed out by `with_lock`
# @return whatever `with_lock` returns
def get_factory(prefix_key)
  prefixed_value = @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer)

  prefixed_value.with_lock do |factory|
    yield factory
  end
end

#get_file(prefix_key) ⇒ Object

Yields the current file of the factory registered for prefix_key.



98
99
100
# File 'lib/logstash/outputs/blob/file_repository.rb', line 98

# Convenience wrapper around get_factory that yields the factory's current
# file instead of the factory itself.
#
# @param prefix_key the key passed straight through to get_factory
# @yield [file] the object returned by `factory.current`
# @return whatever get_factory returns
def get_file(prefix_key)
  get_factory(prefix_key) do |file_factory|
    yield file_factory.current
  end
end

#keys ⇒ Object

gets the key set



81
82
83
# File 'lib/logstash/outputs/blob/file_repository.rb', line 81

# Returns the key set of the prefix-to-factory map, exactly as produced by
# the map's `keySet` call.
#
# @return the result of `@prefixed_factories.keySet`
def keys
  @prefixed_factories.keySet
end

#remove_stale(k, v) ⇒ Object

Removes the given key/value pair from the repository and deletes the value, but only when the value reports itself as stale.



113
114
115
116
117
118
# File 'lib/logstash/outputs/blob/file_repository.rb', line 113

# Evicts one entry when it has gone stale: the exact (k, v) mapping is removed
# from the map and the value is then deleted. Entries that are not stale are
# left untouched.
#
# @param k the key of the entry
# @param v the stored value; must respond to `stale?` and `delete!`
# @return the result of `v.delete!` when the entry was stale, nil otherwise
def remove_stale(k, v)
  return unless v.stale?

  @prefixed_factories.remove(k, v)
  v.delete!
end

#shutdown ⇒ Object

Shuts the repository down by stopping the stale sweeper.



103
104
105
# File 'lib/logstash/outputs/blob/file_repository.rb', line 103

# Shuts the repository down. The only work done here is stopping the stale
# sweeper via stop_stale_sweeper.
#
# @return whatever stop_stale_sweeper returns
def shutdown
  stop_stale_sweeper
end

#size ⇒ Object

Returns the number of entries in the prefix-to-factory map.



108
109
110
# File 'lib/logstash/outputs/blob/file_repository.rb', line 108

# Reports how many entries the prefix-to-factory map currently holds.
#
# @return the result of `@prefixed_factories.size`
def size
  @prefixed_factories.size
end

#start_stale_sweeper ⇒ Object

starts the stale sweeper



121
122
123
124
125
126
127
128
129
# File 'lib/logstash/outputs/blob/file_repository.rb', line 121

# Creates the timer task that sweeps stale factories and starts it.
#
# Each run of the task names the current thread and then passes every
# key/value pair of the map to remove_stale. The task is configured with
# @sweeper_interval as its execution interval and is kept in @stale_sweeper.
#
# @return whatever `execute` on the timer task returns
def start_stale_sweeper
  sweep = proc do
    LogStash::Util.set_thread_name('LogstashAzureBlobOutput, Stale factory sweeper')

    @prefixed_factories.forEach do |prefix_key, prefixed_value|
      remove_stale(prefix_key, prefixed_value)
    end
  end

  @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval, &sweep)
  @stale_sweeper.execute
end

#stop_stale_sweeper ⇒ Object

stops the stale sweeper



132
133
134
# File 'lib/logstash/outputs/blob/file_repository.rb', line 132

# Stops the stale sweeper by calling `shutdown` on the object held in
# @stale_sweeper.
#
# NOTE(review): assumes start_stale_sweeper has already assigned
# @stale_sweeper; there is no nil check here.
#
# @return whatever `@stale_sweeper.shutdown` returns
def stop_stale_sweeper
  @stale_sweeper.shutdown
end