Class: LogStash::Filters::Empow::LocalClassifier

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/filters/local-classifier.rb

Instance Method Summary collapse

Constructor Details

#initialize(cache_size, ttl, async_local_db, local_db) ⇒ LocalClassifier

Returns a new instance of LocalClassifier.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/logstash/filters/local-classifier.rb', line 8

def initialize(cache_size, ttl, async_local_db, local_db)
	@logger ||= self.logger

	@logger.debug("initializing in memory cache")
	@logger.debug("cache size #{cache_size}")
	@logger.debug("cache ttl #{ttl}")

	@cache ||= LogStash::Filters::Empow::ClassifierCache.new(cache_size, ttl)
	@ttl = ttl

	@local_db ||= local_db

	@local_db_workers ||= Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: 1)
	@async_local_db ||= async_local_db
end

Instance Method Details

#add_to_cache(key, val, expiration_time) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/logstash/filters/local-classifier.rb', line 43

def add_to_cache(key, val, expiration_time)
	return if key.nil?

	@logger.debug? and @logger.info("adding #{key} to cache")

	@cache.put(key, val, Time.now+3600)
end

#classify(key) ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/logstash/filters/local-classifier.rb', line 34

def classify(key)
	if !key.nil?
		cached_result = @cache.classify(key)
		return cached_result if !cached_result.nil?
	end

	return classify_using_local_database(key)
end

#classify_using_local_database(key) ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/filters/local-classifier.rb', line 83

def classify_using_local_database(key)
	return nil if @local_db.nil? # if a local db wasn't configured

	if (@async_local_db)
		read_from_local_database_async(key)
		return nil
	end

	return read_from_local_database(key)
end

#closeObject



24
25
26
27
28
29
30
31
# File 'lib/logstash/filters/local-classifier.rb', line 24

def close
	@logger.debug("shutting down local classifier")

	@local_db_workers.shutdown if !@local_db.nil?

	@local_db_workers.wait_for_termination(1)
	@logger.debug("local classifier shut down")
end

#read_from_local_database(key) ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/filters/local-classifier.rb', line 66

def read_from_local_database(key)
	res = @local_db.query(key[:product_type], key[:product], key[:term])

	if !res.nil?
		@logger.debug("adding result from db to local cache")
		add_to_cache(key, res, Time.now + @ttl)
	end

	return res
end

#read_from_local_database_async(key) ⇒ Object



77
78
79
80
81
# File 'lib/logstash/filters/local-classifier.rb', line 77

def read_from_local_database_async(key)
	@local_db_workers.post do
		read_from_local_database(key)
	end
end

#save_to_cache_and_db(key, val, expiration_time) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/filters/local-classifier.rb', line 51

def save_to_cache_and_db(key, val, expiration_time)
	return if key.nil?

	@logger.debug? and @logger.info("adding #{key} to the local db and cache")

	product_type = key[:product_type]
	product = key[:product]
	term = key[:term]

	doc_id = "#{product_type}-#{product}-term"

	@local_db.save(doc_id, product_type, product, term, val) if !@local_db.nil?
	add_to_cache(key, val, expiration_time)
end