Class: LogStash::Outputs::LogstashAzureBlobOutput

Inherits:
Base
  • Object
Defined in:
lib/logstash/outputs/azure.rb,
lib/logstash/outputs/blob/uploader.rb,
lib/logstash/outputs/blob/path_validator.rb,
lib/logstash/outputs/blob/temporary_file.rb,
lib/logstash/outputs/blob/file_repository.rb,
lib/logstash/outputs/blob/size_rotation_policy.rb,
lib/logstash/outputs/blob/time_rotation_policy.rb,
lib/logstash/outputs/blob/temporary_file_factory.rb,
lib/logstash/outputs/blob/writable_directory_validator.rb,
lib/logstash/outputs/blob/size_and_time_rotation_policy.rb

Overview

Logstash output plugin that uploads logs to Azure Blob storage. Events are written to a local temporary file, which is then uploaded as a blob to the Azure cloud.

Examples:

basic configuration

output {
  azure {
    storage_account_name => "my-azure-account"    # required
    storage_access_key => "my-super-secret-key"   # required
    container_name => "my-container"              # required
    size_file => 5242880                          # optional (bytes; 5 MiB)
    time_file => 10                               # optional
    restore => true                               # optional
    temporary_directory => "path/to/directory"    # optional
    prefix => "a_prefix"                          # optional
    upload_queue_size => 2                        # optional
    upload_workers_count => 1                     # optional
    rotation_strategy_val => "size_and_time"      # optional
    tags => []                                    # optional
    encoding => "none"                            # optional
  }
}

Author:

  • Jaime Margolin

Defined Under Namespace

Classes: FileRepository, PathValidator, SizeAndTimeRotationPolicy, SizeRotationPolicy, TemporaryFile, TemporaryFileFactory, TimeRotationPolicy, Uploader, WritableDirectoryValidator

Constant Summary

PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze

PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15

CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                               max_threads: 2,
                                                               fallback_policy: :caller_runs)
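
CRASH_RECOVERY_THREADPOOL caps crash-recovery uploads at two threads and uses the :caller_runs fallback, so when the pool cannot accept more work the submitting thread runs the task itself instead of dropping it. A minimal sketch of that behavior with concurrent-ruby (the max_queue value here is illustrative, not from this plugin):

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                          max_threads: 2,
                                          max_queue: 1,
                                          fallback_policy: :caller_runs)

10.times do |i|
  # once the queue is full, #post runs the task on the calling thread
  pool.post { puts "task #{i} ran on thread #{Thread.current.object_id}" }
end

pool.shutdown
pool.wait_for_termination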

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#container_name ⇒ Object

Returns the value of attribute container_name.



# File 'lib/logstash/outputs/azure.rb', line 98

def container_name
  @container_name
end

#container_name ⇒ Object

Blob container to upload blobs to (required)



# File 'lib/logstash/outputs/azure.rb', line 56

class LogStash::Outputs::LogstashAzureBlobOutput < LogStash::Outputs::Base
  # plugin name used under `output` in the Logstash configuration
  config_name 'azure'
  default :codec, "line"

  require 'logstash/outputs/blob/writable_directory_validator'
  require 'logstash/outputs/blob/path_validator'
  require 'logstash/outputs/blob/size_rotation_policy'
  require 'logstash/outputs/blob/time_rotation_policy'
  require 'logstash/outputs/blob/size_and_time_rotation_policy'
  require 'logstash/outputs/blob/temporary_file'
  require 'logstash/outputs/blob/temporary_file_factory'
  require 'logstash/outputs/blob/uploader'
  require 'logstash/outputs/blob/file_repository'

  PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze
  PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
  CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                                 max_threads: 2,
                                                                 fallback_policy: :caller_runs)

  # azure storage account name
  config :storage_account_name, validate: :string, required: false

  # azure key
  config :storage_access_key, validate: :string, required: false

  # container name
  config :container_name, validate: :string, required: false

  # file rotation and upload tuning options
  config :size_file, validate: :number, default: 1024 * 1024 * 5
  config :time_file, validate: :number, default: 15
  config :restore, validate: :boolean, default: true
  config :temporary_directory, validate: :string, default: File.join(Dir.tmpdir, 'logstash')
  config :prefix, validate: :string, default: ''
  config :upload_queue_size, validate: :number, default: 2 * (Concurrent.processor_count * 0.25).ceil
  config :upload_workers_count, validate: :number, default: (Concurrent.processor_count * 0.5).ceil
  config :rotation_strategy_val, validate: %w[size_and_time size time], default: 'size_and_time'
  config :tags, validate: :array, default: []
  config :encoding, validate: %w[none gzip], default: 'none'

  attr_accessor :storage_account_name, :storage_access_key, :container_name,
                :size_file, :time_file, :restore, :temporary_directory, :prefix, :upload_queue_size,
                :upload_workers_count, :rotation_strategy_val, :tags, :encoding

  # initializes the +LogstashAzureBlobOutput+ instance,
  # validates all config parameters,
  # and initializes the uploader
  def register
    unless @prefix.empty?
      unless PathValidator.valid?(prefix)
        raise LogStash::ConfigurationError.new("Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}")
      end
    end

    unless WritableDirectoryValidator.valid?(@temporary_directory)
      raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
    end

    if (@time_file.nil? && @size_file.nil?) || (@size_file.zero? && @time_file.zero?)
      raise LogStash::ConfigurationError.new('At least one of time_file or size_file must be set to a value greater than 0')
    end

    @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

    @rotation = rotation_strategy

    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                  max_threads: @upload_workers_count,
                                                  max_queue: @upload_queue_size,
                                                  fallback_policy: :caller_runs)

    @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

    restore_from_crash if @restore
    start_periodic_check if @rotation.needs_periodic?
  end

  # Receives multiple encoded events and checks that there is space in the temporary directory
  # @param events_and_encoded [Object]
  def multi_receive_encoded(events_and_encoded)
    prefix_written_to = Set.new

    events_and_encoded.each do |event, encoded|
      prefix_key = normalize_key(event.sprintf(@prefix))
      prefix_written_to << prefix_key

      begin
        @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
        # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
        # Log the error and rethrow it.
      rescue Errno::ENOSPC => e
        @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
        raise e
      end
    end

    # Groups IO calls to optimize fstat checks
    rotate_if_needed(prefix_written_to)
  end

  # closes the temporary files and uploads their content to Azure
  def close
    stop_periodic_check if @rotation.needs_periodic?

    @logger.debug('Uploading current workspace')

    # The plugin has stopped receiving new events, but we still have
    # data on disk, so let's make sure it gets to Azure Blob storage.
    # If Logstash gets interrupted, the `restore_from_crash` method (when `restore` is true)
    # will pick up the content in the temporary directory and upload it.
    # This will block the shutdown until all uploads are done or the user force quits.
    @file_repository.each_files do |file|
      upload_file(file)
    end

    @file_repository.shutdown

    @uploader.stop # wait until all the current uploads are complete
    @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until it is done
  end

  # Validates and normalizes the prefix key
  # @param prefix_key [String]
  def normalize_key(prefix_key)
    prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
  end

  # periodically checks whether the temporary files need to be rotated
  def start_periodic_check
    @logger.debug('Start periodic rotation check')

    @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
      @logger.debug('Periodic check for stale files')

      rotate_if_needed(@file_repository.keys)
    end

    @periodic_check.execute
  end

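  # stops the periodic rotation check timer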
  def stop_periodic_check
    @periodic_check.shutdown
  end

  # logs in to Azure using the azure gem and creates the container if it doesn't exist
  # @return [Object] the azure_blob_service object, the entry point to the azure gem
  def blob_container_resource
    Azure.config.storage_account_name = storage_account_name
    Azure.config.storage_access_key = storage_access_key
    azure_blob_service = Azure::Blob::BlobService.new
    list = azure_blob_service.list_containers
    list.each do |item|
      @container = item if item.name == container_name
    end

    azure_blob_service.create_container(container_name) unless @container
    azure_blob_service
  end

  # checks each prefix against the rotation policy and rotates its file if needed
  # @param prefixes [Enumerable<String>]
  def rotate_if_needed(prefixes)
    prefixes.each do |prefix|
      # Each file access is thread safe;
      # while a rotation is in progress only
      # one thread has access to the resource.
      @file_repository.get_factory(prefix) do |factory|
        temp_file = factory.current

        if @rotation.rotate?(temp_file)
          @logger.debug('Rotate file',
                        strategy: @rotation.class.name,
                        key: temp_file.key,
                        path: temp_file.path)

          upload_file(temp_file)
          factory.rotate!
        end
      end
    end
  end

  # uploads the file using the +Uploader+
  def upload_file(temp_file)
    @logger.debug('Queue for upload', path: temp_file.path)

    # if the queue is full the calling thread will be used to upload
    temp_file.close # make sure the content is on disk
    unless temp_file.empty? # rubocop:disable GuardClause
      @uploader.upload_async(temp_file,
                             on_complete: method(:clean_temporary_file))
    end
  end

  # creates an instance of the configured rotation strategy
  def rotation_strategy
    case @rotation_strategy_val
    when 'size'
      SizeRotationPolicy.new(size_file)
    when 'time'
      TimeRotationPolicy.new(time_file)
    when 'size_and_time'
      SizeAndTimeRotationPolicy.new(size_file, time_file)
    end
  end

  # Cleans up the temporary file after it is uploaded to Azure Blob storage
  def clean_temporary_file(file)
    @logger.debug('Removing temporary file', file: file.path)
    file.delete!
  end

  # uploads any leftover files from a previous crash
  def restore_from_crash
    @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

    temp_folder_path = Pathname.new(@temporary_directory)
    Dir.glob(::File.join(@temporary_directory, '**/*'))
       .select { |file| ::File.file?(file) }
       .each do |file|
      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
      @logger.debug('Recovering from crash and uploading', file: temp_file.path)
      @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end
end
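
The defaults for upload_queue_size and upload_workers_count scale with the host's core count, as the config defaults above show. Worked out for a hypothetical 8-core machine:

require 'concurrent'

cores = Concurrent.processor_count # assume this returns 8
(cores * 0.5).ceil                 # => 4 upload workers by default
2 * (cores * 0.25).ceil            # => 4 slots in the upload queue by default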

#encoding ⇒ Object

The encoding of the files: none or gzip.
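
With encoding => "gzip" the temporary file is compressed before upload. A minimal sketch of the idea using Ruby's standard Zlib; the plugin's TemporaryFileFactory may implement this differently:

require 'zlib'

# write events through a gzip stream instead of a plain file
Zlib::GzipWriter.open('/tmp/logstash/example.log.gz') do |gz|
  gz.write("2017-01-01T00:00:00Z example event\n")
end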




#prefix ⇒ Object

Prefix prepended to the keys of uploaded files; supports event field interpolation.
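
The prefix is passed through event.sprintf before being normalized, so it can interpolate event fields. A small illustration (the field name is hypothetical):

event = LogStash::Event.new('type' => 'nginx')
event.sprintf('%{type}/')          # => "nginx/"
event.sprintf('logs/%{+YYYY-MM}/') # => e.g. "logs/2017-01/", from the event timestamp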




#restore ⇒ Object

Whether to restore and upload leftover temporary files after a crash (defaults to true).
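
When restore is true, register scans the temporary directory at startup and queues any leftover files for upload. A sketch of the discovery step, mirroring restore_from_crash in the source above:

# discovery step only; each hit is wrapped in a TemporaryFile and uploaded asynchronously
leftovers = Dir.glob(File.join('/tmp/logstash', '**/*'))
               .select { |path| File.file?(path) }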




#rotation_strategy_val ⇒ Object

Which criteria trigger temporary file rotation: size, time, or size_and_time.
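
The value selects the policy class built by #rotation_strategy, as shown in the source above:

case rotation_strategy_val
when 'size'          then SizeRotationPolicy.new(size_file)                   # rotate on size alone
when 'time'          then TimeRotationPolicy.new(time_file)                   # rotate on age alone
when 'size_and_time' then SizeAndTimeRotationPolicy.new(size_file, time_file) # combines both thresholds
end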




#size_file ⇒ Object

Maximum size, in bytes, of the local temporary file before it is rotated and uploaded.
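
size_file is expressed in bytes; the default works out to 5 MiB:

1024 * 1024 * 5 # => 5_242_880 bytes, the default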



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/logstash/outputs/azure.rb', line 56

class LogStash::Outputs::LogstashAzureBlobOutput < LogStash::Outputs::Base
  # name for the namespace under output for logstash configuration
  config_name 'azure'
  default :codec, "line"

  require 'logstash/outputs/blob/writable_directory_validator'
  require 'logstash/outputs/blob/path_validator'
  require 'logstash/outputs/blob/size_rotation_policy'
  require 'logstash/outputs/blob/time_rotation_policy'
  require 'logstash/outputs/blob/size_and_time_rotation_policy'
  require 'logstash/outputs/blob/temporary_file'
  require 'logstash/outputs/blob/temporary_file_factory'
  require 'logstash/outputs/blob/uploader'
  require 'logstash/outputs/blob/file_repository'

  PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze
  PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
  CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                                 max_threads: 2,
                                                                 fallback_policy: :caller_runs)

  # azure contianer
  config :storage_account_name, validate: :string, required: false

  # azure key
  config :storage_access_key, validate: :string, required: false

  # conatainer name
  config :container_name, validate: :string, required: false

  # mamadas
  config :size_file, validate: :number, default: 1024 * 1024 * 5
  config :time_file, validate: :number, default: 15
  config :restore, validate: :boolean, default: true
  config :temporary_directory, validate: :string, default: File.join(Dir.tmpdir, 'logstash')
  config :prefix, validate: :string, default: ''
  config :upload_queue_size, validate: :number, default: 2 * (Concurrent.processor_count * 0.25).ceil
  config :upload_workers_count, validate: :number, default: (Concurrent.processor_count * 0.5).ceil
  config :rotation_strategy_val, validate: %w[size_and_time size time], default: 'size_and_time'
  config :tags, validate: :array, default: []
  config :encoding, validate: %w[none gzip], default: 'none'

  attr_accessor :storage_account_name, :storage_access_key, :container_name,
                :size_file, :time_file, :restore, :temporary_directory, :prefix, :upload_queue_size,
                :upload_workers_count, :rotation_strategy_val, :tags, :encoding

  # initializes the +LogstashAzureBlobOutput+ instances
  # validates all config parameters
  # initializes the uploader
  def register
    unless @prefix.empty?
      unless PathValidator.valid?(prefix)
        raise LogStash::ConfigurationError.new("Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}")
      end
    end

    unless WritableDirectoryValidator.valid?(@temporary_directory)
      raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
    end

    if @time_file.nil? && @size_file.nil? || @size_file.zero? && @time_file.zero?
      raise LogStash::ConfigurationError.new('at least one of time_file or size_file set to a value greater than 0')
    end

    @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

    @rotation = rotation_strategy

    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                  max_threads: @upload_workers_count,
                                                  max_queue: @upload_queue_size,
                                                  fallback_policy: :caller_runs)

    @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

    restore_from_crash if @restore
    start_periodic_check if @rotation.needs_periodic?
  end

  # Receives multiple events and check if there is space in temporary directory
  # @param events_and_encoded [Object]
  def multi_receive_encoded(events_and_encoded)
    prefix_written_to = Set.new

    events_and_encoded.each do |event, encoded|
      prefix_key = normalize_key(event.sprintf(@prefix))
      prefix_written_to << prefix_key

      begin
        @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
        # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
        # Log the error and rethrow it.
      rescue Errno::ENOSPC => e
        @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
        raise e
      end
    end

    # Groups IO calls to optimize fstat checks
    rotate_if_needed(prefix_written_to)
  end

  # Closes the temporary files and uploads their content to Azure
  def close
    stop_periodic_check if @rotation.needs_periodic?

    @logger.debug('Uploading current workspace')

    # The plugin has stopped receiving new events, but we still have
    # data on disk; let's make sure it gets to Azure Blob.
    # If Logstash gets interrupted, the `restore_from_crash` method (when `restore`
    # is set to true) will pick up the content in the temporary directory and upload it.
    # This will block the shutdown until all uploads are done or the user forces a quit.
    @file_repository.each_files do |file|
      upload_file(file)
    end

    @file_repository.shutdown

    @uploader.stop # wait until all the current uploads are complete
    @crash_uploader.stop if @restore # recovery work might still be pending, so wait until it is done
  end

  # Validates and normalizes the prefix key
  # @param prefix_key [String]
  def normalize_key(prefix_key)
    prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
  end

  # Periodically checks whether the temporary files need to be rotated
  def start_periodic_check
    @logger.debug('Start periodic rotation check')

    @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
      @logger.debug('Periodic check for stale files')

      rotate_if_needed(@file_repository.keys)
    end

    @periodic_check.execute
  end

  def stop_periodic_check
    @periodic_check.shutdown
  end

  # Logs in to Azure using the azure gem and creates the container if it doesn't exist
  # @return [Object] the azure_blob_service object, which is the entry point to the azure gem
  def blob_container_resource
    Azure.config.storage_account_name = storage_account_name
    Azure.config.storage_access_key = storage_access_key
    azure_blob_service = Azure::Blob::BlobService.new
    list = azure_blob_service.list_containers
    list.each do |item|
      @container = item if item.name == container_name
    end

    azure_blob_service.create_container(container_name) unless @container
    azure_blob_service
  end

  # Checks each prefix against the rotation policy and rotates its file if needed
  # @param prefixes [Enumerable<String>]
  def rotate_if_needed(prefixes)
    prefixes.each do |prefix|
      # Each file access is thread safe;
      # while a rotation is in progress only
      # one thread has access to the resource.
      @file_repository.get_factory(prefix) do |factory|
        temp_file = factory.current

        if @rotation.rotate?(temp_file)
          @logger.debug('Rotate file',
                        strategy: @rotation.class.name,
                        key: temp_file.key,
                        path: temp_file.path)

          upload_file(temp_file)
          factory.rotate!
        end
      end
    end
  end

  # uploads the file using the +Uploader+
  def upload_file(temp_file)
    @logger.debug('Queue for upload', path: temp_file.path)

    # if the queue is full the calling thread will be used to upload
    temp_file.close # make sure the content is on disk
    unless temp_file.empty? # rubocop:disable GuardClause
      @uploader.upload_async(temp_file,
                             on_complete: method(:clean_temporary_file))
    end
  end

  # creates an instance for the rotation strategy
  def rotation_strategy
    case @rotation_strategy_val
    when 'size'
      SizeRotationPolicy.new(size_file)
    when 'time'
      TimeRotationPolicy.new(time_file)
    when 'size_and_time'
      SizeAndTimeRotationPolicy.new(size_file, time_file)
    end
  end

  # Deletes a temporary file after it has been uploaded to Azure Blob
  def clean_temporary_file(file)
    @logger.debug('Removing temporary file', file: file.path)
    file.delete!
  end

  # Uploads files left over from a previous crash
  def restore_from_crash
    @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

    temp_folder_path = Pathname.new(@temporary_directory)
    Dir.glob(::File.join(@temporary_directory, '**/*'))
       .select { |file| ::File.file?(file) }
       .each do |file|
      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
      @logger.debug('Recovering from crash and uploading', file: temp_file.path)
      @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end
end
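
A rough sketch of how the size_and_time strategy can be pictured — an assumption for illustration, not the plugin's actual implementation (the real policy classes live under lib/logstash/outputs/blob/ and their sources are not shown on this page): rotation fires as soon as either the size threshold or the age threshold is exceeded.

# Hypothetical stand-in for SizeAndTimeRotationPolicy, duck-typed against
# temp_file: it only needs #size and #ctime.
class SketchSizeAndTimePolicy
  def initialize(size_file, time_file)
    @size_file = size_file # maximum file size in bytes
    @time_file = time_file # maximum file age (minutes assumed here)
  end

  # Rotate when either condition holds.
  def rotate?(temp_file)
    temp_file.size >= @size_file ||
      (Time.now - temp_file.ctime) / 60.0 >= @time_file
  end

  # Time-based rotation must be re-checked even when no events arrive,
  # which is why register starts the periodic TimerTask.
  def needs_periodic?
    true
  end
end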

#storage_access_keyObject

Azure storage account access key (required) - found under the access keys tab



# File 'lib/logstash/outputs/azure.rb', line 56

#storage_account_nameObject

Azure storage account name (required) - found under the access keys tab



# File 'lib/logstash/outputs/azure.rb', line 56

#tagsObject

tags for the files



# File 'lib/logstash/outputs/azure.rb', line 56

#temporary_directoryObject

temporary directory where the temporary files will be written



# File 'lib/logstash/outputs/azure.rb', line 56

#time_fileObject

time interval after which the local file is rotated and uploaded



# File 'lib/logstash/outputs/azure.rb', line 56

#upload_queue_sizeObject

upload queue size



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/logstash/outputs/azure.rb', line 56

class LogStash::Outputs::LogstashAzureBlobOutput < LogStash::Outputs::Base
  # name for the namespace under output for logstash configuration
  config_name 'azure'
  default :codec, "line"

  require 'logstash/outputs/blob/writable_directory_validator'
  require 'logstash/outputs/blob/path_validator'
  require 'logstash/outputs/blob/size_rotation_policy'
  require 'logstash/outputs/blob/time_rotation_policy'
  require 'logstash/outputs/blob/size_and_time_rotation_policy'
  require 'logstash/outputs/blob/temporary_file'
  require 'logstash/outputs/blob/temporary_file_factory'
  require 'logstash/outputs/blob/uploader'
  require 'logstash/outputs/blob/file_repository'

  PREFIX_KEY_NORMALIZE_CHARACTER = '_'.freeze
  PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15
  CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                                 max_threads: 2,
                                                                 fallback_policy: :caller_runs)

  # azure contianer
  config :storage_account_name, validate: :string, required: false

  # azure key
  config :storage_access_key, validate: :string, required: false

  # conatainer name
  config :container_name, validate: :string, required: false

  # mamadas
  config :size_file, validate: :number, default: 1024 * 1024 * 5
  config :time_file, validate: :number, default: 15
  config :restore, validate: :boolean, default: true
  config :temporary_directory, validate: :string, default: File.join(Dir.tmpdir, 'logstash')
  config :prefix, validate: :string, default: ''
  config :upload_queue_size, validate: :number, default: 2 * (Concurrent.processor_count * 0.25).ceil
  config :upload_workers_count, validate: :number, default: (Concurrent.processor_count * 0.5).ceil
  config :rotation_strategy_val, validate: %w[size_and_time size time], default: 'size_and_time'
  config :tags, validate: :array, default: []
  config :encoding, validate: %w[none gzip], default: 'none'

  attr_accessor :storage_account_name, :storage_access_key, :container_name,
                :size_file, :time_file, :restore, :temporary_directory, :prefix, :upload_queue_size,
                :upload_workers_count, :rotation_strategy_val, :tags, :encoding

  # initializes the +LogstashAzureBlobOutput+ instances
  # validates all config parameters
  # initializes the uploader
  def register
    unless @prefix.empty?
      unless PathValidator.valid?(prefix)
        raise LogStash::ConfigurationError.new("Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}")
      end
    end

    unless WritableDirectoryValidator.valid?(@temporary_directory)
      raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
    end

    if (@time_file.nil? && @size_file.nil?) || (@size_file.zero? && @time_file.zero?)
      raise LogStash::ConfigurationError.new('At least one of time_file or size_file must be set to a value greater than 0')
    end

    @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

    @rotation = rotation_strategy

    executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                  max_threads: @upload_workers_count,
                                                  max_queue: @upload_queue_size,
                                                  fallback_policy: :caller_runs)

    @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

    restore_from_crash if @restore
    start_periodic_check if @rotation.needs_periodic?
  end

  # Receives multiple events and checks that there is space in the temporary directory
  # @param events_and_encoded [Object]
  def multi_receive_encoded(events_and_encoded)
    prefix_written_to = Set.new

    events_and_encoded.each do |event, encoded|
      prefix_key = normalize_key(event.sprintf(@prefix))
      prefix_written_to << prefix_key

      begin
        @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
        # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
        # Log the error and rethrow it.
      rescue Errno::ENOSPC => e
        @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
        raise e
      end
    end

    # Groups IO calls to optimize fstat checks
    rotate_if_needed(prefix_written_to)
  end

  # closes the temporary files and uploads their content to Azure
  def close
    stop_periodic_check if @rotation.needs_periodic?

    @logger.debug('Uploading current workspace')

    # The plugin has stopped receiving new events, but we still have
    # data on disk; let's make sure it gets to Azure Blob Storage.
    # If Logstash gets interrupted, the `restore_from_crash` method (when `restore` is true)
    # will pick up the content in the temporary directory and upload it.
    # This will block the shutdown until all uploads are done or the user force quits.
    @file_repository.each_files do |file|
      upload_file(file)
    end

    @file_repository.shutdown

    @uploader.stop # wait until all the current uploads are complete
    @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until it is done
  end

  # Validates and normalizes the prefix key
  # @param prefix_key [String]
  def normalize_key(prefix_key)
    prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
  end

  # periodically checks whether the temporary files need to be rotated
  def start_periodic_check
    @logger.debug('Start periodic rotation check')

    @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
      @logger.debug('Periodic check for stale files')

      rotate_if_needed(@file_repository.keys)
    end

    @periodic_check.execute
  end

  def stop_periodic_check
    @periodic_check.shutdown
  end

  # logs in to Azure using the azure gem and creates the container if it doesn't exist
  # @return [Object] the azure_blob_service object, which is the entry point to the azure gem's Blob service
  def blob_container_resource
    Azure.config.storage_account_name = storage_account_name
    Azure.config.storage_access_key = storage_access_key
    azure_blob_service = Azure::Blob::BlobService.new
    list = azure_blob_service.list_containers
    list.each do |item|
      @container = item if item.name == container_name
    end

    azure_blob_service.create_container(container_name) unless @container
    azure_blob_service
  end

  # checks whether each file needs to rotate according to the rotation policy and rotates it if needed
  # @param prefixes [Enumerable<String>]
  def rotate_if_needed(prefixes)
    prefixes.each do |prefix|
      # Each file access is thread safe;
      # while a rotation is in progress only
      # one thread has access to the resource.
      @file_repository.get_factory(prefix) do |factory|
        temp_file = factory.current

        if @rotation.rotate?(temp_file)
          @logger.debug('Rotate file',
                        strategy: @rotation.class.name,
                        key: temp_file.key,
                        path: temp_file.path)

          upload_file(temp_file)
          factory.rotate!
        end
      end
    end
  end

  # uploads the file using the +Uploader+
  def upload_file(temp_file)
    @logger.debug('Queue for upload', path: temp_file.path)

    # if the queue is full the calling thread will be used to upload
    temp_file.close # make sure the content is on disk
    unless temp_file.empty? # rubocop:disable GuardClause
      @uploader.upload_async(temp_file,
                             on_complete: method(:clean_temporary_file))
    end
  end

  # creates an instance for the rotation strategy
  def rotation_strategy
    case @rotation_strategy_val
    when 'size'
      SizeRotationPolicy.new(size_file)
    when 'time'
      TimeRotationPolicy.new(time_file)
    when 'size_and_time'
      SizeAndTimeRotationPolicy.new(size_file, time_file)
    end
  end

  # cleans up the temporary file after it has been uploaded to Azure Blob Storage
  def clean_temporary_file(file)
    @logger.debug('Removing temporary file', file: file.path)
    file.delete!
  end

  # uploads leftover files from a previous crash
  def restore_from_crash
    @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

    temp_folder_path = Pathname.new(@temporary_directory)
    Dir.glob(::File.join(@temporary_directory, '**/*'))
       .select { |file| ::File.file?(file) }
       .each do |file|
      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
      @logger.debug('Recovering from crash and uploading', file: temp_file.path)
      @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
    end
  end
end
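
For a concrete sense of the concurrency defaults above: both upload_queue_size and upload_workers_count scale with the host's processor count. A quick sketch of what the defaults evaluate to on an 8-core machine (the core count is assumed purely for illustration):

cores = 8                  # what Concurrent.processor_count would report on the example machine
2 * (cores * 0.25).ceil    # => 4, the default upload_queue_size
(cores * 0.5).ceil         # => 4, the default upload_workers_count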

#upload_workers_count ⇒ Object

Number of workers to use for uploads.



# File 'lib/logstash/outputs/azure.rb', line 56


#upload_queue_size ⇒ Object

Returns the value of attribute upload_queue_size.



# File 'lib/logstash/outputs/azure.rb', line 98

def upload_queue_size
  @upload_queue_size
end

#upload_workers_count ⇒ Object

Returns the value of attribute upload_workers_count.



# File 'lib/logstash/outputs/azure.rb', line 98

def upload_workers_count
  @upload_workers_count
end

Instance Method Details

#blob_container_resource ⇒ Object

Logs in to Azure using the azure gem and creates the container if it doesn't exist.

Returns:

  • (Object)

    the azure_blob_service object, which is the entry point to the azure gem's Blob service



# File 'lib/logstash/outputs/azure.rb', line 204

def blob_container_resource
  Azure.config.storage_account_name = storage_account_name
  Azure.config.storage_access_key = storage_access_key
  azure_blob_service = Azure::Blob::BlobService.new
  list = azure_blob_service.list_containers
  list.each do |item|
    @container = item if item.name == container_name
  end

  azure_blob_service.create_container(container_name) unless @container
  azure_blob_service
end
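
The returned service is what the Uploader writes through. As a minimal standalone sketch, assuming the classic azure gem's create_block_blob call (the container and blob names below are made-up examples):

azure_blob_service = blob_container_resource
# Upload a string as a block blob into the named container.
azure_blob_service.create_block_blob('my-container', 'logs/example.log', "line 1\nline 2\n")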

#clean_temporary_file(file) ⇒ Object

Cleans up the temporary file after it has been uploaded to Azure Blob Storage.



# File 'lib/logstash/outputs/azure.rb', line 265

def clean_temporary_file(file)
  @logger.debug('Removing temporary file', file: file.path)
  file.delete!
end

#close ⇒ Object

Closes the temporary files and uploads their content to Azure.



# File 'lib/logstash/outputs/azure.rb', line 159

def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.debug('Uploading current workspace')

  # The plugin has stopped receiving new events, but we still have
  # data on disk; let's make sure it gets to Azure Blob Storage.
  # If Logstash gets interrupted, the `restore_from_crash` method (when `restore` is true)
  # will pick up the content in the temporary directory and upload it.
  # This will block the shutdown until all uploads are done or the user force quits.
  @file_repository.each_files do |file|
    upload_file(file)
  end

  @file_repository.shutdown

  @uploader.stop # wait until all the current uploads are complete
  @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until it is done
end

#multi_receive_encoded(events_and_encoded) ⇒ Object

Receives multiple events and checks that there is space in the temporary directory.

Parameters:

  • events_and_encoded (Object)


# File 'lib/logstash/outputs/azure.rb', line 137

def multi_receive_encoded(events_and_encoded)
  prefix_written_to = Set.new

  events_and_encoded.each do |event, encoded|
    prefix_key = normalize_key(event.sprintf(@prefix))
    prefix_written_to << prefix_key

    begin
      @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
      # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
      # Log the error and rethrow it.
    rescue Errno::ENOSPC => e
      @logger.error('Azure: No space left in temporary directory', temporary_directory: @temporary_directory)
      raise e
    end
  end

  # Groups IO calls to optimize fstat checks
  rotate_if_needed(prefix_written_to)
end
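
events_and_encoded follows the standard contract for encoded Logstash outputs: an array of [event, encoded] pairs produced by the codec. A hedged sketch of a single pair, where output stands for an already-registered instance of this plugin:

event = LogStash::Event.new('message' => 'hello world')
encoded = event.to_json + "\n"   # roughly what a line-oriented codec hands over
output.multi_receive_encoded([[event, encoded]])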

#normalize_key(prefix_key) ⇒ Object

Validates and normalizes the prefix key.

Parameters:

  • prefix_key (String)


# File 'lib/logstash/outputs/azure.rb', line 181

def normalize_key(prefix_key)
  prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
end
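
For illustration, assuming PathValidator counts a character such as `^` among PathValidator::INVALID_CHARACTERS, a dynamic prefix is sanitized before it becomes a file key:

prefix_key = event.sprintf('logs/%{host}')  # e.g. "logs/web^01" for a host named web^01
normalize_key(prefix_key)                   # => "logs/web_01", assuming '^' is invalid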

#register ⇒ Object

Initializes the LogstashAzureBlobOutput instance, validates all config parameters, and initializes the uploader.



# File 'lib/logstash/outputs/azure.rb', line 105

def register
  unless @prefix.empty?
    unless PathValidator.valid?(prefix)
      raise LogStash::ConfigurationError.new("Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}")
    end
  end

  unless WritableDirectoryValidator.valid?(@temporary_directory)
    raise LogStash::ConfigurationError.new("Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}")
  end

  if (@time_file.nil? && @size_file.nil?) || (@size_file.zero? && @time_file.zero?)
    raise LogStash::ConfigurationError.new('At least one of time_file or size_file must be set to a value greater than 0')
  end

  @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

  @rotation = rotation_strategy

  executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                max_threads: @upload_workers_count,
                                                max_queue: @upload_queue_size,
                                                fallback_policy: :caller_runs)

  @uploader = Uploader.new(blob_container_resource, container_name, @logger, executor)

  restore_from_crash if @restore
  start_periodic_check if @rotation.needs_periodic?
end

#restore_from_crash ⇒ Object

Uploads leftover files from a previous crash.



# File 'lib/logstash/outputs/azure.rb', line 271

def restore_from_crash
  @crash_uploader = Uploader.new(blob_container_resource, container_name, @logger, CRASH_RECOVERY_THREADPOOL)

  temp_folder_path = Pathname.new(@temporary_directory)
  Dir.glob(::File.join(@temporary_directory, '**/*'))
     .select { |file| ::File.file?(file) }
     .each do |file|
    temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
    @logger.debug('Recovering from crash and uploading', file: temp_file.path)
    @crash_uploader.upload_async(temp_file, on_complete: method(:clean_temporary_file))
  end
end

#rotate_if_needed(prefixes) ⇒ Object

Checks whether each file needs to rotate according to the rotation policy and rotates it if needed.

Parameters:

  • prefixes (Enumerable<String>)


# File 'lib/logstash/outputs/azure.rb', line 219

def rotate_if_needed(prefixes)
  prefixes.each do |prefix|
    # Each file access is thread safe;
    # while a rotation is in progress only
    # one thread has access to the resource.
    @file_repository.get_factory(prefix) do |factory|
      temp_file = factory.current

      if @rotation.rotate?(temp_file)
        @logger.debug('Rotate file',
                      strategy: @rotation.class.name,
                      key: temp_file.key,
                      path: temp_file.path)

        upload_file(temp_file)
        factory.rotate!
      end
    end
  end
end

#rotation_strategy ⇒ Object

Creates an instance of the configured rotation strategy.



# File 'lib/logstash/outputs/azure.rb', line 253

def rotation_strategy
  case @rotation_strategy_val
  when 'size'
    SizeRotationPolicy.new(size_file)
  when 'time'
    TimeRotationPolicy.new(time_file)
  when 'size_and_time'
    SizeAndTimeRotationPolicy.new(size_file, time_file)
  end
end
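
The policy objects share the small interface this class relies on: rotate?(file) and needs_periodic?. A sketch of the default strategy, with arguments taken from the size_file and time_file defaults (temp_file stands for a TemporaryFile instance):

policy = SizeAndTimeRotationPolicy.new(1024 * 1024 * 5, 15)
policy.needs_periodic?     # presumably true, since the time component needs the periodic timer
policy.rotate?(temp_file)  # true once the file crosses the size or age threshold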

#start_periodic_check ⇒ Object

Periodically checks whether the temporary files need to be rotated.



# File 'lib/logstash/outputs/azure.rb', line 186

def start_periodic_check
  @logger.debug('Start periodic rotation check')

  @periodic_check = Concurrent::TimerTask.new(execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
    @logger.debug('Periodic check for stale files')

    rotate_if_needed(@file_repository.keys)
  end

  @periodic_check.execute
end
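
The timer is a plain Concurrent::TimerTask from concurrent-ruby; the same pattern as a minimal standalone sketch:

require 'concurrent'

# Runs the block every 15 seconds on a background thread until shut down.
task = Concurrent::TimerTask.new(execution_interval: 15) do
  puts 'periodic check for stale files'
end
task.execute    # start the timer
task.shutdown   # stop it again, as stop_periodic_check does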

#stop_periodic_check ⇒ Object

Stops the periodic rotation check.

# File 'lib/logstash/outputs/azure.rb', line 198

def stop_periodic_check
  @periodic_check.shutdown
end

#upload_file(temp_file) ⇒ Object

Uploads the file using the Uploader.



# File 'lib/logstash/outputs/azure.rb', line 241

def upload_file(temp_file)
  @logger.debug('Queue for upload', path: temp_file.path)

  # if the queue is full the calling thread will be used to upload
  temp_file.close # make sure the content is on disk
  unless temp_file.empty? # rubocop:disable GuardClause
    @uploader.upload_async(temp_file,
                           on_complete: method(:clean_temporary_file))
  end
end
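
The "queue is full" comment refers to the :caller_runs fallback policy configured in register: when the upload queue is saturated, concurrent-ruby runs the posted task on the calling thread, which applies natural back-pressure to the pipeline. A standalone sketch of that behaviour:

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 1,
                                          max_queue: 1, fallback_policy: :caller_runs)
5.times do
  # Once the single queue slot is taken, #post runs the block
  # synchronously on the calling thread instead of rejecting it.
  pool.post { sleep(0.1) }
end
pool.shutdown
pool.wait_for_termination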