Class: LogStash::Inputs::GoogleCloudStorage
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::GoogleCloudStorage
- Defined in:
- lib/logstash/inputs/google_cloud_storage.rb
Overview
GoogleCloudStorage is an input plugin for Logstash that reads blobs in Cloud Storage buckets.
Instance Attribute Summary collapse
-
#event_output_queue ⇒ Object
Accessors for testing.
-
#processed_db ⇒ Object
readonly
Returns the value of attribute processed_db.
Instance Method Summary collapse
-
#list_download_process ⇒ Object
Fetches new files ready to be processed, downloads and processes them and finally runs post-processing steps.
- #register ⇒ Object
- #run(queue) ⇒ Object
- #stop ⇒ Object
Instance Attribute Details
#event_output_queue ⇒ Object
Accessors for testing
36 37 38 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 36 def event_output_queue @event_output_queue end |
#processed_db ⇒ Object (readonly)
Returns the value of attribute processed_db.
37 38 39 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 37 def processed_db @processed_db end |
Instance Method Details
#list_download_process ⇒ Object
Fetches new files ready to be processed, downloads and processes them and finally runs post-processing steps.
67 68 69 70 71 72 73 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 67 def list_download_process list_processable_blobs do |blob| @logger.info("Found matching blob #{blob.uri}") download_and_process(blob) postprocess(blob) end end |
#register ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 39 def register FileUtils.mkdir_p(@temp_directory) unless Dir.exist?(@temp_directory) @client = LogStash::Inputs::CloudStorage::Client.new(@bucket_id, @json_key_file, @logger) if @processed_db_path.nil? ls_data = LogStash::SETTINGS.get_value('path.data') @processed_db_path = File.join(ls_data, 'plugins', 'inputs', 'google_cloud_storage', 'db') end @logger.info("ProcessedDb created in: #{@processed_db_path}") @processed_db = LogStash::Inputs::CloudStorage::ProcessedDb.new(@processed_db_path) @blob_filter = LogStash::Inputs::CloudStorage::BlobFilter.new(@logger, @file_matches, @file_exclude, @metadata_key, @processed_db) end |
#run(queue) ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 56 def run(queue) @event_output_queue = queue @main_plugin_thread = Thread.current Stud.interval(@interval) do list_download_process end end |
#stop ⇒ Object
75 76 77 78 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 75 def stop # Stud events were started on the main plugin thread so stop all events relative to it. Stud.stop!(@main_plugin_thread) end |