Class: LogStash::Outputs::LogstashAzureBlobOutput::Uploader

Inherits:
Object
Defined in:
lib/logstash/outputs/blob/uploader.rb

Overview

A helper class nested in the LogstashAzureBlobOutput namespace. It uploads files to Azure Blob Storage.

Constant Summary

TIME_BEFORE_RETRYING_SECONDS = 1
DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
                                                        max_threads: 8,
                                                        max_queue: 1,
                                                        fallback_policy: :caller_runs)

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader

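Initializes the uploader.

Parameters:

  • blob_account (Object)

    client object from the Azure gem, used to talk to the blob service

  • container_name (String)

    name of the Azure Blob container; by this point the container exists, because it was created earlier if it was missing

  • logger (Object)

    logger used to report upload errors

  • threadpool (Object) (defaults to: DEFAULT_THREADPOOL)

    thread pool that runs the asynchronous uploads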



# File 'lib/logstash/outputs/blob/uploader.rb', line 21

def initialize(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL)
  @blob_account = blob_account
  @workers_pool = threadpool
  @logger = logger
  @container_name = container_name
end

Instance Attribute Details

#blob_account ⇒ Object

Returns the value of attribute blob_account.



# File 'lib/logstash/outputs/blob/uploader.rb', line 16

def blob_account
  @blob_account
end

#container_name ⇒ Object

Returns the value of attribute container_name.



# File 'lib/logstash/outputs/blob/uploader.rb', line 16

def container_name
  @container_name
end

#logger ⇒ Object

Returns the value of attribute logger.



# File 'lib/logstash/outputs/blob/uploader.rb', line 16

def logger
  @logger
end

Instance Method Details

#stop ⇒ Object

Shuts down the thread pool and blocks until the queued uploads have finished.



# File 'lib/logstash/outputs/blob/uploader.rb', line 67

def stop
  @workers_pool.shutdown
  @workers_pool.wait_for_termination(nil) # block until it's done
end

#upload(file, options = {}) ⇒ Object

Uploads the file to the container



# File 'lib/logstash/outputs/blob/uploader.rb', line 37

def upload(file, options = {})

  begin
    # Read in binary mode; binread closes the file handle when it is done.
    content = Object::File.binread(file.path)
    filename = Object::File.basename(file.path)
    puts filename
    blob = blob_account.create_block_blob(container_name, filename, content)
    puts blob.name
  rescue => e
    # When we get here it usually means that LogstashAzureBlobOutput already tried some retries by itself (default is 3).
    # When the retry limit is reached or another error happens, we will wait and retry.
    #
    # The thread might be stuck here, but I think it's better than losing anything:
    # it's either a transient error or something really bad happened.
    logger.error('Uploading failed, retrying', exception: e.class, message: e.message, path: file.path, backtrace: e.backtrace)
    retry
  end

  # Run the optional completion callback only after a successful upload.
  options[:on_complete].call(file) unless options[:on_complete].nil?
  blob
rescue => e
  logger.error('An error occured in the `on_complete` uploader',
               exception: e.class,
               message: e.message,
               path: file.path,
               backtrace: e.backtrace)
  raise e # reraise it since we don't deal with it now
end
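
The comments in the listing above say the uploader will wait and retry, and the class defines TIME_BEFORE_RETRYING_SECONDS, yet the inner rescue as shown calls retry straight away. A minimal sketch of a retry loop that pauses between attempts might look like the following. FlakyBlobAccount and with_retry are hypothetical names made up for this illustration, and the short delay stands in for the one-second constant; this is not the plugin's own code.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for the Azure blob client: fails a fixed number
# of times, then succeeds. Only the call shape mirrors #upload.
class FlakyBlobAccount
  attr_reader :attempts

  def initialize(failures)
    @failures = failures
    @attempts = 0
  end

  def create_block_blob(container_name, filename, _content)
    @attempts += 1
    raise IOError, 'simulated transient failure' if @attempts <= @failures

    "#{container_name}/#{filename}"
  end
end

# Hypothetical helper: run the block, and on any StandardError log it,
# sleep for `delay` seconds, then run the block again.
def with_retry(delay:, logger:)
  yield
rescue StandardError => e
  logger.error("Uploading failed, retrying: #{e.class}: #{e.message}")
  sleep delay
  retry
end

log_output = StringIO.new
account = FlakyBlobAccount.new(2)

# A short delay keeps the sketch fast; the class constant above is 1 second.
blob_name = with_retry(delay: 0.01, logger: Logger.new(log_output)) do
  account.create_block_blob('logs', 'events.txt', 'payload')
end
```

Like the original, this loop never gives up, so a permanent failure would keep the worker thread busy indefinitely; a capped attempt count is one possible variation.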

#upload_async(file, options = {}) ⇒ Object

Posts the upload to the thread pool so that the file is uploaded to the container asynchronously.



# File 'lib/logstash/outputs/blob/uploader.rb', line 29

def upload_async(file, options = {})
  @workers_pool.post do
    LogStash::Util.set_thread_name("LogstashAzureBlobOutput output uploader, file: #{file.path}")
    upload(file, options)
  end
end
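
To show how upload_async, the on_complete option, and stop fit together, here is a rough sketch that uses only the Ruby standard library. SimplePool is a hypothetical stand-in for the Concurrent::ThreadPoolExecutor (it exposes only post, shutdown, and wait_for_termination), and the posted task calls the callback directly where a real task would call upload(file, options).

```ruby
require 'tempfile'

# Hypothetical stand-in for the thread pool: one plain Thread per task.
class SimplePool
  def initialize
    @threads = []
  end

  def post(&task)
    @threads << Thread.new(&task)
  end

  # Nothing to do here; a real pool would stop accepting new tasks.
  def shutdown; end

  # Block until every posted task has finished.
  def wait_for_termination(_timeout = nil)
    @threads.each(&:join)
    true
  end
end

uploaded = Queue.new # thread-safe collection of finished file names
pool = SimplePool.new

# The kind of callback a caller might pass as options[:on_complete].
on_complete = ->(file) { uploaded << File.basename(file.path) }

files = Array.new(3) { |i| Tempfile.new(["chunk-#{i}-", '.log']) }

files.each do |file|
  pool.post do
    # A real task would upload the file first, then invoke the callback.
    on_complete.call(file)
  end
end

# Same order as #stop: shut down, then wait for the work to drain.
pool.shutdown
pool.wait_for_termination

completed = Array.new(uploaded.size) { uploaded.pop }
files.each(&:close!)
```

Because the callback runs on a worker thread, anything it touches should be thread-safe, which is why the sketch collects results in a Queue.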