Class: LogStash::Outputs::LogstashAzureBlobOutput
- Inherits: LogStash::Outputs::Base
- Object
- LogStash::Outputs::Base
- LogStash::Outputs::LogstashAzureBlobOutput
- Defined in:
- lib/logstash/outputs/azure.rb,
lib/logstash/outputs/blob/uploader.rb,
lib/logstash/outputs/blob/path_validator.rb,
lib/logstash/outputs/blob/temporary_file.rb,
lib/logstash/outputs/blob/file_repository.rb,
lib/logstash/outputs/blob/size_rotation_policy.rb,
lib/logstash/outputs/blob/time_rotation_policy.rb,
lib/logstash/outputs/blob/temporary_file_factory.rb,
lib/logstash/outputs/blob/writable_directory_validator.rb,
lib/logstash/outputs/blob/size_and_time_rotation_policy.rb
Overview
Logstash output plugin that uploads logs to Azure Blob Storage. Events are written to a local temporary file, which is then uploaded as a blob to the Azure cloud.
Defined Under Namespace
Classes: FileRepository, PathValidator, SizeAndTimeRotationPolicy, SizeRotationPolicy, TemporaryFile, TemporaryFileFactory, TimeRotationPolicy, Uploader, WritableDirectoryValidator
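A minimal pipeline configuration for this output might look as follows. The option names come from the `config` declarations in the plugin source; the account name, access key, and container values are placeholders.

```
output {
  azure {
    storage_account_name  => "mystorageaccount"   # placeholder
    storage_access_key    => "BASE64_ACCESS_KEY"  # placeholder
    container_name        => "logs"
    size_file             => 5242880        # rotate at 5 MiB (the default)
    time_file             => 15             # time-based rotation (the default)
    rotation_strategy_val => "size_and_time"
    encoding              => "none"         # or "gzip"
  }
}
```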
Constant Summary collapse
- PREFIX_KEY_NORMALIZE_CHARACTER =
'_'.freeze
- PERIODIC_CHECK_INTERVAL_IN_SECONDS =
15
- CRASH_RECOVERY_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 2, fallback_policy: :caller_runs)
Instance Attribute Summary collapse
-
#container_name ⇒ Object
Returns the value of attribute container_name.
-
#contianer_name ⇒ Object
Blob container to upload blobs to (required).
-
#encoding ⇒ Object
the encoding of the files.
-
#prefix ⇒ Object
prefix for the files to be uploaded.
-
#restore ⇒ Object
restore after crash.
-
#rotation_strategy_val ⇒ Object
what is considered when rotating the temporary file.
-
#size_file ⇒ Object
File size to use for the local temporary file.
-
#storage_access_key ⇒ Object
Azure storage account access key (required) - found under the access keys tab.
-
#storage_account_name ⇒ Object
Azure storage account name (required) - found under the access keys tab.
-
#tags ⇒ Object
tags for the files.
-
#temporary_directory ⇒ Object
temporary directory where the temporary files will be written.
-
#time_file ⇒ Object
time to upload the local file.
-
#upload queue size(queuesize) ⇒ Object
upload queue size.
-
#upload workers count(workerscount) ⇒ Object
how many workers to use for upload.
-
#upload_queue_size ⇒ Object
Returns the value of attribute upload_queue_size.
-
#upload_workers_count ⇒ Object
Returns the value of attribute upload_workers_count.
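The defaults for #upload_queue_size and #upload_workers_count are derived from the machine's processor count, as declared in the plugin source. The sketch below reproduces that arithmetic with the processor count passed in explicitly (the plugin itself uses Concurrent.processor_count from concurrent-ruby).

```ruby
# Default sizing as declared in the plugin's config defaults, parameterized
# by processor count so it can run without the concurrent-ruby gem.
def default_upload_queue_size(processor_count)
  2 * (processor_count * 0.25).ceil
end

def default_upload_workers_count(processor_count)
  (processor_count * 0.5).ceil
end

puts default_upload_queue_size(8)    # => 4
puts default_upload_workers_count(8) # => 4
```

On an 8-core machine this yields a queue of 4 and 4 workers; on 4 cores, a queue of 2 and 2 workers.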
Instance Method Summary collapse
-
#blob_container_resource ⇒ Object
logs in to Azure using the azure gem and creates the container if it doesn't exist.
-
#clean_temporary_file(file) ⇒ Object
Cleans up the temporary file after it is uploaded to an Azure blob.
-
#close ⇒ Object
close the temporary file and uploads the content to Azure.
-
#multi_receive_encoded(events_and_encoded) ⇒ Object
Receives multiple events and checks if there is space in the temporary directory.
-
#normalize_key(prefix_key) ⇒ Object
Validates and normalizes the prefix key.
-
#register ⇒ Object
initializes the LogstashAzureBlobOutput instance, validates all config parameters, and initializes the uploader.
-
#restore_from_crash ⇒ Object
uploads files if there was a crash before.
-
#rotate_if_needed(prefixes) ⇒ Object
checks if rotation is needed according to the rotation policy and rotates if so.
-
#rotation_strategy ⇒ Object
creates an instance of the rotation strategy.
-
#start_periodic_check ⇒ Object
periodically checks whether the temporary file needs to be rotated.
- #stop_periodic_check ⇒ Object
-
#upload_file(temp_file) ⇒ Object
uploads the file using the Uploader.
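The #rotation_strategy method selects a policy object from the rotation_strategy_val setting. The sketch below mirrors that case statement; the Struct-based policy classes are simplified stand-ins for the real classes under lib/logstash/outputs/blob/.

```ruby
# Simplified stand-ins for the plugin's rotation policy classes.
SizeRotationPolicy        = Struct.new(:size_file)
TimeRotationPolicy        = Struct.new(:time_file)
SizeAndTimeRotationPolicy = Struct.new(:size_file, :time_file)

# Mirrors the case statement in #rotation_strategy.
def rotation_strategy(val, size_file, time_file)
  case val
  when 'size'          then SizeRotationPolicy.new(size_file)
  when 'time'          then TimeRotationPolicy.new(time_file)
  when 'size_and_time' then SizeAndTimeRotationPolicy.new(size_file, time_file)
  end
end

policy = rotation_strategy('size_and_time', 5 * 1024 * 1024, 15)
puts policy.class # => SizeAndTimeRotationPolicy
```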
Instance Attribute Details
#container_name ⇒ Object
Returns the value of attribute container_name.
# File 'lib/logstash/outputs/azure.rb', line 98

def container_name
  @container_name
end
#contianer_name ⇒ Object
Blob container to upload blobs to (required)
# File 'lib/logstash/outputs/azure.rb', line 56

class LogStash::Outputs::LogstashAzureBlobOutput < LogStash::Outputs::Base
  # name for the namespace under output for logstash configuration
  config_name 'azure'

  default :codec, "line"

  require 'logstash/outputs/blob/writable_directory_validator'
  require 'logstash/outputs/blob/path_validator'
  require 'logstash/outputs/blob/size_rotation_policy'
  require 'logstash/outputs/blob/time_rotation_policy'
  require 'logstash/outputs/blob/size_and_time_rotation_policy'
  require 'logstash/outputs/blob/temporary_file'
  require 'logstash/outputs/blob/temporary_file_factory'
  require 'logstash/outputs/blob/uploader'
  require 'logstash/outputs/blob/file_repository'

  PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze
  PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
  CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                                 max_threads: 2,
                                                                 fallback_policy: :caller_runs)

  # azure storage account name
  config :storage_account_name, validate: :string, required: false
  # azure storage access key
  config :storage_access_key, validate: :string, required: false
  # container name
  config :container_name, validate: :string, required: false
  # rotation and upload settings
  config :size_file, validate: :number, default: 1024 * 1024 * 5
  config :time_file, validate: :number, default: 15
  config :restore, validate: :boolean, default: true
  config :temporary_directory, validate: :string, default: File.join(Dir.tmpdir, 'logstash')
  config :prefix, validate: :string, default: ''
  config :upload_queue_size, validate: :number, default: 2 * (Concurrent.processor_count * 0.25).ceil
  config :upload_workers_count, validate: :number, default: (Concurrent.processor_count * 0.5).ceil
  config :rotation_strategy_val, validate: %w[size_and_time size time], default: 'size_and_time'
  config :tags, validate: :array, default: []
  config :encoding, validate: %w[none gzip], default: 'none'

  attr_accessor :storage_account_name, :storage_access_key, :container_name,
                :size_file, :time_file, :restore, :temporary_directory, :prefix,
                :upload_queue_size, :upload_workers_count, :rotation_strategy_val,
                :tags, :encoding

  # initializes the +LogstashAzureBlobOutput+ instance
  # validates all config parameters
  # initializes the uploader
  def register
    unless @prefix.empty?
      unless PathValidator.valid?(prefix)
        raise LogStash::ConfigurationError.new("Prefix must not contain: #{PathValidator::INVALID_CHARACTERS}")
      end
    end

    unless WritableDirectoryValidator.valid?(@temporary_directory)
      raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
    end

    if @time_file.nil? && @size_file.nil? || @size_file.zero? && @time_file.zero?
      raise LogStash::ConfigurationError.new('At least one of time_file or size_file must be set to a value greater than 0')
    end

    @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)
    @rotation = rotation_strategy

    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                  max_threads: @upload_workers_count,
                                                  max_queue: @upload_queue_size,
                                                  fallback_policy: :caller_runs)

    @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

    restore_from_crash if @restore
    start_periodic_check if @rotation.needs_periodic?
  end

  # Receives multiple events and checks if there is space in the temporary directory
  # @param events_and_encoded [Object]
  def multi_receive_encoded(events_and_encoded)
    prefix_written_to = Set.new

    events_and_encoded.each do |event, encoded|
      prefix_key = normalize_key(event.sprintf(@prefix))
      prefix_written_to << prefix_key

      begin
        @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
      # The output should stop accepting new events coming in, since it cannot
      # do anything with them anymore. Log the error and rethrow it.
      rescue Errno::ENOSPC => e
        @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
        raise e
      end
    end

    # Groups IO calls to optimize fstat checks
    rotate_if_needed(prefix_written_to)
  end

  # closes the temporary file and uploads the content to Azure
  def close
    stop_periodic_check if @rotation.needs_periodic?

    @logger.debug('Uploading current workspace')

    # The plugin has stopped receiving new events, but we still have data on
    # disk; let's make sure it gets to Azure blob storage. If Logstash gets
    # interrupted, the `restore_from_crash` method (when `restore` is true)
    # will pick up the content in the temporary directory and upload it.
    # This blocks the shutdown until all uploads are done or the user force quits.
    @file_repository.each_files do |file|
      upload_file(file)
    end

    @file_repository.shutdown

    @uploader.stop # wait until all the current uploads are complete
    @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until we are done
  end

  # Validates and normalizes the prefix key
  # @param prefix_key [String]
  def normalize_key(prefix_key)
    prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
  end

  # periodically checks whether the temporary file needs to be rotated
  def start_periodic_check
    @logger.debug('Start periodic rotation check')

    @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
      @logger.debug('Periodic check for stale files')

      rotate_if_needed(@file_repository.keys)
    end

    @periodic_check.execute
  end

  def stop_periodic_check
    @periodic_check.shutdown
  end

  # logs in to Azure using the azure gem and creates the container if it doesn't exist
  # @return [Object] the azure_blob_service object, which is the endpoint to the azure gem
  def blob_container_resource
    Azure.config.storage_account_name = storage_account_name
    Azure.config.storage_access_key = storage_access_key
    azure_blob_service = Azure::Blob::BlobService.new
    list = azure_blob_service.list_containers
    list.each do |item|
      @container = item if item.name == container_name
    end

    azure_blob_service.create_container(container_name) unless @container
    azure_blob_service
  end

  # checks if rotation is needed according to the rotation policy and rotates if so
  # @param prefixes [String]
  def rotate_if_needed(prefixes)
    prefixes.each do |prefix|
      # Each file access is thread safe; until the rotation is done,
      # only one thread has access to the resource.
      @file_repository.get_factory(prefix) do |factory|
        temp_file = factory.current

        if @rotation.rotate?(temp_file)
          @logger.debug('Rotate file',
                        strategy: @rotation.class.name,
                        key: temp_file.key,
                        path: temp_file.path)

          upload_file(temp_file)
          factory.rotate!
        end
      end
    end
  end

  # uploads the file using the +Uploader+
  def upload_file(temp_file)
    @logger.debug('Queue for upload', path: temp_file.path)

    # if the queue is full the calling thread will be used to upload
    temp_file.close # make sure the content is on disk
    unless temp_file.empty? # rubocop:disable GuardClause
      @uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end

  # creates an instance of the rotation strategy
  def rotation_strategy
    case @rotation_strategy_val
    when 'size'
      SizeRotationPolicy.new(size_file)
    when 'time'
      TimeRotationPolicy.new(time_file)
    when 'size_and_time'
      SizeAndTimeRotationPolicy.new(size_file, time_file)
    end
  end

  # Cleans up the temporary file after it has been uploaded to an Azure blob
  def clean_temporary_file(file)
    @logger.debug('Removing temporary file', file: file.path)
    file.delete!
  end

  # uploads leftover files if there was a crash before
  def restore_from_crash
    @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

    temp_folder_path = Pathname.new(@temporary_directory)
    Dir.glob(::File.join(@temporary_directory, '**/*'))
       .select { |file| ::File.file?(file) }
       .each do |file|
      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
      @logger.debug('Recovering from crash and uploading', file: temp_file.path)
      @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end
end
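The #normalize_key method replaces any character matching PathValidator.matches_re with PREFIX_KEY_NORMALIZE_CHARACTER. The real invalid-character set lives in lib/logstash/outputs/blob/path_validator.rb, so the regex below is an illustrative assumption, not the plugin's actual pattern.

```ruby
PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze

# Illustrative stand-in for PathValidator.matches_re; the real set of
# invalid characters is defined in path_validator.rb.
ILLUSTRATIVE_INVALID = /[\^`><|?*"]/

# Mirrors #normalize_key: every invalid character becomes '_'.
def normalize_key(prefix_key)
  prefix_key.gsub(ILLUSTRATIVE_INVALID, PREFIX_KEY_NORMALIZE_CHARACTER)
end

puts normalize_key('logs/app|2024?')  # => "logs/app_2024_"
```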
#encoding ⇒ Object
the encoding of the files
# File 'lib/logstash/outputs/azure.rb', line 56
(full class source shown under #contianer_name above)
#prefix ⇒ Object
prefix for the files to be uploaded
# File 'lib/logstash/outputs/azure.rb', line 56
(full class source shown under #contianer_name above)
#restore ⇒ Object
restore after crash
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/logstash/outputs/azure.rb', line 56

class LogStash::Outputs::LogstashAzureBlobOutput < LogStash::Outputs::Base
  # name for the namespace under output for logstash configuration
  config_name 'azure'

  default :codec, 'line'

  require 'logstash/outputs/blob/writable_directory_validator'
  require 'logstash/outputs/blob/path_validator'
  require 'logstash/outputs/blob/size_rotation_policy'
  require 'logstash/outputs/blob/time_rotation_policy'
  require 'logstash/outputs/blob/size_and_time_rotation_policy'
  require 'logstash/outputs/blob/temporary_file'
  require 'logstash/outputs/blob/temporary_file_factory'
  require 'logstash/outputs/blob/uploader'
  require 'logstash/outputs/blob/file_repository'

  PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze
  PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
  CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                                 max_threads: 2,
                                                                 fallback_policy: :caller_runs)

  # Azure storage account name
  config :storage_account_name, validate: :string, required: false
  # Azure storage access key
  config :storage_access_key, validate: :string, required: false
  # blob container name
  config :container_name, validate: :string, required: false
  # rotation and upload settings
  config :size_file, validate: :number, default: 1024 * 1024 * 5
  config :time_file, validate: :number, default: 15
  config :restore, validate: :boolean, default: true
  config :temporary_directory, validate: :string, default: File.join(Dir.tmpdir, 'logstash')
  config :prefix, validate: :string, default: ''
  config :upload_queue_size, validate: :number, default: 2 * (Concurrent.processor_count * 0.25).ceil
  config :upload_workers_count, validate: :number, default: (Concurrent.processor_count * 0.5).ceil
  config :rotation_strategy_val, validate: %w[size_and_time size time], default: 'size_and_time'
  config :tags, validate: :array, default: []
  config :encoding, validate: %w[none gzip], default: 'none'

  attr_accessor :storage_account_name, :storage_access_key, :container_name,
                :size_file, :time_file, :restore, :temporary_directory, :prefix,
                :upload_queue_size, :upload_workers_count, :rotation_strategy_val,
                :tags, :encoding

  # Initializes the +LogstashAzureBlobOutput+ instance,
  # validates all config parameters
  # and initializes the uploader
  def register
    unless @prefix.empty?
      unless PathValidator.valid?(prefix)
        raise LogStash::ConfigurationError.new("Prefix must not contain: #{PathValidator::INVALID_CHARACTERS}")
      end
    end

    unless WritableDirectoryValidator.valid?(@temporary_directory)
      raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
    end

    if (@time_file.nil? && @size_file.nil?) || (@size_file.zero? && @time_file.zero?)
      raise LogStash::ConfigurationError.new('At least one of time_file or size_file must be set to a value greater than 0')
    end

    @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)
    @rotation = rotation_strategy

    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                  max_threads: @upload_workers_count,
                                                  max_queue: @upload_queue_size,
                                                  fallback_policy: :caller_runs)

    @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

    restore_from_crash if @restore
    start_periodic_check if @rotation.needs_periodic?
  end

  # Receives multiple events and checks if there is space in the temporary directory
  # @param events_and_encoded [Object]
  def multi_receive_encoded(events_and_encoded)
    prefix_written_to = Set.new
    events_and_encoded.each do |event, encoded|
      prefix_key = normalize_key(event.sprintf(@prefix))
      prefix_written_to << prefix_key

      begin
        @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
      # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
      # Log the error and rethrow it.
      rescue Errno::ENOSPC => e
        @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
        raise e
      end
    end

    # Groups IO calls to optimize fstat checks
    rotate_if_needed(prefix_written_to)
  end

  # Closes the temporary files and uploads their content to Azure
  def close
    stop_periodic_check if @rotation.needs_periodic?

    @logger.debug('Uploading current workspace')

    # The plugin has stopped receiving new events, but we still have
    # data on disk; let's make sure it gets to Azure Blob storage.
    # If Logstash is interrupted, the `restore_from_crash` method (when `restore`
    # is set to true) will pick up the content in the temporary directory and upload it.
    # This will block the shutdown until all uploads are done or the user forces a quit.
    @file_repository.each_files do |file|
      upload_file(file)
    end

    @file_repository.shutdown

    @uploader.stop # wait until all the current uploads are complete
    @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until we are done
  end

  # Validates and normalizes the prefix key
  # @param prefix_key [String]
  def normalize_key(prefix_key)
    prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
  end

  # Periodically checks whether the temporary files need to be rotated
  def start_periodic_check
    @logger.debug('Start periodic rotation check')

    @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
      @logger.debug('Periodic check for stale files')
      rotate_if_needed(@file_repository.keys)
    end

    @periodic_check.execute
  end

  def stop_periodic_check
    @periodic_check.shutdown
  end

  # Logs in to Azure using the azure gem and creates the container if it doesn't exist
  # @return [Object] the azure_blob_service object, which is the endpoint to the azure gem
  def blob_container_resource
    Azure.config.storage_account_name = storage_account_name
    Azure.config.storage_access_key = storage_access_key
    azure_blob_service = Azure::Blob::BlobService.new
    list = azure_blob_service.list_containers
    list.each do |item|
      @container = item if item.name == container_name
    end

    azure_blob_service.create_container(container_name) unless @container
    azure_blob_service
  end

  # Checks whether each file needs to rotate according to the rotation policy, and rotates it if so
  # @param prefixes [String]
  def rotate_if_needed(prefixes)
    prefixes.each do |prefix|
      # Each file access is thread safe,
      # and until the rotation is done only
      # one thread has access to the resource.
      @file_repository.get_factory(prefix) do |factory|
        temp_file = factory.current

        if @rotation.rotate?(temp_file)
          @logger.debug('Rotate file',
                        strategy: @rotation.class.name,
                        key: temp_file.key,
                        path: temp_file.path)
          upload_file(temp_file)
          factory.rotate!
        end
      end
    end
  end

  # Uploads the file using the +Uploader+
  def upload_file(temp_file)
    @logger.debug('Queue for upload', path: temp_file.path)

    # if the queue is full the calling thread will be used to upload
    temp_file.close # make sure the content is on disk
    unless temp_file.empty? # rubocop:disable GuardClause
      @uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end

  # Creates an instance of the configured rotation strategy
  def rotation_strategy
    case @rotation_strategy_val
    when 'size'
      SizeRotationPolicy.new(size_file)
    when 'time'
      TimeRotationPolicy.new(time_file)
    when 'size_and_time'
      SizeAndTimeRotationPolicy.new(size_file, time_file)
    end
  end

  # Cleans up a temporary file after it has been uploaded to Azure Blob storage
  def clean_temporary_file(file)
    @logger.debug('Removing temporary file', file: file.path)
    file.delete!
  end

  # Uploads leftover files if there was a crash before
  def restore_from_crash
    @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

    temp_folder_path = Pathname.new(@temporary_directory)
    Dir.glob(::File.join(@temporary_directory, '**/*'))
       .select { |file| ::File.file?(file) }
       .each do |file|
      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
      @logger.debug('Recovering from crash and uploading', file: temp_file.path)
      @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end
end
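Based on the config options declared above, a minimal pipeline configuration for this output might look like the following sketch. The account name, access key, and container values are placeholders, not real credentials; the option names (`storage_account_name`, `storage_access_key`, `container_name`, `size_file`, `time_file`) come from the source listing.

```
output {
  azure {
    storage_account_name => "my_account"        # placeholder
    storage_access_key   => "my_access_key"     # placeholder
    container_name       => "my_container"      # placeholder
    size_file            => 5242880             # rotate at 5 MiB (the default)
    time_file            => 15                  # rotate after 15 time units
    restore              => true                # re-upload temp files after a crash
  }
}
```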
#rotation_strategy_val ⇒ Object
Criterion used for temporary file rotation: size, time, or both
#size_file ⇒ Object
File size threshold for the local temporary file
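The size threshold drives the `SizeRotationPolicy` created in `rotation_strategy` above: a file is rotated (and queued for upload) once it reaches `size_file` bytes. Below is a simplified, hypothetical sketch of that decision logic; the class name `SizeRotationPolicySketch` is illustrative and not part of the plugin.

```ruby
# Simplified sketch of a size-based rotation policy: rotate once the
# temporary file holds at least size_file bytes. Illustrative only.
class SizeRotationPolicySketch
  attr_reader :size_file

  def initialize(size_file)
    raise ArgumentError, 'size_file must be greater than 0' if size_file <= 0
    @size_file = size_file
  end

  # `file` is anything responding to #size (bytes written so far)
  def rotate?(file)
    file.size >= size_file
  end

  # Size is checked on every write, so no periodic timer is needed
  def needs_periodic?
    false
  end
end

policy = SizeRotationPolicySketch.new(5 * 1024 * 1024) # the plugin's default size_file
small_file = Struct.new(:size).new(1024)
policy.rotate?(small_file) # 1 KiB is below the 5 MiB threshold
```

A time-based policy would instead compare the file's age against `time_file`, and the default `size_and_time` strategy rotates when either threshold is crossed.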
#storage_access_key ⇒ Object
Azure storage account access key (required) - found under the access keys tab
#storage_account_name ⇒ Object
Azure storage account name (required) - found under the access keys tab
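Putting the options together, a minimal pipeline configuration might look like the following. All values are placeholders, not real credentials; the option names follow the `config` declarations in the class source above, and the `prefix` sprintf pattern is merely illustrative.

```text
output {
  azure {
    storage_account_name => "my_account"
    storage_access_key   => "my_access_key"
    container_name       => "my_container"
    size_file            => 5242880              # rotate at 5 MiB (the default)
    time_file            => 15
    restore              => true
    temporary_directory  => "/tmp/logstash"
    prefix               => "logs/%{+YYYY-MM-dd}"
    encoding             => "gzip"
  }
}
```

Events are buffered to files under `temporary_directory`, grouped by the expanded `prefix`, and each file is uploaded as a blob once the rotation policy fires.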
#tags ⇒ Object
tags for the files
#temporary_directory ⇒ Object
temporary directory where the temporary files will be written
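On restart with `restore => true`, `restore_from_crash` walks the temporary directory recursively and re-queues every regular file it finds for upload. The selection logic can be sketched and exercised in isolation (the real code additionally wraps each path in a `TemporaryFile` before handing it to the crash uploader):

```ruby
require 'tmpdir'
require 'fileutils'

# Collect regular files (recursively) under dir, as restore_from_crash does
# before re-queueing them for upload. Directories themselves are skipped.
def recoverable_files(dir)
  Dir.glob(::File.join(dir, '**/*'))
     .select { |path| ::File.file?(path) }
     .sort
end

Dir.mktmpdir do |dir|
  FileUtils.mkdir_p(::File.join(dir, 'logs'))
  ::File.write(::File.join(dir, 'logs', 'a.log'), 'payload')
  ::File.write(::File.join(dir, 'b.log'), 'payload')

  files = recoverable_files(dir)
  # Two regular files recovered; the 'logs' directory entry is filtered out.
  puts files.length  # => 2
end
```

Note that empty files are still matched here; the empty-content check happens later, in `upload_file`, which skips the async upload when `temp_file.empty?`.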
#time_file ⇒ Object
time interval after which the local temporary file is rotated and uploaded
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/logstash/outputs/azure.rb', line 56 class LogStash::Outputs::LogstashAzureBlobOutput < LogStash::Outputs::Base # name for the namespace under output for logstash configuration config_name 'azure' default :codec, "line" require 'logstash/outputs/blob/writable_directory_validator' require 'logstash/outputs/blob/path_validator' require 'logstash/outputs/blob/size_rotation_policy' require 'logstash/outputs/blob/time_rotation_policy' require 'logstash/outputs/blob/size_and_time_rotation_policy' require 'logstash/outputs/blob/temporary_file' require 'logstash/outputs/blob/temporary_file_factory' require 'logstash/outputs/blob/uploader' require 'logstash/outputs/blob/file_repository' PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15 CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 2, fallback_policy: :caller_runs) # azure contianer config :storage_account_name, validate: :string, required: false # azure key config :storage_access_key, validate: :string, required: false # conatainer name config :container_name, validate: :string, required: false # mamadas config :size_file, validate: :number, default: 1024 * 1024 * 5 config :time_file, validate: :number, default: 15 config :restore, validate: :boolean, default: true config :temporary_directory, validate: :string, default: File.join(Dir.tmpdir, 'logstash') config :prefix, validate: :string, default: '' config :upload_queue_size, validate: :number, default: 2 * (Concurrent.processor_count * 0.25).ceil config :upload_workers_count, validate: :number, default: (Concurrent.processor_count * 0.5).ceil config :rotation_strategy_val, validate: %w[size_and_time size time], default: 'size_and_time' config :tags, validate: :array, default: [] config :encoding, validate: %w[none gzip], default: 'none' attr_accessor :storage_account_name, :storage_access_key, :container_name, :size_file, :time_file, :restore, :temporary_directory, :prefix, 
:upload_queue_size, :upload_workers_count, :rotation_strategy_val, :tags, :encoding # initializes the +LogstashAzureBlobOutput+ instances # validates all config parameters # initializes the uploader def register unless @prefix.empty? unless PathValidator.valid?(prefix) raise LogStash::ConfigurationError.new("Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}") end end unless WritableDirectoryValidator.valid?(@temporary_directory) raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}") end if @time_file.nil? && @size_file.nil? || @size_file.zero? && @time_file.zero? raise LogStash::ConfigurationError.new('at least one of time_file or size_file set to a value greater than 0') end @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory) @rotation = rotation_strategy executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: @upload_workers_count, max_queue: @upload_queue_size, fallback_policy: :caller_runs) @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor) restore_from_crash if @restore start_periodic_check if @rotation.needs_periodic? end # Receives multiple events and check if there is space in temporary directory # @param events_and_encoded [Object] def multi_receive_encoded(events_and_encoded) prefix_written_to = Set.new events_and_encoded.each do |event, encoded| prefix_key = normalize_key(event.sprintf(@prefix)) prefix_written_to << prefix_key begin @file_repository.get_file(prefix_key) { |file| file.write(encoded) } # The output should stop accepting new events coming in, since it cannot do anything with them anymore. # Log the error and rethrow it. 
      rescue Errno::ENOSPC => e
        @logger.error('Azure: No space left in temporary directory',
                      temporary_directory: @temporary_directory)
        raise e
      end
    end

    # Groups IO calls to optimize fstat checks
    rotate_if_needed(prefix_written_to)
  end

  # Closes the temporary files and uploads their content to Azure
  def close
    stop_periodic_check if @rotation.needs_periodic?

    @logger.debug('Uploading current workspace')

    # The plugin has stopped receiving new events, but we still have
    # data on disk; let's make sure it gets to Azure Blob Storage.
    # If Logstash gets interrupted, the `restore_from_crash` method (when
    # `restore` is set to true) will pick up the content in the temporary
    # directory and upload it.
    # This will block the shutdown until all uploads are done or the user
    # force quits.
    @file_repository.each_files do |file|
      upload_file(file)
    end

    @file_repository.shutdown

    @uploader.stop # wait until all the current uploads are complete
    @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until we are done
  end

  # Validates and normalizes the prefix key
  # @param prefix_key [String]
  def normalize_key(prefix_key)
    prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
  end

  # Periodically checks whether the temporary files need to be rotated
  def start_periodic_check
    @logger.debug('Start periodic rotation check')

    @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
      @logger.debug('Periodic check for stale files')
      rotate_if_needed(@file_repository.keys)
    end

    @periodic_check.execute
  end

  def stop_periodic_check
    @periodic_check.shutdown
  end

  # Logs in to Azure using the azure gem and creates the container if it doesn't exist
  # @return [Object] the azure_blob_service object, which is the endpoint to the azure gem
  def blob_container_resource
    Azure.config.storage_account_name = storage_account_name
    Azure.config.storage_access_key = storage_access_key
    azure_blob_service = Azure::Blob::BlobService.new
    list = azure_blob_service.list_containers
    list.each do |item|
      @container = item if item.name == container_name
    end
    azure_blob_service.create_container(container_name) unless @container
    azure_blob_service
  end

  # Checks whether a rotation is needed according to the rotation policy, and rotates if so
  # @param prefixes [Set] the prefix keys that were written to
  def rotate_if_needed(prefixes)
    prefixes.each do |prefix|
      # Each file access is thread safe;
      # until the rotation is done, only
      # one thread has access to the resource.
      @file_repository.get_factory(prefix) do |factory|
        temp_file = factory.current

        if @rotation.rotate?(temp_file)
          @logger.debug('Rotate file',
                        strategy: @rotation.class.name,
                        key: temp_file.key,
                        path: temp_file.path)
          upload_file(temp_file)
          factory.rotate!
        end
      end
    end
  end

  # Uploads the file using the +Uploader+
  def upload_file(temp_file)
    @logger.debug('Queue for upload', path: temp_file.path)

    # if the queue is full the calling thread will be used to upload
    temp_file.close # make sure the content is on disk
    unless temp_file.empty? # rubocop:disable GuardClause
      @uploader.upload_async(temp_file,
                             on_complete: method(:clean_temporary_file))
    end
  end

  # Creates an instance of the configured rotation strategy
  def rotation_strategy
    case @rotation_strategy_val
    when 'size'
      SizeRotationPolicy.new(size_file)
    when 'time'
      TimeRotationPolicy.new(time_file)
    when 'size_and_time'
      SizeAndTimeRotationPolicy.new(size_file, time_file)
    end
  end

  # Cleans up a temporary file after it has been uploaded to an Azure blob
  def clean_temporary_file(file)
    @logger.debug('Removing temporary file', file: file.path)
    file.delete!
  end

  # Uploads any temporary files left over from a previous crash
  def restore_from_crash
    @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

    temp_folder_path = Pathname.new(@temporary_directory)
    Dir.glob(::File.join(@temporary_directory, '**/*'))
       .select { |file| ::File.file?(file) }
       .each do |file|
      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
      @logger.debug('Recovering from crash and uploading', file: temp_file.path)
      @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end
end
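The class registers itself under the name azure (config_name 'azure'). As a hedged sketch, and assuming the defaults shown in the listing (size_file of 1024 * 1024 * 5 bytes, time_file of 15, rotation_strategy_val of size_and_time), a pipeline configuration for this output could look like the following; the account name, access key, container, and prefix are placeholders:

```text
output {
  azure {
    storage_account_name  => "examplestorageaccount"   # placeholder
    storage_access_key    => "REPLACE_WITH_ACCESS_KEY" # placeholder, from the portal's access keys tab
    container_name        => "logs"                    # placeholder
    size_file             => 5242880                   # the default: rotate at 5 MiB
    time_file             => 15                        # the default
    rotation_strategy_val => "size_and_time"
    restore               => true
    prefix                => "myapp"                   # placeholder
  }
}
```

The plugin writes events to files under temporary_directory and uploads each file as a blob when the rotation policy fires.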
#upload_queue_size ⇒ Object
Returns the value of attribute upload_queue_size, the maximum number of pending uploads; when the queue is full, the calling thread performs the upload itself (fallback_policy: :caller_runs).
# File 'lib/logstash/outputs/azure.rb', line 98

def upload_queue_size
  @upload_queue_size
end
#upload_workers_count ⇒ Object
Returns the value of attribute upload_workers_count, the number of worker threads used for asynchronous uploads.
# File 'lib/logstash/outputs/azure.rb', line 98

def upload_workers_count
  @upload_workers_count
end
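The defaults for these two settings are derived from the machine's core count: upload_queue_size defaults to 2 * (Concurrent.processor_count * 0.25).ceil and upload_workers_count to (Concurrent.processor_count * 0.5).ceil. A minimal sketch of the arithmetic, hard-coding an assumed 8-core machine instead of calling Concurrent.processor_count:

```ruby
# Reproduce the default sizing formulas, assuming an 8-core machine.
processor_count = 8 # stand-in for Concurrent.processor_count

upload_queue_size    = 2 * (processor_count * 0.25).ceil
upload_workers_count = (processor_count * 0.5).ceil

puts upload_queue_size    # => 4
puts upload_workers_count # => 4
```

On such a machine the executor built in register would run up to 4 upload workers and queue up to 4 pending uploads before falling back to the calling thread.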
Instance Method Details
#blob_container_resource ⇒ Object
Logs in to Azure using the azure gem and creates the container if it doesn't exist.
# File 'lib/logstash/outputs/azure.rb', line 204

def blob_container_resource
  Azure.config.storage_account_name = storage_account_name
  Azure.config.storage_access_key = storage_access_key
  azure_blob_service = Azure::Blob::BlobService.new
  list = azure_blob_service.list_containers
  list.each do |item|
    @container = item if item.name == container_name
  end
  azure_blob_service.create_container(container_name) unless @container
  azure_blob_service
end
#clean_temporary_file(file) ⇒ Object
Cleans up a temporary file after it has been uploaded to an Azure blob.
# File 'lib/logstash/outputs/azure.rb', line 265

def clean_temporary_file(file)
  @logger.debug('Removing temporary file', file: file.path)
  file.delete!
end
#close ⇒ Object
Closes the temporary files and uploads their content to Azure.
# File 'lib/logstash/outputs/azure.rb', line 159

def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.debug('Uploading current workspace')

  # The plugin has stopped receiving new events, but we still have
  # data on disk; let's make sure it gets to Azure Blob Storage.
  # If Logstash gets interrupted, the `restore_from_crash` method (when
  # `restore` is set to true) will pick up the content in the temporary
  # directory and upload it.
  # This will block the shutdown until all uploads are done or the user
  # force quits.
  @file_repository.each_files do |file|
    upload_file(file)
  end

  @file_repository.shutdown

  @uploader.stop # wait until all the current uploads are complete
  @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until we are done
end
#multi_receive_encoded(events_and_encoded) ⇒ Object
Receives multiple encoded events, writes them to temporary files, and checks whether there is space left in the temporary directory.
# File 'lib/logstash/outputs/azure.rb', line 137

def multi_receive_encoded(events_and_encoded)
  prefix_written_to = Set.new

  events_and_encoded.each do |event, encoded|
    prefix_key = normalize_key(event.sprintf(@prefix))
    prefix_written_to << prefix_key

    begin
      @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
    # The output should stop accepting new events coming in, since it
    # cannot do anything with them anymore. Log the error and rethrow it.
    rescue Errno::ENOSPC => e
      @logger.error('Azure: No space left in temporary directory',
                    temporary_directory: @temporary_directory)
      raise e
    end
  end

  # Groups IO calls to optimize fstat checks
  rotate_if_needed(prefix_written_to)
end
#normalize_key(prefix_key) ⇒ Object
Validates and normalizes the prefix key.
# File 'lib/logstash/outputs/azure.rb', line 181

def normalize_key(prefix_key)
  prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
end
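PathValidator.matches_re lives in lib/logstash/outputs/blob/path_validator.rb and is not reproduced on this page, so the sketch below substitutes a hypothetical character class; the real pattern may differ. It only illustrates how gsub maps every invalid character in the interpolated prefix to PREFIX_KEY_NORMALIZE_CHARACTER:

```ruby
# Hypothetical stand-in for PathValidator.matches_re; the plugin's actual
# regex (built from PathValidator::INVALID_CHARACTERS) may differ.
INVALID_CHARS_RE = /[\\^`><|?*"]/
PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze

def normalize_key(prefix_key)
  # gsub replaces every match, so all invalid characters are normalized
  prefix_key.gsub(INVALID_CHARS_RE, PREFIX_KEY_NORMALIZE_CHARACTER)
end

puts normalize_key('logs>2021|app') # => logs_2021_app
```

Because the prefix can contain event sprintf references, normalization happens per event in multi_receive_encoded, not once at register time.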
#register ⇒ Object
Initializes the LogstashAzureBlobOutput instance, validates all config parameters, and initializes the uploader.
# File 'lib/logstash/outputs/azure.rb', line 105

def register
  unless @prefix.empty?
    unless PathValidator.valid?(prefix)
      raise LogStash::ConfigurationError.new("Prefix must not contain: #{PathValidator::INVALID_CHARACTERS}")
    end
  end

  unless WritableDirectoryValidator.valid?(@temporary_directory)
    raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
  end

  if @time_file.nil? && @size_file.nil? || @size_file.zero? && @time_file.zero?
    raise LogStash::ConfigurationError.new('At least one of time_file or size_file must be set to a value greater than 0')
  end

  @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

  @rotation = rotation_strategy

  executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                max_threads: @upload_workers_count,
                                                max_queue: @upload_queue_size,
                                                fallback_policy: :caller_runs)

  @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

  restore_from_crash if @restore
  start_periodic_check if @rotation.needs_periodic?
end
#restore_from_crash ⇒ Object
Uploads any temporary files left over from a previous crash.
# File 'lib/logstash/outputs/azure.rb', line 271

def restore_from_crash
  @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

  temp_folder_path = Pathname.new(@temporary_directory)
  Dir.glob(::File.join(@temporary_directory, '**/*'))
     .select { |file| ::File.file?(file) }
     .each do |file|
    temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
    @logger.debug('Recovering from crash and uploading', file: temp_file.path)
    @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
  end
end
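The recovery scan is a plain recursive glob that keeps only regular files. A self-contained sketch of that scan against a throwaway directory; the uuid/prefix layout is an assumption for illustration (the real layout is produced by TemporaryFileFactory, not shown here):

```ruby
require 'tmpdir'
require 'fileutils'

leftovers = nil
Dir.mktmpdir('logstash') do |temporary_directory|
  # Simulate a temporary file left behind by a crash (assumed layout).
  FileUtils.mkdir_p(File.join(temporary_directory, 'uuid/prefix'))
  File.write(File.join(temporary_directory, 'uuid/prefix/ls.log'), 'leftover event')

  # The same scan restore_from_crash performs: every regular file, at any depth.
  leftovers = Dir.glob(File.join(temporary_directory, '**/*'))
                 .select { |file| File.file?(file) }
end

puts leftovers.length # => 1
```

Each file found this way is wrapped back into a TemporaryFile and queued on the dedicated CRASH_RECOVERY_THREADPOOL rather than the regular upload executor.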
#rotate_if_needed(prefixes) ⇒ Object
Checks whether each prefix's temporary file needs to rotate according to the rotation policy, and rotates it if so.
# File 'lib/logstash/outputs/azure.rb', line 219

def rotate_if_needed(prefixes)
  prefixes.each do |prefix|
    # Each file access is thread safe;
    # until the rotation is done, only
    # one thread has access to the resource.
    @file_repository.get_factory(prefix) do |factory|
      temp_file = factory.current

      if @rotation.rotate?(temp_file)
        @logger.debug('Rotate file',
                      strategy: @rotation.class.name,
                      key: temp_file.key,
                      path: temp_file.path)
        upload_file(temp_file)
        factory.rotate!
      end
    end
  end
end
#rotation_strategy ⇒ Object
Creates an instance of the configured rotation strategy.
# File 'lib/logstash/outputs/azure.rb', line 253

def rotation_strategy
  case @rotation_strategy_val
  when 'size'
    SizeRotationPolicy.new(size_file)
  when 'time'
    TimeRotationPolicy.new(time_file)
  when 'size_and_time'
    SizeAndTimeRotationPolicy.new(size_file, time_file)
  end
end
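A minimal sketch of this dispatch, with Struct stubs standing in for the real policy classes (the actual implementations live under lib/logstash/outputs/blob/ and are not reproduced here); the helper name pick_rotation_strategy and its parameters exist only for this sketch:

```ruby
# Stub policies standing in for the plugin's real rotation policy classes.
SizeRotationPolicy        = Struct.new(:size_file)
TimeRotationPolicy        = Struct.new(:time_file)
SizeAndTimeRotationPolicy = Struct.new(:size_file, :time_file)

# Same case dispatch as rotation_strategy above, parameterized for the sketch.
def pick_rotation_strategy(strategy, size_file, time_file)
  case strategy
  when 'size'          then SizeRotationPolicy.new(size_file)
  when 'time'          then TimeRotationPolicy.new(time_file)
  when 'size_and_time' then SizeAndTimeRotationPolicy.new(size_file, time_file)
  end
end

policy = pick_rotation_strategy('size_and_time', 1024 * 1024 * 5, 15)
puts policy.class     # => SizeAndTimeRotationPolicy
puts policy.time_file # => 15
```

With the default size_and_time strategy, both the size threshold and the time threshold are in play; the size-only and time-only strategies take a single parameter each.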
#start_periodic_check ⇒ Object
Periodically checks whether the temporary files need to be rotated.
# File 'lib/logstash/outputs/azure.rb', line 186

def start_periodic_check
  @logger.debug('Start periodic rotation check')

  @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
    @logger.debug('Periodic check for stale files')
    rotate_if_needed(@file_repository.keys)
  end

  @periodic_check.execute
end
#stop_periodic_check ⇒ Object
# File 'lib/logstash/outputs/azure.rb', line 198

def stop_periodic_check
  @periodic_check.shutdown
end
#upload_file(temp_file) ⇒ Object
Uploads the file using the Uploader.
# File 'lib/logstash/outputs/azure.rb', line 241

def upload_file(temp_file)
  @logger.debug('Queue for upload', path: temp_file.path)

  # if the queue is full the calling thread will be used to upload
  temp_file.close # make sure the content is on disk
  unless temp_file.empty? # rubocop:disable GuardClause
    @uploader.upload_async(temp_file,
                           on_complete: method(:clean_temporary_file))
  end
end