Class: LogStash::Outputs::LogstashAzureBlobOutput::Uploader
- Inherits:
-
Object
- Object
- LogStash::Outputs::LogstashAzureBlobOutput::Uploader
- Defined in:
- lib/logstash/outputs/blob/uploader.rb
Overview
a sub class of LogstashAzureBlobOutput
this class uploads the files to Azure cloud
Constant Summary collapse
- TIME_BEFORE_RETRYING_SECONDS =
1
- DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 8, max_queue: 1, fallback_policy: :caller_runs)
Instance Attribute Summary collapse
-
#blob_account ⇒ Object
Returns the value of attribute blob_account.
-
#container_name ⇒ Object
Returns the value of attribute container_name.
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader
constructor
Initializes the class.
-
#stop ⇒ Object
stop threads.
-
#upload(file, options = {}) ⇒ Object
Uploads the file to the container.
-
#upload_async(file, options = {}) ⇒ Object
Create threads to upload the file to the container.
Constructor Details
#initialize(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader
Initializes the class
21 22 23 24 25 26 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 21 def initialize(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL) @blob_account = blob_account @workers_pool = threadpool @logger = logger @container_name = container_name end |
Instance Attribute Details
#blob_account ⇒ Object
Returns the value of attribute blob_account.
16 17 18 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 16 def blob_account @blob_account end |
#container_name ⇒ Object
Returns the value of attribute container_name.
16 17 18 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 16 def container_name @container_name end |
#logger ⇒ Object
Returns the value of attribute logger.
16 17 18 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 16 def logger @logger end |
Instance Method Details
#stop ⇒ Object
stop threads
67 68 69 70 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 67 def stop @workers_pool.shutdown @workers_pool.wait_for_termination(nil) # block until its done end |
#upload(file, options = {}) ⇒ Object
Uploads the file to the container
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 37 def upload(file, = {}) begin content = Object::File.open(file.path, 'rb').read filename = Object::File.basename file.path puts filename blob = blob_account.create_block_blob(container_name, filename, content) puts blob.name rescue => e # When we get here it usually mean that LogstashAzureBlobOutput tried to do some retry by himself (default is 3) # When the retry limit is reached or another error happen we will wait and retry. # # Thread might be stuck here, but I think its better than losing anything # its either a transient errors or something bad really happened. logger.error('Uploading failed, retrying', exception: e.class, message: e., path: file.path, backtrace: e.backtrace) retry end [:on_complete].call(file) unless [:on_complete].nil? blob rescue => e logger.error('An error occured in the `on_complete` uploader', exception: e.class, message: e., path: file.path, backtrace: e.backtrace) raise e # reraise it since we don't deal with it now end |
#upload_async(file, options = {}) ⇒ Object
Create threads to upload the file to the container
29 30 31 32 33 34 |
# File 'lib/logstash/outputs/blob/uploader.rb', line 29 def upload_async(file, = {}) @workers_pool.post do LogStash::Util.set_thread_name("LogstashAzureBlobOutput output uploader, file: #{file.path}") upload(file, ) end end |