Class: Threatinator::FeedRunner

Inherits:
Object
  • Object
show all
Includes:
Observable, Logging
Defined in:
lib/threatinator/feed_runner.rb

Overview

Runs those feeds!

Has the following observations:
  :start                 - start of feed parsing

  :start_fetch           - start of fetching
  :end_fetch             - end of fetching

  :start_decode          - start of decoding
  :end_decode            - end of decoding

  :start_parse_record    - start of record parse
     - record                - The record

  :record_filtered       - Indicates that the record was filtered.
     - record                - The record

  :record_missed         - Indicates that the record was not parsed
     - record                - The record

  :record_parsed         - Indicates that the record WAS parsed
     - record                - The record
     - events                - The events that were parsed out of the 
                               record

  :record_error         - Indicates that the record WAS parsed
     - record                - The record
     - errors           - An array of exceptions that caused the error

  :end_parse_record      - when a record has been parsed
     - record                - The record

  :end                   - completion of feed parsing

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #logger

Constructor Details

#initialize(feed, output_formatter, opts = {}) ⇒ FeedRunner

Returns a new instance of FeedRunner.

Parameters:



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/threatinator/feed_runner.rb', line 46

def initialize(feed, output_formatter, opts = {})
  @feed = feed
  @output_formatter = output_formatter
  @feed_filters = @feed.filter_builders.map { |x| x.call } 
  @decoders = @feed.decoder_builders.map { |x| x.call } 
  @parser_block = @feed.parser_block
  @create_event_proc = self.method(:create_event).to_proc

  @event_builder = Threatinator::EventBuilder.new(@feed.provider, @feed.name)

  @total_events_built = 0
  @event_errors = []
  @built_events = []
end

Class Method Details

.run(feed, output, run_opts = {}) ⇒ Object

Runs a feed

Parameters:



172
173
174
# File 'lib/threatinator/feed_runner.rb', line 172

def self.run(feed, output, run_opts = {})
  self.new(feed, output).run(run_opts)
end

Instance Method Details

#create_event {|@event_builder| ... } ⇒ Object

Yields:

  • (@event_builder)


122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/threatinator/feed_runner.rb', line 122

def create_event
  @event_builder.reset
  @event_errors.clear
  yield(@event_builder)
  begin
    event = @event_builder.build
    @total_events_built += 1
    @built_events << event
  rescue Threatinator::Exceptions::EventBuildError => e
    @event_errors << e
  end
end

#parse_record(record) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/threatinator/feed_runner.rb', line 135

def parse_record(record)
  @built_events.clear
  events = []
  changed(true); notify_observers(:start_parse_record, record)

  if @feed_filters.any? { |filter| filter.filter?(record) }
    changed(true); notify_observers(:record_filtered, record)
    return
  end

  @parser_block.call(@create_event_proc, record)

  if @event_errors.count > 0
    changed(true); notify_observers(:record_error, record, @event_errors)
    position = "line: #{record.line_number}, start: #{record.pos_start}, end: #{record.pos_end}"
    messages = @event_errors.map { |e| e.to_s }.join(', ')
    logger.debug("Error generating event from record (#{position}): #{messages}")
  elsif @built_events.count == 0
    changed(true); notify_observers(:record_missed, record)
    position = "line: #{record.line_number}, start: #{record.pos_start}, end: #{record.pos_end}"
    logger.debug("Expected event to be generated, but got none from record (#{position})")
  else 
    @built_events.each do |event|
      events << event
      @output_formatter.handle_event(event)
    end
    changed(true); notify_observers(:record_parsed, record, events)
  end
  return
ensure 
  changed(true); notify_observers(:end_parse_record, record)
end

#run(opts = {}) ⇒ Object

Parameters:

  • opts (Hash) (defaults to: {})

    The options hash

Options Hash (opts):

  • :io (IO-like)

    Override the fetcher by providing an IO directly.

  • :skip_decoding (Boolean) — default: false

    Skip all decoding if set to true. Useful for testing.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/threatinator/feed_runner.rb', line 66

def run(opts = {})
  ios = [ ]
  logger.debug("#run starting #{@feed.provider}:#{@feed.name}") if logger.debug?
  start = Time.now
  changed(true); notify_observers(:start)
  skip_decoding = !!opts.delete(:skip_decoding)


  unless io = opts.delete(:io)
    fetcher = @feed.fetcher_builder.call()
    changed(true); notify_observers(:start_fetch)
    io = fetcher.fetch()
    changed(true); notify_observers(:end_fetch)
  else
    logger.debug('#run Skipping fetch. IO object was provided')
  end

  ios << io

  unless skip_decoding == true
    changed(true); notify_observers(:start_decode)
    @decoders.each do |decoder|
      new_io = decoder.decode(io)
      ios << new_io
      io = new_io
    end
    changed(true); notify_observers(:end_decode)
  end

  parser = @feed.parser_builder.call()

  parser.run(io) do |record|
    rr = parse_record(record)
  end

  changed(true); notify_observers(:end)
  
  logger.debug("#run finished #{@feed.provider}:#{@feed.name} in #{Time.now - start} seconds") if logger.debug?
  nil
ensure
  # Close all IO objects that we've seen. 
  while some_io = ios.pop
    unless some_io.closed?
      begin
        some_io.close
      rescue => e
        #:nocov:
        logger.warn("Failed to close IO: #{e} #{e.message}")
        #:nocov:
      end
    end
  end

  @output_formatter.finish
end