Class: LogStash::Filters::Empow::Classifier

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/filters/classifier.rb

Constant Summary collapse

MAX_CONCURRENT_REQUESTS =
10000
BATCH_TIMEOUT =
10

Instance Method Summary collapse

Constructor Details

#initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries) ⇒ Classifier

Returns a new instance of Classifier.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/logstash/filters/classifier.rb', line 16

def initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries)
	@logger ||= self.logger

	@logger.info("initializing classifier")

	@local_classifier = local_classifier
	@online_classifer = online_classifer
	@batch_interval = batch_interval
	@time_between_queries = time_between_queries

	@inflight_requests = Concurrent::Hash.new
	@new_request_queue = java.util.concurrent.ArrayBlockingQueue.new(MAX_CONCURRENT_REQUESTS)

	@bulk_processor = Classification::BulkProcessor.new(max_retries, batch_size, time_between_queries, @inflight_requests, online_classifer, local_classifier, online_classification_workers)

	@worker_pool = Concurrent::FixedThreadPool.new(1)

	@worker_pool.post do
		while @worker_pool.running? do
			begin
				management_task()
			rescue StandardError => e
				@logger.error("encountered an error while running the management task", :error => e, :backtrace => e.backtrace)
			end
		end
	end
	@logger.debug("classifier initialized")

	@last_action_time = Time.now
end

Instance Method Details

#classify(request) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/logstash/filters/classifier.rb', line 97

def classify(request)
	return nil if request.nil?
	
	res = @local_classifier.classify(request)
	
	@logger.trace("cached result", :request => request, :res => res)
	
	return res if !res.nil?

	request_online_classifiction(request)

	return nil
end

#closeObject



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/logstash/filters/classifier.rb', line 48

def close
	@logger.info("shutting down empow's classifcation plugin")

	@inflight_requests.clear()

	@bulk_processor.close

	@worker_pool.kill()
	@worker_pool.wait_for_termination(5)

	@logger.info("empow classifcation plugin closed")
end