Class: PcaprLocal::Scanner

Inherits:
Object
  • Object
show all
Defined in:
lib/pcapr_local/scanner.rb

Constant Summary collapse

RE_PCAP =
/\.p?cap\Z/

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Scanner

Returns a new instance of Scanner.



37
38
39
40
41
42
43
44
# File 'lib/pcapr_local/scanner.rb', line 37

# Builds a scanner from a configuration object responding to #fetch.
#
# Keys read (in this order): 'db', 'xtractr', 'pcap_dir', 'index_dir',
# 'queue_delay' and 'interval'. Each is read with #fetch, so with a Hash a
# missing key raises KeyError. The two directory settings are stored as
# absolute paths.
def initialize(config)
    absolute = lambda { |key| File.expand_path(config.fetch(key)) }

    @db            = config.fetch('db')
    @xtractr       = config.fetch('xtractr')
    @pcap_dir      = absolute.call('pcap_dir')
    @index_dir     = absolute.call('index_dir')
    @queue_delay   = config.fetch('queue_delay')
    @scan_interval = config.fetch('interval')
end

Class Method Details

.start(config) ⇒ Object

Creates scanner instance and starts it.



15
16
17
18
19
# File 'lib/pcapr_local/scanner.rb', line 15

# Convenience entry point: builds a Scanner from +config+, calls #start on
# it, and returns the scanner instance (not the value returned by #start).
def self.start(config)
    Scanner.new(config).tap { |scanner| scanner.start }
end

Instance Method Details

#add_pcap(relative_path) ⇒ Object

Adds the pcap to the db with its status set to “queued” and returns the new document. Returns nil without updating the db if the pcap was modified within the last queue_delay seconds (because the pcap may not yet be completely copied to pcap_dir).



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/pcapr_local/scanner.rb', line 105

# Saves a new "queued" pcap document for the file at +relative_path+
# (relative to @pcap_dir) and returns that document.
#
# Returns nil without touching the db when the file's mtime is less than
# @queue_delay seconds old, since the file may still be being copied in.
# File.stat raises if the file does not exist.
def add_pcap(relative_path)
    timestamp = Time.new
    file_stat = File.stat(File.join(@pcap_dir, relative_path))

    # Too fresh: wait for a later scan before queueing it.
    return if timestamp - file_stat.mtime < @queue_delay

    # Deterministic doc id derived from the path and the pcap size
    # (for testing convenience).
    doc_id = Digest::MD5.hexdigest("#{relative_path}:#{file_stat.size}")

    attributes = {
        :_id => doc_id,
        :type => 'pcap',
        :filename => relative_path,
        :status => 'queued',
        :stat => {
            :inode => file_stat.ino,
            :size => file_stat.size,
            :ctime => file_stat.ctime,
        },
        :created_at => timestamp,
        :updated_at => timestamp,
    }

    doc = CouchRest::Document.new(attributes)
    @db.save_doc(doc)
    doc
end

#find_pcaps ⇒ Object

Returns a set of pcap files (relative paths)



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/pcapr_local/scanner.rb', line 68

# Walks @pcap_dir recursively and returns a Set of the capture files found,
# as paths relative to @pcap_dir.
#
# A path is collected when it matches RE_PCAP and is a regular file.
# Entries below @pcap_dir whose basename starts with "." are pruned:
# dot-directories are not descended into and dot-files are ignored.
# Returns an empty Set when @pcap_dir is not a readable directory.
def find_pcaps
    return Set.new unless File.directory?(@pcap_dir) && File.readable?(@pcap_dir)

    pcaps = Set.new
    pcap_prefix_size = @pcap_dir.size + 1 # skip "<pcap_dir>/"
    Find.find(@pcap_dir) do |path|
        # Find.find yields the starting directory itself first. It must not
        # take part in the dot-name check below, otherwise a pcap_dir such
        # as "/data/.captures" would prune the entire walk.
        next if path == @pcap_dir

        # Don't recurse into ".pcapr_local" or other "." dirs
        Find.prune if File.basename(path).start_with?('.')

        # Should be a file ending in .cap or .pcap
        next unless path =~ RE_PCAP && File.file?(path)

        pcaps << path[pcap_prefix_size..-1]
    end
    pcaps
end

#index ⇒ Object

Indexes all documents in queue. Returns count of documents indexed.



136
137
138
139
140
141
142
143
# File 'lib/pcapr_local/scanner.rb', line 136

# Runs index_pcap on the document of every row in the "pcaps/queued" view
# and returns how many rows were processed.
def index
    processed = 0
    @db.each_in_view("pcaps/queued", :include_docs => true) do |row|
        index_pcap(row['doc'])
        processed += 1
    end
    processed
end

#index_path(rel_path) ⇒ Object



187
188
189
190
191
192
# File 'lib/pcapr_local/scanner.rb', line 187

# Returns the absolute path under @index_dir for +rel_path+.
#
# +rel_path+ may be a relative path, or a Hash whose :filename entry holds
# one (read with Hash#fetch, so a missing :filename raises KeyError).
def index_path(rel_path)
    relative = rel_path.is_a?(Hash) ? rel_path.fetch(:filename) : rel_path
    File.expand_path(File.join(@index_dir, relative))
end

#index_pcap(pcap) ⇒ Object

Creates xtractr index for pcap. Updates status from “queued” to “indexing” to “indexed”. Any exception will result in a status of “failed” with the exception’s message copied to the document’s message attribute.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/pcapr_local/scanner.rb', line 148

# Creates the xtractr index for one pcap document and records the outcome.
#
# +pcap+ is read through pcap["filename"] and pcap["_id"]. The document's
# status is set to "indexing", then @xtractr.index is called with the pcap
# path and its index directory, then the status becomes "indexed" and the
# index call's return value is stored under "index".
#
# Any StandardError raised along the way is logged and the document is
# marked "failed" with the error's message in its "message" attribute.
# An error raised while recording the failure propagates to the caller.
#
# Always returns nil.
def index_pcap(pcap)
    relative_path = pcap["filename"]
    pcap_file = File.join(File.expand_path(@pcap_dir), relative_path)
    index_dir = File.join(File.expand_path(@index_dir), relative_path)

    Logger.info "Indexing #{relative_path}"
    begin
        @db.update_doc(pcap["_id"]) do |doc|
            doc["status"] = "indexing"
            doc
        end

        index_data = @xtractr.index(pcap_file, index_dir)

        @db.update_doc(pcap["_id"]) do |doc|
            doc['index'] = index_data
            doc['status'] = 'indexed'
            doc
        end
    rescue => e
        # Capture the message now: the block below runs inside update_doc,
        # where the global $! may refer to an exception the store is
        # handling internally rather than to this failure.
        failure = e.message
        Logger.warn "Indexing failure: #{failure}"
        @db.update_doc(pcap["_id"]) do |doc|
            doc['status']  = "failed"
            doc['message'] = failure
            doc
        end
    end

    nil
end

#pcap_path(rel_path) ⇒ Object



180
181
182
183
184
185
# File 'lib/pcapr_local/scanner.rb', line 180

# Returns the absolute path under @pcap_dir for +rel_path+.
#
# +rel_path+ may be a relative path, or a Hash whose :filename entry holds
# one. Raises RuntimeError naming the offending hash when the Hash has no
# truthy :filename entry.
def pcap_path(rel_path)
    if rel_path.is_a?(Hash)
        # Evaluate the lookup-or-raise as a single expression so the error
        # message is built from the hash, before rel_path is reassigned.
        rel_path = (rel_path[:filename] || raise("path not found in #{rel_path.inspect}"))
    end
    File.expand_path(File.join(@pcap_dir, rel_path))
end

#reconcile_with_db(fs_pcaps) ⇒ Object

Checks each pcap in the database, purging or requeueing documents as necessary. Any pcaps in fs_pcaps that are not in the database are added.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/pcapr_local/scanner.rb', line 205

# Brings the database in line with the pcaps found on disk.
#
# +fs_pcaps+ is the collection of relative pcap paths present on the file
# system; it is duplicated first, so the caller's object is not modified.
#
# For each row of the "pcaps/by_filename" view:
# * path not on disk: the document is deleted and its index removed;
# * path in the "pcaps/indexed" view but its index directory is missing,
#   or the index is older than the pcap's mtime: the pcap is requeued.
# Paths left over afterwards are unknown to the database and are passed to
# add_pcap.
def reconcile_with_db(fs_pcaps)
    unknown = fs_pcaps.dup

    indexed_paths = Set.new
    @db.each_in_view("pcaps/indexed") { |row| indexed_paths << row['key'] }

    @db.each_in_view("pcaps/by_filename") do |row|
        path = row['key']

        # Delete the record from the database if the pcap is not present
        # on the file system.
        unless unknown.include?(path)
            Logger.warn "Indexer: removing database entry for missing pcap #{path}"
            @db.delete_doc(@db.get(row['id']))
            remove_index_for(path)
            next
        end

        # Requeue the pcap if its xtractr index is missing or is older
        # than the pcap.
        if indexed_paths.include?(path)
            pcap_index_dir = File.join(@index_dir, path)
            if !Xtractr.index_dir?(pcap_index_dir)
                Logger.warn "Index is missing, requeueing #{path}"
                requeue_pcap(path)
            elsif Xtractr.index_time(pcap_index_dir) < File.mtime(pcap_path(path)).to_f
                Logger.info "Pcap is newer than index, requeueing #{path}"
                requeue_pcap(path)
            end
        end

        # Seen in the database; no longer a candidate for adding.
        unknown.delete(path)
    end

    # Remaining pcaps are unknown, add them to the database.
    unknown.each do |path|
        Logger.debug "New pcap: #{path}"
        add_pcap(path)
    end
end

#remove_doc(doc) ⇒ Object

Removes doc from the database. If the doc has a filename, also deletes the pcap file it names (under pcap_dir) and the corresponding index.



48
49
50
51
52
53
54
# File 'lib/pcapr_local/scanner.rb', line 48

# Deletes +doc+ from the database. When the doc carries a 'filename', the
# pcap file at that path under the pcap directory is deleted as well
# (FileUtils.rm_f, so a missing file is not an error) and its index is
# removed via remove_index_for.
def remove_doc(doc)
    @db.delete_doc(doc)

    filename = doc['filename']
    return unless filename

    FileUtils.rm_f(pcap_path(filename))
    remove_index_for(filename)
end

#remove_index_for(rel_path) ⇒ Object

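Removes the index directory for rel_path by deleting only the *.db files inside it and then the directory itself if that leaves it empty, because a blanket FileUtils.rm_rf on the directory is too dangerous.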



195
196
197
198
199
200
201
# File 'lib/pcapr_local/scanner.rb', line 195

# Removes the index directory belonging to +rel_path+ (resolved through
# index_path), deliberately without a recursive delete of the directory:
# only entries named "*.db" (not starting with ".") are deleted, then the
# directory itself is removed if that left it empty.
#
# Does nothing and returns nil when the target is not a directory.
# File-system errors while listing or removing the directory are swallowed
# (best effort), in which case nil is returned.
def remove_index_for(rel_path)
    target = index_path(rel_path)
    return unless File.directory?(target)

    begin
        # List the directory instead of globbing "#{target}/*.db": pcap
        # names may contain glob metacharacters such as "[" or "{", which
        # would make the pattern match a different directory (or nothing).
        db_names = Dir.entries(target).select do |name|
            name.end_with?('.db') && !name.start_with?('.')
        end
        FileUtils.rm_rf(db_names.map { |name| File.join(target, name) })

        # Fails when other files remain in the directory; that is fine.
        FileUtils.rmdir(target)
    rescue SystemCallError
        nil
    end
end

#requeue_pcap(rel_path) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/pcapr_local/scanner.rb', line 90

# Puts the pcap stored under +rel_path+ back into the queue.
#
# Looks the path up in the "pcaps/by_filename" view and, for the first
# matching row, sets the document's status to 'queued' and drops its
# 'index' attribute. Returns nil when the view has no row for the path,
# otherwise whatever @db.update_doc returns.
def requeue_pcap(rel_path)
    rows = @db.view("pcaps/by_filename", :key => rel_path)['rows']
    return nil if rows.empty?

    @db.update_doc(rows[0]["id"]) do |doc|
        doc['status'] = 'queued'
        doc.delete('index')
        doc
    end
end

#scan ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/pcapr_local/scanner.rb', line 56

# Performs one full scanning pass: lists the pcaps on disk, reconciles the
# database with that list (cleaning up and queueing new pcaps), then
# indexes everything queued. Returns the value returned by #index.
def scan
    on_disk = find_pcaps
    reconcile_with_db(on_disk)
    index
end

#start ⇒ Object

Runs scanner loop in separate thread.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/pcapr_local/scanner.rb', line 22

# Starts the scanner loop in a new thread and returns that Thread.
#
# Each iteration runs #scan followed by @db.compact!, then sleeps for
# @scan_interval. Anything raised during an iteration is rescued as
# Exception (not just StandardError) and logged with its backtrace, so the
# loop keeps running after a failure.
def start
    Logger.info "Starting scanner thread"
    Thread.new do
        loop do
            begin
                scan
                @db.compact!
            rescue Exception => failure
                trace = failure.backtrace.join("\n")
                Logger.error "Exception during scanning: #{failure.message}\n" + trace
            end
            sleep @scan_interval
        end
    end
end