Class: LogStash::Filters::EmpowClassifier

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/empowclassifier.rb

Constant Summary collapse

CLASSIFICATION_URL =
'https://intent.cloud.empow.co'
CACHE_TTL =
(24*60*60)

Instance Method Summary collapse

Instance Method Details

#filter(event) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/logstash/filters/empowclassifier.rb', line 184

# Runs a single event through the plugin core and reacts to what comes back.
#
# `@plugin_core.classify(event)` is handled in three ways:
# * `nil`                      - nothing is marked or yielded.
# * the very same event object - it is marked via `filter_matched`.
# * a different object         - it is marked via `filter_matched` and
#                                yielded to the caller's block (the original
#                                comments call this a parked/overflow event).
#
# Any StandardError raised along the way is logged, and the incoming event
# receives every tag listed in `@tag_on_error`.
#
# @param event the event handed to this filter
# @yield [classified] only when the core returns an object other than `event`
public def filter(event)
  classified = @plugin_core.classify(event)

  # the core produced nothing for this call
  return if classified.nil?

  # both remaining outcomes mark whatever the core handed back as matched
  filter_matched(classified)

  # identity (not equality) check: the incoming event itself was returned,
  # so there is no additional event to hand downstream
  return if classified.equal?(event)

  @logger.debug("filter matched for overflow event", :event => classified)

  yield classified
rescue StandardError => e
  @logger.error("encountered an exception while classifying", :error => e, :event => event, :backtrace => e.backtrace)

  @tag_on_error.each { |error_tag| event.tag(error_tag) }
end

#flush(options = {}) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/logstash/filters/empowclassifier.rb', line 161

# Collects the events released by `@plugin_core.flush`, uncancels each one
# and returns them as an array.
#
# A StandardError raised while flushing is logged and swallowed; events that
# were already uncancelled before the failure are still returned.
#
# @param options passed through unchanged to `@plugin_core.flush`
# @return [Array] the uncancelled events (possibly empty)
public def flush(options = {})
  @logger.debug("entered flush")

  # kept outside the begin block so a mid-loop failure keeps partial results
  released = []

  begin
    @plugin_core.flush(options).each do |parked|
      parked.uncancel

      released.push(parked)
    end
  rescue StandardError => err
    @logger.error("encountered an exception while processing flush", :error => err)
  end

  @logger.debug("flush ended", :flushed_event_count => released.size)

  released
end

#register ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/logstash/filters/empowclassifier.rb', line 110

# Validates the plugin parameters, builds the collaborating objects and
# assembles them into `@plugin_core`.
#
# Construction order: local database, local classifier, online classifier
# (pointed at the URL from `get_effective_url`), combined classifier, field
# handler, and finally the plugin core. `@plugin_core` is only assigned when
# it is not already set (`||=`); the collaborators are built regardless.
def register
  @logger.info("registering empow classifcation plugin")

  validate_params

  database = create_local_database

  offline_classifier = LogStash::Filters::Empow::LocalClassifier.new(
    @cache_size,
    CACHE_TTL,
    @async_local_cache,
    database
  )

  effective_url = get_effective_url
  center_client = LogStash::Filters::Empow::ClassificationCenterClient.new(
    @username,
    @password,
    @authentication_hash,
    effective_url
  )

  combined_classifier = LogStash::Filters::Empow::Classifier.new(
    center_client,
    offline_classifier,
    @max_classification_center_workers,
    @bulk_request_size,
    @bulk_request_interval,
    @max_query_retries,
    @time_between_queries
  )

  handler = LogStash::Filters::Empow::FieldHandler.new(
    @product_type_field,
    @product_name_field,
    @threat_field,
    @src_internal_field,
    @dst_internal_field
  )

  # keep an already-assigned core; only build one when none is present
  @plugin_core ||= LogStash::Filters::Empow::PluginLogic.new(
    combined_classifier,
    handler,
    @pending_request_timeout,
    @max_pending_requests,
    @tag_on_timeout,
    @tag_on_error
  )

  @logger.info("empow classifcation plugin registered")
end