Class: NewsCrawler::Storage::URLQueue::MongoEngine
- Inherits:
- URLQueueEngine
- NewsCrawler::Storage::URLQueue::MongoEngine < URLQueueEngine < Object
- Includes:
- Mongo
- Defined in:
- lib/news_crawler/storage/url_queue/mongo_storage.rb
Overview
List storage engine with MongoDB backend
Constant Summary
- NAME =
'mongo'
Instance Method Summary
-
#add(url, ref_url = '') ⇒ Object
Add a URL to the queue, optionally with the URL that referred to it.
-
#all(*opts) ⇒ Array
Get all URLs and their status.
-
#clear(*opts) ⇒ Fixnum
Clear URL queue.
-
#find_all(modul_name, state, max_depth = -1) ⇒ Array
Find all visited URLs with the given module processing state. TODO: fix bug (find visited url).
-
#find_one(modul_name, state, max_depth = -1) ⇒ String?
Find one visited URL with the given module processing state.
-
#find_unvisited(max_depth = -1) ⇒ Array
Get the list of unvisited URLs.
-
#get_url_depth(url) ⇒ Object
Get the depth of the given URL. Returns the URL depth (Fixnum).
-
#initialize(*opts) ⇒ MongoEngine
constructor
Construct a queue.
-
#mark(module_name, url, state) ⇒ Object
Set the processing state of a URL for the given module.
-
#mark_all(module_name, new_state, orig_state = nil) ⇒ Object
Change all URLs in one state to another state.
-
#mark_all_unvisited ⇒ Object
Mark all URLs as unvisited.
-
#mark_visited(url) ⇒ Object
Mark a URL as visited.
-
#next_unprocessed(modul_name, max_depth = -1) ⇒ String?
(also: #find_and_mark)
Atomically get the next unprocessed URL and mark it as processing.
Methods inherited from URLQueueEngine
Constructor Details
#initialize(*opts) ⇒ MongoEngine
Construct a queue
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 38

def initialize(*opts)
  config = SimpleConfig.for :application
  db = MongoClient.new(config.mongodb.host, config.mongodb.port,
                       pool_size: 4,
                       pool_timeout: 5)[config.mongodb.db_name]
  coll_name = config.prefix + '_' + config.suffix.url_queue
  # A trailing Hash argument is treated as options
  h_opts = ((opts[-1].is_a? Hash) ? opts[-1] : {})
  @coll = db[h_opts[:coll_name] || coll_name]
  @coll.ensure_index({:url => Mongo::ASCENDING}, {:unique => true})
end
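The constructor treats a trailing Hash argument as options and lets :coll_name override the collection name derived from the configuration. A minimal sketch of that resolution step, without any MongoDB connection; resolve_coll_name and the sample prefix and suffix values are hypothetical and exist only for illustration:

```ruby
# Hypothetical helper mirroring how the constructor picks a collection name.
# `prefix` and `suffix` stand in for config.prefix and config.suffix.url_queue.
def resolve_coll_name(prefix, suffix, *opts)
  # Only a trailing Hash counts as options; anything else is ignored.
  h_opts = opts.last.is_a?(Hash) ? opts.last : {}
  h_opts[:coll_name] || "#{prefix}_#{suffix}"
end

default_name  = resolve_coll_name('crawler', 'url_queue')
override_name = resolve_coll_name('crawler', 'url_queue', { coll_name: 'test_queue' })
```

Passing :coll_name this way is presumably useful for pointing tests at a separate collection.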
Instance Method Details
#add(url, ref_url = '') ⇒ Object
Add a URL to the queue, optionally with the URL that referred to it. A URL added without a reference URL gets depth 0; otherwise its depth is the reference URL's depth plus one.
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 52

def add(url, ref_url = '')
  if (ref_url == '')
    depth = 0
  else
    depth = (get_url_depth(ref_url) || 0) + 1
  end
  begin
    @coll.insert({:url => url,
                  :depth => depth,
                  :visited => false})
  rescue Mongo::OperationFailure => e
    if e.error_code == 11000 # duplicate key error
      raise DuplicateURLError, url
    else
      raise e
    end
  end
end
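The depth rule in #add can be illustrated with an in-memory stand-in. This is only a sketch: DepthSketch is a hypothetical class, a Hash replaces the MongoDB collection, the error class is redefined locally, and an unknown reference URL is assumed to count as depth 0.

```ruby
# Local stand-in for the library's DuplicateURLError.
class DuplicateURLError < StandardError; end

# Hypothetical in-memory model of the queue: url => depth.
class DepthSketch
  def initialize
    @depths = {}
  end

  # Returns the depth assigned to the newly added URL.
  def add(url, ref_url = '')
    # The unique index on :url is modelled by a key check.
    raise DuplicateURLError, url if @depths.key?(url)
    depth = ref_url == '' ? 0 : (@depths[ref_url] || 0) + 1
    @depths[url] = depth
  end
end

q = DepthSketch.new
root_depth  = q.add('http://example.com/')
child_depth = q.add('http://example.com/a', 'http://example.com/')
grand_depth = q.add('http://example.com/a/b', 'http://example.com/a')
```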
#all(*opts) ⇒ Array
Get all URLs and their status. Each entry is returned as a Hash with symbol keys; the MongoDB '_id' field is omitted.
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 114

def all(*opts)
  @coll.find.collect do | entry |
    entry.each_key.inject({}) do | memo, key |
      if key != '_id'
        memo[key.intern] = entry[key]
      end
      memo
    end
  end
end
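The per-entry transformation in #all (drop '_id', convert keys to symbols) can be tried on a plain Hash. A small sketch with a made-up sample document; each_with_object is used here instead of inject purely as a stylistic choice:

```ruby
# Sample document shaped like a queue entry; the values are made up.
entry = {
  '_id'     => 'abc123',
  'url'     => 'http://example.com/',
  'depth'   => 0,
  'visited' => false
}

# Copy every field except '_id', converting string keys to symbols.
cleaned = entry.each_with_object({}) do |(key, value), memo|
  memo[key.to_sym] = value unless key == '_id'
end
```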
#clear(*opts) ⇒ Fixnum
Clear URL queue
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 203

def clear(*opts)
  count = @coll.count
  @coll.remove
  count
end
#find_all(modul_name, state, max_depth = -1) ⇒ Array
Find all visited URLs with the given module processing state. TODO: fix bug (find visited url).
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 131

def find_all(modul_name, state, max_depth = -1)
  if (state == URLQueue::UNPROCESSED)
    # A missing module field also counts as unprocessed
    selector = {:$or => [{modul_name => state},
                         {modul_name => {:$exists => false}}]}
  else
    selector = {modul_name => state}
  end
  selector = {:$and => [selector,
                        {'visited' => true}]}
  if max_depth > -1
    selector[:$and] << {'depth' => {:$lte => max_depth}}
  end
  @coll.find(selector).collect do | entry |
    entry['url']
  end
end
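The selector that #find_all builds can be inspected without a database. The sketch below extracts that logic into a hypothetical build_selector function; the UNPROCESSED value is a placeholder, since the actual value of URLQueue::UNPROCESSED is not shown on this page.

```ruby
# Placeholder; the real URLQueue::UNPROCESSED value is an assumption here.
UNPROCESSED = 'unprocessed'

# Hypothetical pure function returning the query Hash for a module state.
def build_selector(modul_name, state, max_depth = -1)
  state_clause =
    if state == UNPROCESSED
      # No field for the module yet is treated the same as UNPROCESSED.
      { :$or => [{ modul_name => state },
                 { modul_name => { :$exists => false } }] }
    else
      { modul_name => state }
    end
  clauses = [state_clause, { 'visited' => true }]
  # A negative max_depth means "no depth limit".
  clauses << { 'depth' => { :$lte => max_depth } } if max_depth > -1
  { :$and => clauses }
end

limited   = build_selector('parser', UNPROCESSED, 2)
unlimited = build_selector('parser', 'done')
```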
#find_one(modul_name, state, max_depth = -1) ⇒ String?
Find one visited URL with the given module processing state. Returns nil if no URL matches.
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 153

def find_one(modul_name, state, max_depth = -1)
  a = find_all(modul_name, state, max_depth)
  if a.size > 0
    a[0]
  else
    nil
  end
end
#find_unvisited(max_depth = -1) ⇒ Array
Get the list of unvisited URLs
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 189

def find_unvisited(max_depth = -1)
  if max_depth > -1
    selector = {:$and => [{'visited' => false},
                          {'depth' => {:$lte => max_depth}}]}
  else
    selector = {'visited' => false}
  end
  @coll.find(selector).collect do | entry |
    entry['url']
  end
end
#get_url_depth(url) ⇒ Object
Get the depth of the given URL. Returns the URL depth (Fixnum).
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 212

def get_url_depth(url)
  # NOTE: find_one returns nil for an unknown url, so this raises NoMethodError
  @coll.find_one({'url' => url}, {:fields => ['depth']})['depth']
end
#mark(module_name, url, state) ⇒ Object
Set the processing state of a URL for the given module
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 96

def mark(module_name, url, state)
  @coll.update({:url => url},
               {:$set => {module_name => state}})
end
#mark_all(module_name, new_state, orig_state = nil) ⇒ Object
Change all URLs in one state to another state. If orig_state is nil, every URL is set to new_state.
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 105

def mark_all(module_name, new_state, orig_state = nil)
  selector = (orig_state.nil? ? {} : {module_name => orig_state})
  @coll.update(selector,
               {:$set => {module_name => new_state}},
               :multi => true)
end
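The effect of the optional orig_state argument can be seen on plain Hashes. This is a sketch under stated assumptions: mark_all_in_memory is a hypothetical stand-in for the multi-document update, and the state strings are placeholders rather than the library's constants.

```ruby
# Hypothetical in-memory equivalent of a multi-document $set update.
def mark_all_in_memory(docs, module_name, new_state, orig_state = nil)
  docs.each do |doc|
    # With no orig_state every document matches; otherwise only exact matches.
    next unless orig_state.nil? || doc[module_name] == orig_state
    doc[module_name] = new_state
  end
end

docs = [
  { 'url' => 'http://example.com/a', 'parser' => 'processing' },
  { 'url' => 'http://example.com/b', 'parser' => 'done' },
  { 'url' => 'http://example.com/c' }
]

# Reset only the entries stuck in 'processing'.
mark_all_in_memory(docs, 'parser', 'unprocessed', 'processing')
after_filtered = docs.map { |d| d['parser'] }

# Without orig_state, every entry is updated.
mark_all_in_memory(docs, 'parser', 'done')
after_all = docs.map { |d| d['parser'] }
```

Resetting entries left in a processing state is one plausible use, for example after a crawler run was interrupted.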
#mark_all_unvisited ⇒ Object
Mark all URLs as unvisited
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 79

def mark_all_unvisited
  @coll.update({},
               {:$set => {'visited' => false}},
               {:multi => true})
end
#mark_visited(url) ⇒ Object
Mark a URL as visited
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 73

def mark_visited(url)
  @coll.update({:url => url},
               {:$set => {'visited' => true}})
end
#next_unprocessed(modul_name, max_depth = -1) ⇒ String? Also known as: find_and_mark
Atomically get the next unprocessed URL and mark it as processing. Returns nil if no URL matches.
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 166

def next_unprocessed(modul_name, max_depth = -1)
  selector = {:$or => [{modul_name => URLQueue::UNPROCESSED},
                       {modul_name => {:$exists => false}}]}
  selector = {:$and => [selector, {'visited' => true}]}
  if max_depth > -1
    selector[:$and] << {'depth' => {:$lte => max_depth}}
  end
  # find_and_modify selects and updates the document in one atomic operation
  doc = @coll.find_and_modify(:query => selector,
                              :update => {:$set =>
                                          {modul_name => URLQueue::PROCESSING}})
  (doc.nil? ? nil : doc['url'])
end
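A worker would typically call #next_unprocessed in a loop until it returns nil. The sketch below shows that pattern against a hypothetical in-memory FakeQueue; a Mutex stands in for the atomicity that the real engine gets from MongoDB's find_and_modify, and the state strings are placeholders.

```ruby
# Hypothetical in-memory stand-in; not the real MongoEngine.
class FakeQueue
  UNPROCESSED = 'unprocessed'
  PROCESSING  = 'processing'

  def initialize(docs)
    @docs = docs
    @lock = Mutex.new
  end

  # Claim one visited, unprocessed URL, or return nil when none is left.
  def next_unprocessed(modul_name, max_depth = -1)
    @lock.synchronize do
      doc = @docs.find do |d|
        d['visited'] &&
          (d[modul_name].nil? || d[modul_name] == UNPROCESSED) &&
          (max_depth < 0 || d['depth'] <= max_depth)
      end
      if doc.nil?
        nil
      else
        doc[modul_name] = PROCESSING
        doc['url']
      end
    end
  end
end

queue = FakeQueue.new([
  { 'url' => 'http://example.com/a', 'depth' => 0, 'visited' => true },
  { 'url' => 'http://example.com/b', 'depth' => 2, 'visited' => true },
  { 'url' => 'http://example.com/c', 'depth' => 1, 'visited' => false }
])

# Worker loop: keep claiming URLs for the 'parser' module up to depth 1.
claimed = []
while (url = queue.next_unprocessed('parser', 1))
  claimed << url
end
```

Because each claim marks the entry as processing before returning, two workers running this loop should not receive the same URL.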