Class: Apollo::Agent::FetcherAgent
- Defined in: lib/apollo_crawler/agent/fetcher_agent.rb
Constant Summary
- THREAD_POOL_SIZE = 1
Instance Attribute Summary
- #declarations ⇒ Object — Returns the value of attribute declarations.
- #fetcher ⇒ Object — Returns the value of attribute fetcher.
- #mutex ⇒ Object — Returns the value of attribute mutex.
Instance Method Summary
- #format_response_msg(queued_url, doc) ⇒ Object
- #initialize(amqp, opts = {}) ⇒ FetcherAgent (constructor) — Returns a new instance of FetcherAgent.
- #process_fetched_doc(queued_url, doc, metadata, opts = {}) ⇒ Object
- #send_response_msg(dest, queued_url, doc) ⇒ Object
Methods inherited from BaseAgent
Constructor Details
#initialize(amqp, opts = {}) ⇒ FetcherAgent
Returns a new instance of FetcherAgent.
Source: lib/apollo_crawler/agent/fetcher_agent.rb, lines 41–93
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 41
# Creates a fetcher agent and subscribes it to the "fetcher.queue" queue.
#
# For every delivery the agent acknowledges the message, parses the JSON
# payload, and fetches its "url" with Apollo::Fetcher::SmartFetcher. It then
# converts the result with #process_fetched_doc. When the message properties
# carry :reply_to, it publishes the result via #send_response_msg.
#
# @param amqp [Object] connection used to create the AMQP channel via
#   #create_channel (presumably a Bunny session — TODO confirm)
# @param opts [Hash] agent options; +:verbose+ enables progress output, and the
#   whole hash is forwarded to Apollo::Agent.declare_entities and
#   #process_fetched_doc
def initialize(amqp, opts = {})
  self.fetcher = Apollo::Fetcher::SmartFetcher.new

  puts "Initializing fetcher agent..." if opts[:verbose]

  # Declarations
  channel = amqp.create_channel
  channel.prefetch(THREAD_POOL_SIZE)

  # Binding (Default)
  self.declarations = Apollo::Agent.declare_entities(channel, opts)
  queue = declarations[:queues]["fetcher.queue"]

  # AMQP contexts for threads: only reported, no per-context state is kept.
  THREAD_POOL_SIZE.times do |i|
    puts "FetcherAgent::initialize() - Creating context #{i}" if opts[:verbose]
  end

  # AMQP contexts mutex/lock
  self.mutex = Mutex.new

  exchange = declarations[:exchanges]["fetcher"]

  queue.bind(exchange).subscribe(:ack => true) do |delivery_info, metadata, payload|
    # There can be troubles with concurrency, please see https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/aO9GPu-jxuE

    # Ack before anything that can raise. With prefetch(THREAD_POOL_SIZE), a
    # message that is never acked (e.g. one whose payload is not valid JSON)
    # would block every further delivery on this channel.
    mutex.synchronize do
      puts "FetcherAgent: Acking - '#{delivery_info.delivery_tag}'" if opts[:verbose]
      channel.basic_ack(delivery_info.delivery_tag, true)
    end

    url = nil
    doc = nil
    begin
      queued_url = JSON.parse(payload)
      url = queued_url["url"]
      puts "FetcherAgent: Received - '#{url}', metadata #{metadata.inspect}" if opts[:verbose]

      doc = Apollo::Fetcher::SmartFetcher.fetch(url)
      doc = process_fetched_doc(queued_url, doc, metadata, opts)

      if metadata && metadata[:reply_to]
        puts "Replying to '#{metadata[:reply_to]}'"
        send_response_msg(metadata[:reply_to], queued_url, doc)
      end
    rescue StandardError => e
      # Best-effort: log and keep consuming. StandardError (not Exception) so
      # that Interrupt / SystemExit still stop the process.
      puts "EXCEPTION: FetcherAgent::initialize() - Unable to fetch '#{url}', reason: '#{e}'"
    end

    doc
  end
end
Instance Attribute Details
#declarations ⇒ Object
Returns the value of attribute declarations.
Source lines 38–40
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 38
# Returns the value of attribute declarations.
#
# Assigned in #initialize from Apollo::Agent.declare_entities. Its :queues
# and :exchanges entries are looked up by name in #initialize and
# #send_response_msg.
#
# @return [Object, nil] the declared AMQP entities, or nil before #initialize runs
def declarations
  @declarations
end
#fetcher ⇒ Object
Returns the value of attribute fetcher.
Source lines 37–39
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 37
# Returns the value of attribute fetcher.
#
# Assigned in #initialize to a new Apollo::Fetcher::SmartFetcher.
# NOTE(review): #initialize fetches through the SmartFetcher.fetch class
# method, so this instance is not used by the visible code.
#
# @return [Object, nil] the fetcher instance, or nil before #initialize runs
def fetcher
  @fetcher
end
#mutex ⇒ Object
Returns the value of attribute mutex.
Source lines 39–41
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 39
# Returns the value of attribute mutex.
#
# Assigned in #initialize to a Mutex. The subscribe block in #initialize
# holds it while acking, and #send_response_msg holds it while publishing.
#
# @return [Mutex, nil] the lock, or nil before #initialize runs
def mutex
  @mutex
end
Instance Method Details
#format_response_msg(queued_url, doc) ⇒ Object
Source lines 108–113
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 108
# Builds the reply payload that pairs a request with its fetch result.
#
# @param queued_url [Object] the original request message (parsed JSON in #initialize)
# @param doc [Object] the processed fetch result
# @return [Hash] +{:request => queued_url, :response => doc}+
def format_response_msg(queued_url, doc)
  { :request => queued_url, :response => doc }
end
#process_fetched_doc(queued_url, doc, metadata, opts = {}) ⇒ Object
Source lines 95–106
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 95
# Converts a raw fetch result into an Apollo::Model::RawDocument.
#
# @param queued_url [Hash] the request message; its "url" entry becomes the document URL
# @param doc [Hash] fetch result providing :headers, :body and :status
# @param metadata [Object] AMQP message properties (not used here)
# @param opts [Hash] agent options (not used here)
# @return [Apollo::Model::RawDocument] document whose sha_hash is the SHA-1
#   hex digest of the body
# @raise [TypeError] when doc[:body] is not a String (Digest::SHA1.hexdigest requires one)
def process_fetched_doc(queued_url, doc, metadata, opts = {})
  body = doc[:body]

  Apollo::Model::RawDocument.new.tap do |res|
    res.headers = doc[:headers]
    res.body = body
    res.sha_hash = Digest::SHA1.hexdigest(body)
    res.status = doc[:status]
    res.url = queued_url["url"]
  end
end
#send_response_msg(dest, queued_url, doc) ⇒ Object
Source lines 115–124
# File 'lib/apollo_crawler/agent/fetcher_agent.rb', line 115
# Publishes the reply for +queued_url+, serialized as JSON, to the exchange
# declared under the name +dest+.
#
# Does nothing when +dest+ is nil. The exchange lookup and publish happen
# while holding #mutex.
#
# @param dest [String, nil] key into declarations[:exchanges]
# @param queued_url [Object] the original request message
# @param doc [Object] the processed fetch result
# @return [Object, nil] nil when +dest+ is nil, otherwise the result of the exchange's #publish
# @raise [ArgumentError] when no exchange is declared under +dest+
def send_response_msg(dest, queued_url, doc)
  return if dest.nil?

  msg = format_response_msg(queued_url, doc)
  mutex.synchronize do
    exchange = declarations[:exchanges][dest]
    # Fail with a clear message instead of NoMethodError on nil.
    raise ArgumentError, "FetcherAgent: no exchange declared for '#{dest}'" unless exchange

    exchange.publish(msg.to_json)
  end
end