Class: NewsCrawler::Storage::URLQueue::MongoEngine

Inherits:
URLQueueEngine show all
Includes:
Mongo
Defined in:
lib/news_crawler/storage/url_queue/mongo_storage.rb

Overview

List storage engine with MongoDB backend

Constant Summary collapse

NAME =
'mongo'

Instance Method Summary collapse

Methods inherited from URLQueueEngine

get_engines, inherited

Constructor Details

#initialize(*opts) ⇒ MongoEngine

Construct a queue



38
39
40
41
42
43
44
45
46
47
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 38

def initialize(*opts)
  config = SimpleConfig.for :application
  db = MongoClient.new(config.mongodb.host, config.mongodb.port,
                       pool_size: 4,
                       pool_timeout: 5)[config.mongodb.db_name]
  coll_name = config.prefix + '_' + config.suffix.url_queue
  h_opts = ((opts[-1].is_a? Hash) ? opts[-1] : {})
  @coll = db[h_opts[:coll_name] || coll_name]
  @coll.ensure_index({:url => Mongo::ASCENDING}, {:unique => true})
end

Instance Method Details

#add(url, ref_url = '') ⇒ Object

Add an URL to list with reference URL

Parameters:

  • url (String)
  • ref_url (String) (defaults to: '')


52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 52

def add(url, ref_url = '')
  if (ref_url == '')
    depth = 0
  else
    depth = (get_url_depth(ref_url) || 0) + 1
  end
  begin
    @coll.insert({:url        => url,
                   :depth     => depth,
                   :visited   => false})
  rescue Mongo::OperationFailure => e
    if e.error_code == 11000  # duplicate key error
      raise DuplicateURLError, url
    else
      raise e
    end
  end
end

#all(*opts) ⇒ Array

Get all URL and status

Returns:

  • (Array)

    array of hash contains url and status



114
115
116
117
118
119
120
121
122
123
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 114

def all(*opts)
  @coll.find.collect do | entry |
    entry.each_key.inject({}) do | memo, key |
      if key != '_id'
        memo[key.intern] = entry[key]
      end
      memo
    end
  end
end

#clear(*opts) ⇒ Fixnum

Clear URL queue

Returns:

  • (Fixnum)

    number of urls removed



203
204
205
206
207
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 203

def clear(*opts)
  count = @coll.count
  @coll.remove
  count
end

#find_all(modul_name, state, max_depth = -1)) ⇒ Array

TODO fix bug - find visited url Find all visited urls with given module process state

Parameters:

  • modul_name (String)
  • state (String)

    one of unprocessed, processing, processed

  • max_depth (Fixnum) (defaults to: -1))

    max url depth return (inclusive)

Returns:

  • (Array)

    URL list



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 131

def find_all(modul_name, state, max_depth = -1)
  if (state == URLQueue::UNPROCESSED)
    selector = {:$or => [{modul_name => state},
                         {modul_name => {:$exists => false}}]}
  else
    selector = {modul_name => state}
  end
  selector = {:$and => [selector,
                        {'visited' => true}]}
  if max_depth > -1
    selector[:$and] << {'depth' => {:$lte => max_depth}}
  end
  @coll.find(selector).collect do | entry |
    entry['url']
  end
end

#find_one(modul_name, state, max_depth = -1)) ⇒ String?

Find one visited url with given module process state

Parameters:

  • modul_name (String)
  • state (String)

    one of unprocessed, processing, processed

  • max_depth (Fixnum) (defaults to: -1))

    max url depth return (inclusive)

Returns:

  • (String, nil)

    URL or nil if cann’t found url matches criterial



153
154
155
156
157
158
159
160
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 153

def find_one(modul_name, state, max_depth = -1)
  a = find_all(modul_name, state, max_depth)
  if a.size > 0
    a[0]
  else
    nil
  end
end

#find_unvisited(max_depth = -1)) ⇒ Array

Get list of unvisited URL

Parameters:

  • max_depth (Fixnum) (defaults to: -1))

    maximum depth of url return

Returns:

  • (Array)

    unvisited url with maximum depth (option)



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 189

def find_unvisited(max_depth = -1)
  if max_depth > -1
    selector = {:$and => [{'visited' => false},
                          {'depth'   => {:$lte => max_depth}}]}
  else
    selector = {'visited' => false}
  end
  @coll.find(selector).collect do | entry |
    entry['url']
  end
end

#get_url_depth(url) ⇒ Object

Get URL depth of given url return [ Fixnum ] URL depth

Parameters:

  • url (String)


212
213
214
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 212

def get_url_depth(url)
  @coll.find_one({'url' => url}, {:fields => ['depth']})['depth']
end

#mark(module_name, url, state) ⇒ Object

Set processing state of url in given module

Parameters:

  • module_name (String)
  • url (String)
  • state (String)

    one of unprocessed, processing, processed



96
97
98
99
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 96

def mark(module_name, url, state)
  @coll.update({:url  => url},
               {:$set => {module_name => state}})
end

#mark_all(module_name, new_state, orig_state = nil) ⇒ Object

Change all url in an state to other state

Parameters:

  • module_name (String)
  • new_state (String)

    new state

  • orig_state (String) (defaults to: nil)

    original state



105
106
107
108
109
110
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 105

def mark_all(module_name, new_state, orig_state = nil)
  selector = (orig_state.nil? ? {} : {module_name => orig_state})
  @coll.update(selector,
               {:$set => {module_name => new_state}},
               :multi => true)
end

#mark_all_unvisitedObject

Mark all URLs as unvisited



79
80
81
82
83
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 79

def mark_all_unvisited
  @coll.update({},
               {:$set => {'visited' => false}},
               {:multi => true})
end

#mark_visited(url) ⇒ Object

Mark an URL as visited

Parameters:

  • url (String)


73
74
75
76
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 73

def mark_visited(url)
  @coll.update({:url  => url},
               {:$set => {'visited' => true}})
end

#next_unprocessed(modul_name, max_depth = -1)) ⇒ String? Also known as: find_and_mark

Get next unprocessed a url and mark it as processing in atomic

Parameters:

  • modul_name (String)
  • max_depth (Fixnum) (defaults to: -1))

    max url depth return (inclusive)

Returns:

  • (String, nil)

    URL or nil if url doesn’t exists



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/news_crawler/storage/url_queue/mongo_storage.rb', line 166

def next_unprocessed(modul_name, max_depth = -1)
  selector = {:$or => [{modul_name => URLQueue::UNPROCESSED},
                       {modul_name => {:$exists => false}}]}
  selector = {:$and => [selector,
                        {'visited' => true}]}
  if max_depth > -1
    selector[:$and] << {'depth' => {:$lte => max_depth}}
  end
  doc = @coll.find_and_modify(:query => selector,
                              :update => {:$set =>
                                {modul_name => URLQueue::PROCESSING}})
  if doc.nil?
    nil
  else
    doc['url']
  end
  (doc.nil? ? nil : doc['url'])
end