Class: CrawlWorker
- Inherits: Object
- Includes: Sidekiq::Worker
- Defined in: lib/crawl_worker.rb
Overview
The Sidekiq server is multi-threaded, so our Redis connection pool size defaults to Sidekiq's concurrency setting (-c):

Sidekiq.configure_server do |config|
  config.redis = { :namespace => 'x', :url => 'redis://localhost:6379/14' }
end
Class Method Summary
- .jobs ⇒ Object
Instance Method Summary
- #finished(content_request) ⇒ Object
  Sets the crawl status to CobwebCrawlHelper::FINISHED and enqueues the crawl finished job.
- #perform(content_request) ⇒ Object
  Retrieves the requested URL, enqueues any discovered links, sends permitted content for processing, and finalises the crawl when complete.
- #send_to_processing_queue(content, content_request) ⇒ Object
  Enqueues the content to the processing queue set up in the options.
Class Method Details
.jobs ⇒ Object
Returns the count of entries in the queue set held in Redis.
# File 'lib/crawl_worker.rb', line 78

def self.jobs
  Sidekiq.redis do |conn|
    # count the members of the queue set held in Redis
    conn.smembers([:queue]).count
  end
end
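A minimal usage sketch, assuming Sidekiq is configured as shown in the overview:

# Hypothetical check of how many queue entries .jobs currently reports
puts "Sidekiq queue count: #{CrawlWorker.jobs}"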
Instance Method Details
#finished(content_request) ⇒ Object
Sets the crawl status to CobwebCrawlHelper::FINISHED and enqueues the crawl finished job.
# File 'lib/crawl_worker.rb', line 86

def finished(content_request)
  additional_stats = {:crawl_id => content_request[:crawl_id], :crawled_base_url => @crawl.crawled_base_url}
  additional_stats[:redis_options] = content_request[:redis_options] unless content_request[:redis_options] == {}
  additional_stats[:source_id] = content_request[:source_id] unless content_request[:source_id].nil?

  @crawl.finish

  @crawl.debug_puts "increment crawl_finished_enqueued_count"
  @crawl.redis.incr("crawl_finished_enqueued_count")
  content_request[:crawl_finished_queue].constantize.perform_async(@crawl.statistics.merge(additional_stats))
end
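The class named in content_request[:crawl_finished_queue] receives the merged statistics hash via perform_async. A minimal sketch of such a worker (the class name and body are illustrative, not part of Cobweb):

class CrawlFinishedJob
  include Sidekiq::Worker

  # statistics is @crawl.statistics merged with :crawl_id, :crawled_base_url
  # and, when present, :redis_options and :source_id; Sidekiq serialises
  # arguments as JSON, so keys arrive here as strings
  def perform(statistics)
    puts "Crawl #{statistics["crawl_id"]} finished for #{statistics["crawled_base_url"]}"
  end
end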
#perform(content_request) ⇒ Object
Crawls a single content request: retrieves the URL, enqueues extracted links, sends permitted content for processing, and calls #finished once the crawl is complete.
# File 'lib/crawl_worker.rb', line 18

def perform(content_request)
  puts "Performing for #{content_request["url"]}"
  # setup the crawl class to manage the crawl of this object
  @crawl = CobwebModule::Crawl.new(content_request)

  content_request.merge!(:cookies => @crawl.cookies)

  # update the counters and then perform the get, returns false if we are outwith limits
  if @crawl.retrieve

    # if the crawled object is an object type we are interested in
    if @crawl.content.permitted_type?

      # extract links from content and process them if we are still within queue limits (block will not run if we are outwith limits)
      @crawl.process_links do |link|
        puts "Looking to add #{link}.."
        # @crawl.lock("queue_links") do
          # enqueue the links to sidekiq
          @crawl.debug_puts "QUEUED LINK: #{link}"
          enqueue_content(content_request, link)
        # end
      end

      if @crawl.to_be_processed?

        @crawl.process do

          # enqueue to processing queue
          @crawl.debug_puts "SENT FOR PROCESSING [#{@crawl.redis.get("crawl_job_enqueued_count")}] URL: #{@crawl.content.url}"
          send_to_processing_queue(@crawl.content.to_hash, content_request)

          # if the enqueue counter has been requested update that
          if content_request.has_key?(:enqueue_counter_key)
            enqueue_redis = Redis::Namespace.new(content_request[:enqueue_counter_namespace].to_s, :redis => RedisConnection.new(content_request[:redis_options]))
            current_count = enqueue_redis.hget(content_request[:enqueue_counter_key], content_request[:enqueue_counter_field]).to_i
            enqueue_redis.hset(content_request[:enqueue_counter_key], content_request[:enqueue_counter_field], current_count + 1)
          end

        end
      else
        @crawl.debug_puts "@crawl.finished? #{@crawl.finished?}"
        @crawl.debug_puts "@crawl.within_crawl_limits? #{@crawl.within_crawl_limits?}"
        @crawl.debug_puts "@crawl.first_to_finish? #{@crawl.first_to_finish?}"
      end

    end
  end

  # @crawl.lock("finished") do
    # let the crawl know we're finished with this object
    @crawl.finished_processing

    # test queue and crawl sizes to see if we have completed the crawl
    @crawl.debug_puts "finished? #{@crawl.finished?}"
    if @crawl.finished?
      @crawl.debug_puts "Calling crawl_job finished"
      finished(content_request)
    end
  # end
end
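Content requests are normally built and enqueued by Cobweb itself; a hand-rolled request might look like the sketch below (all values are illustrative, and only keys referenced in this file are shown):

CrawlWorker.perform_async(
  "url"                  => "http://example.com/",
  "crawl_id"             => "example-crawl-1",
  "redis_options"        => {},
  "internal_urls"        => ["http://example.com/*"],
  "processing_queue"     => "ContentProcessJob",
  "crawl_finished_queue" => "CrawlFinishedJob"
)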
#send_to_processing_queue(content, content_request) ⇒ Object
Enqueues the content to the processing queue set up in the options.
# File 'lib/crawl_worker.rb', line 99

def send_to_processing_queue(content, content_request)
  content_to_send = content.merge({:internal_urls => content_request[:internal_urls], :redis_options => content_request[:redis_options], :source_id => content_request[:source_id], :crawl_id => content_request[:crawl_id]})
  content_to_send.keys.each do |key|
    content_to_send[key] = content_to_send[key].force_encoding('UTF-8') if content_to_send[key].kind_of?(String)
  end
  if content_request[:direct_call_process_job]
    clazz = content_request[:processing_queue].constantize
    clazz.perform(content_to_send)
  else
    content_request[:processing_queue].constantize.perform_async(content_to_send)
  end
  @crawl.debug_puts "#{content_request[:url]} has been sent for processing. use_encoding_safe_process_job: #{content_request[:use_encoding_safe_process_job]}"
end
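The class named in content_request[:processing_queue] receives the merged, UTF-8-forced content hash. A minimal sketch (the class name and body are illustrative, not part of Cobweb); note that when :direct_call_process_job is set the class-level perform is invoked synchronously, so the class should respond to both:

class ContentProcessJob
  include Sidekiq::Worker

  # called synchronously when content_request[:direct_call_process_job] is set
  def self.perform(content)
    new.perform(content)
  end

  # content is the crawled page hash merged with :internal_urls,
  # :redis_options, :source_id and :crawl_id; keys are symbols on the
  # direct-call path and strings after Sidekiq's JSON round-trip
  def perform(content)
    puts "Processing #{content[:url] || content["url"]}"
  end
end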