Class: LogStash::Filters::Empow::Classifier
- Inherits:
-
Object
- Object
- LogStash::Filters::Empow::Classifier
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/filters/classifier.rb
Constant Summary collapse
- MAX_CONCURRENT_REQUESTS =
10000
- BATCH_TIMEOUT =
10
Instance Method Summary collapse
- #classify(request) ⇒ Object
- #close ⇒ Object
-
#initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries) ⇒ Classifier
constructor
A new instance of Classifier.
Constructor Details
#initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries) ⇒ Classifier
Returns a new instance of Classifier.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/logstash/filters/classifier.rb', line 16 def initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries) @logger ||= self.logger @logger.info("initializing classifier") @local_classifier = local_classifier @online_classifer = online_classifer @batch_interval = batch_interval @time_between_queries = time_between_queries @inflight_requests = Concurrent::Hash.new @new_request_queue = java.util.concurrent.ArrayBlockingQueue.new(MAX_CONCURRENT_REQUESTS) @bulk_processor = Classification::BulkProcessor.new(max_retries, batch_size, time_between_queries, @inflight_requests, online_classifer, local_classifier, online_classification_workers) @worker_pool = Concurrent::FixedThreadPool.new(1) @worker_pool.post do while @worker_pool.running? do begin management_task() rescue StandardError => e @logger.error("encountered an error while running the management task", :error => e, :backtrace => e.backtrace) end end end @logger.debug("classifier initialized") @last_action_time = Time.now end |
Instance Method Details
#classify(request) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/logstash/filters/classifier.rb', line 97 def classify(request) return nil if request.nil? res = @local_classifier.classify(request) @logger.trace("cached result", :request => request, :res => res) return res if !res.nil? request_online_classifiction(request) return nil end |
#close ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/logstash/filters/classifier.rb', line 48 def close @logger.info("shutting down empow's classifcation plugin") @inflight_requests.clear() @bulk_processor.close @worker_pool.kill() @worker_pool.wait_for_termination(5) @logger.info("empow classifcation plugin closed") end |