Class: CrawlHelper
- Inherits: Object
- Defined in: lib/crawl_helper.rb
Class Method Summary
- .crawl_page(content_request) ⇒ Object
  Crawls the URL in the content_request, records it as crawled in Redis, enqueues newly discovered internal links, and sends the content for processing.
- .finished(content_request) ⇒ Object
  Sets the crawl status to ‘Crawl Finished’ and enqueues the crawl finished job.
- .send_to_processing_queue(content, content_request) ⇒ Object
  Enqueues the content to the processing queue set up in options.
Class Method Details
.crawl_page(content_request) ⇒ Object
# File 'lib/crawl_helper.rb', line 8

def self.crawl_page(content_request)
  # change all hash keys to symbols
  content_request = HashUtil.deep_symbolize_keys(content_request)
  @content_request = content_request

  content_request[:redis_options] = {} unless content_request.has_key? :redis_options
  content_request[:crawl_limit_by_page] = false unless content_request.has_key? :crawl_limit_by_page
  content_request[:valid_mime_types] = ["*/*"] unless content_request.has_key? :valid_mime_types
  content_request[:queue_system] = content_request[:queue_system].to_sym

  @redis = NamespacedRedisConnection.new(content_request[:redis_options], "cobweb-#{Cobweb.version}-#{content_request[:crawl_id]}")
  @stats = Stats.new(content_request)

  @debug = content_request[:debug]

  decrement_queue_counter

  # check we haven't crawled this url before
  unless @redis.sismember "crawled", content_request[:url]

    # if there is no limit or we're still under it lets get the url
    if within_crawl_limits?(content_request[:crawl_limit])
      content = Cobweb.new(content_request).get(content_request[:url], content_request)
      if content_request[:url] == @redis.get("original_base_url")
        @redis.set("crawled_base_url", content[:base_url])
      end
      if is_permitted_type(content)
        begin
          # move the url from the queued list to the crawled list - for both the original url, and the content url (to handle redirects)
          @redis.srem "queued", content_request[:url]
          @redis.sadd "crawled", content_request[:url]
          @redis.srem "queued", content[:url]
          @redis.sadd "crawled", content[:url]

          # increment the counter if we are not limiting by page only || we are limiting count by page and it is a page
          if content_request[:crawl_limit_by_page]
            if content[:mime_type].match("text/html")
              increment_crawl_started_counter
            end
          else
            increment_crawl_started_counter
          end

          ## update statistics
          @stats.update_status("Crawling #{content_request[:url]}...")
          @stats.update_statistics(content)

          # set the base url if this is the first page
          set_base_url @redis, content, content_request

          @cobweb_links = CobwebLinks.new(content_request)
          if within_queue_limits?(content_request[:crawl_limit])
            internal_links = ContentLinkParser.new(content_request[:url], content[:body], content_request).all_links(:valid_schemes => [:http, :https])

            # select the link if its internal
            internal_links.select! { |link| @cobweb_links.internal?(link) }

            # if the site has the same content for http and https then normalize to http
            if @options[:treat_https_as_http]
              internal_links.map! { |link| link.gsub(/^https/, "http") }
            end

            # reject the link if we've crawled it or queued it
            internal_links.reject! { |link| @redis.sismember("crawled", link) }
            internal_links.reject! { |link| @redis.sismember("queued", link) }

            internal_links.each do |link|
              enqueue_content(content_request, link) if within_queue_limits?(content_request[:crawl_limit])
            end
          end

          # enqueue to processing queue
          send_to_processing_queue(content, content_request)

          # if the enqueue counter has been requested update that
          if content_request.has_key? :enqueue_counter_key
            enqueue_redis = NamespacedRedisConnection.new(content_request[:redis_options], content_request[:enqueue_counter_namespace].to_s)
            current_count = enqueue_redis.hget(content_request[:enqueue_counter_key], content_request[:enqueue_counter_field]).to_i
            enqueue_redis.hset(content_request[:enqueue_counter_key], content_request[:enqueue_counter_field], current_count + 1)
          end

        ensure
          # update the queued and crawled lists if we are within the crawl limits.
          # update the queue and crawl counts -- doing this very late in the piece so that the following transaction all occurs at once.
          # really we should do this with a lock https://github.com/PatrickTulskie/redis-lock
          if content_request[:crawl_limit_by_page]
            if content[:mime_type].match("text/html")
              increment_crawl_counter
            end
          else
            increment_crawl_counter
          end
          puts "Crawled: #{@crawl_counter} Limit: #{content_request[:crawl_limit]} Queued: #{@queue_counter} In Progress: #{@crawl_started_counter-@crawl_counter}" if @debug
        end
      else
        puts "ignoring #{content_request[:url]} as mime_type is #{content[:mime_type]}" if content_request[:debug]
      end
    else
      puts "ignoring #{content_request[:url]} as outside of crawl limits." if content_request[:debug]
    end
  else
    @redis.srem "queued", content_request[:url]
    puts "Already crawled #{content_request[:url]}" if content_request[:debug]
  end

  # if there's nothing left queued or the crawled limit has been reached
  refresh_counters
  if content_request[:crawl_limit].nil? || content_request[:crawl_limit] == 0
    if @queue_counter + @crawl_started_counter - @crawl_counter == 0
      finished(content_request)
    end
  elsif (@queue_counter + @crawl_started_counter - @crawl_counter) == 0 || @crawl_counter >= content_request[:crawl_limit].to_i
    finished(content_request)
  end
end
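All of the behaviour above is driven by the content_request hash. The sketch below shows one way the helper could be called directly, assuming a reachable Redis instance; the URL, crawl id and job class names are illustrative only, and within the gem the method is normally invoked by a crawl worker rather than by hand.

require 'cobweb'

# Illustrative content_request; string keys also work since crawl_page
# symbolizes them on entry. Values here are assumptions for the example.
content_request = {
  :url                  => "http://example.com/",
  :crawl_id             => "example-crawl-1",
  :queue_system         => :resque,
  :processing_queue     => "CobwebProcessJob",    # resolved via const_get when enqueuing
  :crawl_finished_queue => "CobwebFinishedJob",   # enqueued once the crawl completes
  :crawl_limit          => 100,
  :redis_options        => { :host => "localhost", :port => 6379 },
  :debug                => true
}

CrawlHelper.crawl_page(content_request)

Omitted keys such as :crawl_limit_by_page and :valid_mime_types fall back to the defaults set at the top of the method.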
.finished(content_request) ⇒ Object
Sets the crawl status to ‘Crawl Finished’ and enqueues the crawl finished job
# File 'lib/crawl_helper.rb', line 126

def self.finished(content_request)
  # finished
  if @redis.hget("statistics", "current_status") != "Crawl Finished"
    #ap "CRAWL FINISHED #{content_request[:url]}, #{counters}, #{@redis.get("original_base_url")}, #{@redis.get("crawled_base_url")}" if content_request[:debug]
    @stats.end_crawl(content_request)

    additional_stats = {:crawl_id => content_request[:crawl_id], :crawled_base_url => @redis.get("crawled_base_url")}
    additional_stats[:redis_options] = content_request[:redis_options] unless content_request[:redis_options] == {}
    additional_stats[:source_id] = content_request[:source_id] unless content_request[:source_id].nil?

    if content_request[:queue_system] == :resque
      Resque.enqueue(const_get(content_request[:crawl_finished_queue]), @stats.get_statistics.merge(additional_stats))
    elsif content_request[:queue_system] == :sidekiq
      puts "Queueing Finished on Sidekiq"
      const_get(content_request[:crawl_finished_queue]).perform_async(@stats.get_statistics.merge(additional_stats))
    else
      raise "Unknown queue system: #{content_request[:queue_system]}"
    end
  else
    # nothing to report here, we're skipping the remaining urls as we're outside of the crawl limit
  end
end
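The class named in content_request[:crawl_finished_queue] is resolved with const_get and handed the crawl statistics merged with :crawl_id, :crawled_base_url and, when present, :redis_options and :source_id. A minimal Resque-style receiver is sketched below; the class and queue names are assumptions, and under :sidekiq the named class would instead need to be a Sidekiq worker responding to perform_async.

# Hypothetical crawl-finished job; the class and queue names are illustrative.
class MyCrawlFinishedJob
  @queue = :crawl_finished  # Resque queue name (assumption)

  # stats is the statistics hash described above; keys arrive as strings
  # after Resque's JSON round-trip.
  def self.perform(stats)
    puts "Crawl #{stats['crawl_id']} finished; crawled base url: #{stats['crawled_base_url']}"
  end
end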
.send_to_processing_queue(content, content_request) ⇒ Object
Enqueues the content to the processing queue set up in options.
# File 'lib/crawl_helper.rb', line 150

def self.send_to_processing_queue(content, content_request)
  content_to_send = content.merge({:internal_urls => content_request[:internal_urls], :redis_options => content_request[:redis_options], :source_id => content_request[:source_id], :crawl_id => content_request[:crawl_id]})
  if content_request[:direct_call_process_job]
    clazz = const_get(content_request[:processing_queue])
    clazz.perform(content_to_send)
  elsif content_request[:use_encoding_safe_process_job]
    content_to_send[:body] = Base64.encode64(content[:body])
    content_to_send[:processing_queue] = content_request[:processing_queue]
    if content_request[:queue_system] == :resque
      Resque.enqueue(EncodingSafeProcessJob, content_to_send)
    elsif content_request[:queue_system] == :sidekiq
      const_get(content_request[:processing_queue]).perform_async(content_to_send)
    else
      raise "Unknown queue system: #{content_request[:queue_system]}"
    end
  else
    if content_request[:queue_system] == :resque
      Resque.enqueue(const_get(content_request[:processing_queue]), content_to_send)
    elsif content_request[:queue_system] == :sidekiq
      puts "Queueing on Sidekiq"
      const_get(content_request[:processing_queue]).perform_async(content_to_send)
    else
      raise "Unknown queue system: #{content_request[:queue_system]}"
    end
  end
  puts "#{content_request[:url]} has been sent for processing. use_encoding_safe_process_job: #{content_request[:use_encoding_safe_process_job]}" if content_request[:debug]
end
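The class named in content_request[:processing_queue] is likewise resolved with const_get. It receives the crawled content merged with :internal_urls, :redis_options, :source_id and :crawl_id; when :use_encoding_safe_process_job is set the body is Base64 encoded and, on Resque, routed through EncodingSafeProcessJob. A minimal Resque-style processing job is sketched below; the class and queue names are assumptions.

# Hypothetical processing job; the class and queue names are illustrative.
class MyContentProcessJob
  @queue = :cobweb_process_job  # Resque queue name (assumption)

  # content is the crawled page (:url, :body, :mime_type, ...) merged with
  # :internal_urls, :redis_options, :source_id and :crawl_id.
  def self.perform(content)
    puts "Processing #{content['url']}: #{content['body'].to_s.bytesize} bytes"
  end
end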