Class: Apollo::Planner::SmartPlanner
- Inherits: BasePlanner (ancestry: Object → BasePlanner → Apollo::Planner::SmartPlanner)
- Defined in:
- lib/apollo_crawler/planner/smart_planner.rb
Instance Attribute Summary
- #amqp ⇒ Object
  Returns the value of attribute amqp.
- #declarations ⇒ Object
  Returns the value of attribute declarations.
- #mongo ⇒ Object
  Returns the value of attribute mongo.
Instance Method Summary
- #fetch_queued_urls(opts = {}) ⇒ Object
- #fetch_url(url, opts = {}) ⇒ Object
- #get_next_url(opts = {}) ⇒ Object
- #get_url_count(state, opts = {}) ⇒ Object
- #initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner (constructor)
  Returns a new instance of SmartPlanner.
- #run(opts = {}) ⇒ Object
Constructor Details
#initialize(amqp = nil, mongo = nil, opts = {}) ⇒ SmartPlanner
Returns a new instance of SmartPlanner.
Source: lib/apollo_crawler/planner/smart_planner.rb, lines 38–117
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 38
# Creates a planner bound to an AMQP connection and subscribes it to the
# planner's reply queues.
#
# Three subscriptions are registered on a fresh channel:
# * "planner.fetched.queue"  (bound to exchange "planner.fetched")  - handled by #handle_fetched_message
# * "planner.domained.queue" (bound to exchange "planner.domained") - only logged (verbose)
# * "planner.crawled.queue"  (bound to exchange "planner.crawled")  - handled by #handle_crawled_message
#
# @param amqp [Object] AMQP connection; must respond to #create_channel
# @param mongo [Object] stored via #mongo=; not otherwise used here
# @param opts [Hash] :verbose enables diagnostic output; the hash is also
#   passed to Apollo::Agent.declare_entities
# @raise [ArgumentError] if no AMQP connection is given
def initialize(amqp=nil, mongo=nil, opts={})
  # The nil default is kept for signature compatibility, but no channel can be
  # created without a connection: fail clearly instead of with NoMethodError.
  raise ArgumentError, "SmartPlanner requires an AMQP connection" if amqp.nil?

  self.amqp = amqp
  self.mongo = mongo

  # Declarations
  channel = amqp.create_channel
  # channel.prefetch(1)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)

  queues = declarations[:queues]
  exchanges = declarations[:exchanges]

  # Bindings. Subscriber blocks receive (delivery_info, metadata, payload);
  # only the payload is used.
  queues["planner.fetched.queue"].bind(exchanges["planner.fetched"]).subscribe do |_delivery_info, _metadata, payload|
    handle_fetched_message(payload, opts)
  end

  queues["planner.domained.queue"].bind(exchanges["planner.domained"]).subscribe do |_delivery_info, _metadata, payload|
    # Parsed but not yet used; a malformed payload still raises here.
    JSON.parse(payload)
    puts "DOMAINED !!!" if opts[:verbose]
  end

  queues["planner.crawled.queue"].bind(exchanges["planner.crawled"]).subscribe do |_delivery_info, _metadata, payload|
    handle_crawled_message(payload, opts)
  end
end

# Handles a fetcher reply: marks the originating Apollo::Model::QueuedUrl as
# :fetched, reconciles the response with any stored Apollo::Model::RawDocument
# and, only when no matching document remains, saves the response and forwards
# the whole message to the "crawler" exchange (reply_to "planner.crawled").
#
# @param payload [String] JSON object with "request" and "response" entries
# @param opts [Hash] :verbose enables diagnostic output
def handle_fetched_message(payload, opts)
  msg = JSON.parse(payload)
  request = msg['request']
  response = msg['response']

  if opts[:verbose]
    puts msg.inspect
    puts "REQ: #{request}"
    puts "RESP: #{response}"
  end

  queued = Apollo::Model::QueuedUrl.find(request["_id"])
  queued.update_attributes(request)
  queued.state = :fetched
  queued.save

  cached = Apollo::Model::RawDocument.where(:url => request['url']).first
  if cached
    if cached.sha_hash != response['sha_hash']
      puts "Removing old cached version of '#{request['url']}'" if opts[:verbose]
      cached.destroy
      cached = nil
    else
      puts "Using cached version of '#{request['url']}'" if opts[:verbose]
    end
  else
    # A document with the same sha_hash (presumably identical content) may
    # already be stored under another URL.
    cached = Apollo::Model::RawDocument.where(:sha_hash => response['sha_hash']).first
    puts "Same as #{cached.inspect}" if cached && opts[:verbose]
  end

  return if cached

  Apollo::Model::RawDocument.new(response).save
  declarations[:exchanges]["crawler"].publish(msg.to_json, :reply_to => "planner.crawled")
end

# Handles a crawler reply: schedules every extracted link for the crawler
# named in the original request.
#
# @param payload [String] JSON object with "request", "data" and an optional
#   "links" array whose entries carry a "link" key
# @param opts [Hash] :verbose prints a SHA-256 of the crawled data
def handle_crawled_message(payload, opts)
  msg = JSON.parse(payload)
  request = msg['request']
  data = msg['data']
  links = msg['links'] || []

  if opts[:verbose]
    # Digest only accepts Strings; structured (or missing) data is hashed via
    # its JSON form instead of raising TypeError.
    digest_input = data.respond_to?(:to_str) ? data.to_str : data.to_json
    puts Digest::SHA256.hexdigest(digest_input)
  end

  links.each do |entry|
    Apollo::Scheduler::BaseScheduler.schedule(entry['link'], request['crawler_name'])
  end
end
private :handle_fetched_message, :handle_crawled_message
Instance Attribute Details
#amqp ⇒ Object
Returns the value of attribute amqp.
Source: lib/apollo_crawler/planner/smart_planner.rb, line 34
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 34
# Reader for the AMQP connection assigned in the constructor.
#
# @return [Object] the stored AMQP connection (nil if never assigned)
def amqp
  @amqp
end
#declarations ⇒ Object
Returns the value of attribute declarations.
Source: lib/apollo_crawler/planner/smart_planner.rb, line 36
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 36
# Reader for the AMQP entities returned by Apollo::Agent.declare_entities.
# Callers index it with :queues and :exchanges.
#
# @return [Object] the stored declarations (nil if never assigned)
def declarations
  @declarations
end
#mongo ⇒ Object
Returns the value of attribute mongo.
Source: lib/apollo_crawler/planner/smart_planner.rb, line 35
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 35
# Reader for the Mongo object handed to the constructor.
#
# @return [Object] the stored value (nil if never assigned)
def mongo
  @mongo
end
Instance Method Details
#fetch_queued_urls(opts = {}) ⇒ Object
Source: lib/apollo_crawler/planner/smart_planner.rb, lines 138–151
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 138
# Claims queued URLs and hands them to the fetcher. It stops when the number
# of URLs in the :fetching state reaches the limit or the queue runs dry.
#
# @param opts [Hash] :max_fetching caps concurrent fetches (default 4); the
#   whole hash is also forwarded to #get_next_url and #fetch_url
# @return [nil]
def fetch_queued_urls(opts={})
  max_fetching = opts.fetch(:max_fetching, 4)

  if get_url_count(:fetching) > max_fetching
    puts "Fetching too many URLs. Slowing down for a while ..."
    return
  end

  while get_url_count(:fetching) < max_fetching
    url = get_next_url(opts)
    # Nothing left in :queued. Without this break the :fetching count never
    # grows, so the loop would spin forever publishing nil URLs.
    break if url.nil?

    puts "SmartPlanner::fetch_queued_urls() - Queueing: #{url.inspect}"
    fetch_url(url, opts)
  end
end
#fetch_url(url, opts = {}) ⇒ Object
Source: lib/apollo_crawler/planner/smart_planner.rb, lines 123–130
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 123
# Publishes a URL record to the "fetcher" exchange and asks for the reply to be
# routed to "planner.fetched".
#
# @param url [Object] record to fetch; serialized with #to_json
# @param opts [Hash] :verbose logs the publish
# @return whatever the exchange's #publish returns
def fetch_url(url, opts={})
  puts "AMQP fetching '#{url.inspect}'" if opts[:verbose]

  fetcher = declarations[:exchanges]["fetcher"]
  fetcher.publish(url.to_json, reply_to: "planner.fetched")
end
#get_next_url(opts = {}) ⇒ Object
Source: lib/apollo_crawler/planner/smart_planner.rb, lines 132–136
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 132
# Picks the oldest :queued URL by created_at and flips it to :fetching in a
# single find_and_modify call.
#
# @param opts [Hash] unused; accepted for signature consistency
# @return the updated record (new: true), or whatever find_and_modify yields
#   when nothing matches (presumably nil)
def get_next_url(opts={})
  Apollo::Model::QueuedUrl
    .where(state: :queued)
    .order_by(:created_at.asc)
    .find_and_modify({ "$set" => { state: :fetching } }, new: true)
end
#get_url_count(state, opts = {}) ⇒ Object
Source: lib/apollo_crawler/planner/smart_planner.rb, lines 119–121
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 119
# Counts Apollo::Model::QueuedUrl records currently in the given state.
#
# @param state [Symbol] state to count (e.g. :queued, :fetching, :fetched)
# @param opts [Hash] unused; accepted for signature consistency
# @return the count reported by the query
def get_url_count(state, opts={})
  matching = Apollo::Model::QueuedUrl.where(state: state)
  matching.count
end
#run(opts = {}) ⇒ Object
Source: lib/apollo_crawler/planner/smart_planner.rb, lines 153–162
# File 'lib/apollo_crawler/planner/smart_planner.rb', line 153
# Main planner loop: repeatedly calls #fetch_queued_urls, sleeping between
# passes. With no :max_iterations it loops forever, as before. A bound makes
# the loop stoppable and testable, and makes the final return reachable.
#
# @param opts [Hash] :interval seconds to sleep between passes (default 1);
#   :max_iterations number of passes before returning (default nil = forever);
#   the whole hash is forwarded to #fetch_queued_urls
# @return [Integer] 0 once the loop finishes
def run(opts={})
  interval = opts.fetch(:interval, 1)
  max_iterations = opts[:max_iterations]
  iterations = 0

  until max_iterations && iterations >= max_iterations
    fetch_queued_urls(opts)
    iterations += 1
    sleep interval
  end

  0
end