Class: LogStash::Outputs::LogstashAzureBlobOutput::FileRepository
- Inherits: Object
- Inheritance chain: Object, LogStash::Outputs::LogstashAzureBlobOutput::FileRepository
- Defined in: lib/logstash/outputs/blob/file_repository.rb
Overview
Nested class of LogstashAzureBlobOutput.
It manages the temporary directory that holds the output's temporary files, keeping one file factory per prefix key.
Defined Under Namespace
Classes: FactoryInitializer, PrefixedValue
Constant Summary
- DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
- DEFAULT_STALE_TIME_SECS = 15 * 60
Instance Method Summary
-
#each_files ⇒ Object
Yields the current file of each factory while holding that factory's lock.
-
#get_factory(prefix_key) ⇒ Object
Yields the file factory for prefix_key while holding its lock, creating the factory if it is absent.
-
#get_file(prefix_key) ⇒ Object
Yields the current file of the factory for prefix_key.
-
#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository
constructor
Creates the factory map and the factory initializer, then starts the stale sweeper.
-
#keys ⇒ Object
Returns the key set of the prefixed factories.
-
#remove_stale(k, v) ⇒ Object
Removes the given key and value from the repository if the value is stale, then deletes the value.
-
#shutdown ⇒ Object
Shuts down the repository by stopping the stale sweeper.
-
#size ⇒ Object
Returns the number of prefixed factories.
-
#start_stale_sweeper ⇒ Object
starts the stale sweeper.
-
#stop_stale_sweeper ⇒ Object
stops the stale sweeper.
Constructor Details
#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository
Creates the factory map and the factory initializer, then starts the stale sweeper.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 66
    def initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
      # The path needs to contain the prefix so that when we start
      # Logstash after a crash we keep the remote structure
      @prefixed_factories = ConcurrentHashMap.new

      @sweeper_interval = sweeper_interval

      @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

      start_stale_sweeper
    end
Instance Method Details
#each_files ⇒ Object
Yields the current file of each factory while holding that factory's lock.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 86
    def each_files
      @prefixed_factories.elements.each do |prefixed_file|
        prefixed_file.with_lock { |factory| yield factory.current }
      end
    end
#get_factory(prefix_key) ⇒ Object
Yields the file factory for prefix_key while holding its lock. The factory is created through the factory initializer if no entry exists for the key yet.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 93
    def get_factory(prefix_key)
      @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
    end
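The create-on-first-use and lock-per-entry pattern used by get_factory can be sketched in plain Ruby. This is only an illustration, not the plugin's code: LockedValue and SketchRepository are hypothetical names, and a Hash guarded by a Mutex stands in for the Java ConcurrentHashMap that the real class relies on under JRuby.

```ruby
# Hypothetical stand-in for a prefixed value: pairs a value with its own lock.
class LockedValue
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  # Yields the wrapped value while holding the lock.
  def with_lock
    @lock.synchronize { yield @value }
  end
end

# Hypothetical stand-in for the repository's map of prefixed factories.
class SketchRepository
  def initialize(&initializer)
    @entries = {}
    @map_lock = Mutex.new
    @initializer = initializer
  end

  # Creates the entry for prefix_key on first use, then yields it under its lock.
  def get_factory(prefix_key)
    entry = @map_lock.synchronize do
      @entries[prefix_key] ||= LockedValue.new(@initializer.call(prefix_key))
    end
    entry.with_lock { |factory| yield factory }
  end

  def size
    @map_lock.synchronize { @entries.size }
  end
end

created = []
repo = SketchRepository.new do |prefix|
  created << prefix
  "factory-for-#{prefix}"
end

# The second call reuses the entry created by the first one.
first = repo.get_factory('logs/2024') { |factory| factory }
second = repo.get_factory('logs/2024') { |factory| factory }
```

In this sketch the map lock is held only while looking up or creating the entry, so work done inside the block blocks other callers of the same prefix but not callers of other prefixes.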
#get_file(prefix_key) ⇒ Object
Yields the current file of the factory for prefix_key.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 98
    def get_file(prefix_key)
      get_factory(prefix_key) { |factory| yield factory.current }
    end
#keys ⇒ Object
Returns the key set of the prefixed factories.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 81
    def keys
      @prefixed_factories.keySet
    end
#remove_stale(k, v) ⇒ Object
Removes the given key and value from the repository if the value is stale, then deletes the value.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 113
    def remove_stale(k, v)
      if v.stale? # rubocop:disable Style/GuardClause
        @prefixed_factories.remove(k, v)
        v.delete!
      end
    end
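The stale check can be illustrated with a small stand-alone sketch. How stale? is actually implemented on the stored values is not shown on this page, so the StaleEntry struct below and its "last write older than the stale time" rule are assumptions made for illustration. The conditional delete imitates the two-argument remove of a Java ConcurrentHashMap, which drops the key only if it is still mapped to the given value. The sketch compares by object identity, whereas the Java map compares with equals.

```ruby
# Hypothetical entry type: remembers when it was last written to.
StaleEntry = Struct.new(:last_write, :stale_time, :deleted) do
  # Assumed rule: stale once the last write is older than stale_time seconds.
  def stale?(now = Time.now)
    now - last_write > stale_time
  end

  # Stand-in for cleaning up the entry's temporary file.
  def delete!
    self.deleted = true
  end
end

STALE_TIME_SECS = 15 * 60 # same value as DEFAULT_STALE_TIME_SECS

def remove_stale(entries, key, entry)
  return unless entry.stale?

  # Only drop the mapping if the key still points at this exact entry.
  entries.delete(key) if entries[key].equal?(entry)
  entry.delete!
end

now = Time.now
old_entry = StaleEntry.new(now - 3600, STALE_TIME_SECS, false)
fresh_entry = StaleEntry.new(now, STALE_TIME_SECS, false)
entries = { 'old' => old_entry, 'fresh' => fresh_entry }

# Iterate over a snapshot so the hash can be modified during the sweep.
entries.to_a.each { |key, entry| remove_stale(entries, key, entry) }
```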
#shutdown ⇒ Object
Shuts down the repository by stopping the stale sweeper.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 103
    def shutdown
      stop_stale_sweeper
    end
#size ⇒ Object
Returns the number of prefixed factories.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 108
    def size
      @prefixed_factories.size
    end
#start_stale_sweeper ⇒ Object
Starts the stale sweeper, a timer task that runs every @sweeper_interval seconds and calls remove_stale on each entry.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 121
    def start_stale_sweeper
      @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval) do
        LogStash::Util.set_thread_name('LogstashAzureBlobOutput, Stale factory sweeper')

        @prefixed_factories.forEach { |k, v| remove_stale(k, v) }
      end

      @stale_sweeper.execute
    end
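The periodic sweep can be approximated with the standard library alone. This is a rough sketch, not a replacement for Concurrent::TimerTask: SketchSweeper is a hypothetical class, it uses one thread that waits on a condition variable with a timeout, and a spurious wakeup would simply cause an early sweep.

```ruby
# Hypothetical stand-in for a timer task: runs a block roughly every
# `interval` seconds until shutdown is called.
class SketchSweeper
  def initialize(interval, &task)
    @interval = interval
    @task = task
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @stopped = false
  end

  # Starts the background thread and returns immediately.
  def execute
    @thread = Thread.new do
      loop do
        stop = @mutex.synchronize do
          # Sleep for one interval, or wake early when shutdown signals.
          @cond.wait(@mutex, @interval) unless @stopped
          @stopped
        end
        break if stop

        @task.call
      end
    end
    self
  end

  # Signals the thread to stop and waits for it to finish.
  def shutdown
    @mutex.synchronize do
      @stopped = true
      @cond.signal
    end
    @thread&.join
  end
end

entries = { 'stale' => true, 'fresh' => false }
entries_lock = Mutex.new
sweeps = Queue.new

sweeper = SketchSweeper.new(0.01) do
  entries_lock.synchronize { entries.delete_if { |_key, stale| stale } }
  sweeps << :done
end

sweeper.execute
sweeps.pop # blocks until the first sweep has run
sweeper.shutdown
```

Calling shutdown before the first interval elapses stops the thread without running the task, which mirrors how stop_stale_sweeper is used from shutdown.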
#stop_stale_sweeper ⇒ Object
Stops the stale sweeper.

    # File 'lib/logstash/outputs/blob/file_repository.rb', line 132
    def stop_stale_sweeper
      @stale_sweeper.shutdown
    end