Class: RDF::SAK::URLRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/rdf/sak/urlrunner.rb

Instance Method Summary collapse

Constructor Details

#initialize(store: nil, repo: nil, ua: nil, ignore: nil, traverse: nil) ⇒ URLRunner

NS = {
  rdf:  RDF::RDFV,
  rdfs: RDF::RDFS,
  owl:  RDF::OWL,
  xsd:  RDF::XSD,
  xhv:  RDF::Vocab::XHV,
  http: RDF::Vocabulary.new('http://www.w3.org/2011/http#'),
  vann: RDF::Vocabulary.new('http://purl.org/vocab/vann/'),
  skos: RDF::Vocab::SKOS,
  dcat: RDF::Vocab::DCAT,
  so:   RDF::Vocab::SCHEMA,
  dct:  RDF::Vocab::DC,
  ci:   RDF::SAK::CI,
  ogp:  RDF::Vocab::OG,
  foaf: RDF::Vocab::FOAF,
  org:  RDF::Vocab::ORG,
  bibo: RDF::Vocab::BIBO,
  qb:   RDF::Vocabulary.new('http://purl.org/linked-data/cube#'),
}.freeze



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/rdf/sak/urlrunner.rb', line 235

# Set up a runner with its backing stores and crawl policy.
#
# @param store [Object, nil] content store; other methods call #add and
#   #get on it
# @param repo [Object, nil] RDF repository that is queried and written to
# @param ua [#to_s, nil] User-Agent string; the UA constant is used when
#   this is omitted
# @param ignore [#to_a, Object, nil] host name(s) consulted by #ignored?
# @param traverse [#to_a, Object, nil] host name(s) consulted by #traverse?
def initialize store: nil, repo: nil, ua: nil, ignore: nil, traverse: nil
  # falsy -> empty list, anything answering #to_a -> that array,
  # any other single value -> one-element list
  listify = lambda do |value|
    if !value
      []
    elsif value.respond_to?(:to_a)
      value.to_a
    else
      [value]
    end
  end

  @store = store
  @repo  = repo

  # bookkeeping shared between concurrently running jobs
  # (channel- and cancellation-based plumbing was tried here and is
  # currently disabled)
  @jobs = Concurrent::Array.new
  @seen = Concurrent::Map.new

  # throttle through which #enqueue schedules its work
  @fthrot = Concurrent::Throttle.new 5

  @ua       = ua ? ua.to_s : UA
  @ignore   = listify.call ignore
  @traverse = listify.call traverse
end

Instance Method Details

#enqueue(uri) ⇒ Object



469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/rdf/sak/urlrunner.rb', line 469

# Normalize a URI and, unless it has already been seen or is ignored,
# schedule the fetch -> store -> tidy -> sponge pipeline for it on the
# throttle.
#
# @param uri [#to_s] the address to crawl
# @return [Object, nil] the throttled job (#run calls #wait on it), or
#   nil when the URI was skipped
def enqueue uri
  uri = URI(uri_pp uri.to_s).normalize

  return if seen?    uri
  return if ignored? uri

  warn "enqueuing #{uri}"

  # NOTE(review): the block's value is itself a future, so waiting on
  # the outer job presumably does not wait for store/tidy/sponge --
  # confirm whether the result should be flattened.
  job = @fthrot.future(uri) do |u|
    fetched = fetch u
    fetched.on_rejection { |reason| warn "fetch fail: #{reason.inspect}" }

    stored = fetched.then do |resp|
      begin
        store resp
      rescue StandardError => e
        # best effort: report the failure and carry on with nothing.
        # Only StandardError is caught so that signals and exit
        # requests are no longer swallowed here.
        warn e
        nil
      end
    end
    stored.on_rejection { |reason| warn "store fail: #{reason.inspect}" }

    tidied = stored.then do |obj|
      if obj && /html/i.match?(obj.type)
        warn "tidying #{obj[:"sha-256"]}"
        tidy obj
      else
        obj
      end
    end
    tidied.on_rejection { |reason| warn "tidy fail: #{reason.inspect}" }

    tidied.then { |obj| sponge obj }
  end

  job.on_rejection { |reason| warn "throttle fail: #{reason.inspect}" }
end

#fetch(url, redir = 10) ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/rdf/sak/urlrunner.rb', line 259

# Retrieve a URL over HTTP(S), following redirects, and hand back the
# response wrapped in an already-fulfilled future.
#
# The address is marked as seen before the request is made. When the
# repository holds a dct:modified value for it, the most recent one is
# sent as If-Modified-Since. Every redirect is recorded in the
# repository as a ci:canonical statement.
#
# @param url [URI, #to_s] the address to retrieve; never modified
# @param redir [Integer] how many further redirects may be followed
# @return [Object] a future fulfilled with the successful response, or
#   with nil when the scheme is not http(s), the address was already
#   seen, or the server answered 304
# @raise [Net::HTTPClientException] on too many redirects, a redirect
#   onto itself, or any other unsuccessful response
# @raise [Net::HTTPBadResponse] when a redirect has no Location header
def fetch url, redir = 10
  # work on a private, normalized copy so the caller's URI object is
  # left untouched (the fragment is stripped further down as well)
  url = URI(url.to_s).normalize

  # XXX apparently you can't just *return* a fulfilled future, you
  # have to assign it to something.
  bailout = Concurrent::Promises.fulfilled_future nil
  return bailout unless %w[http https].include? url.scheme

  # nuke the fragment
  url.fragment = nil

  # make sure we don't do this a second time
  return bailout if seen? url
  @seen[url.to_s] = url

  ru = RDF::URI(url.to_s)

  # obtain the latest last-modified recorded for this resource
  q = RDF::Query.new { pattern [ru, RDF::Vocab::DC.modified, :m] }

  ims = q.execute(@repo).map do |s|
    s[:m].object.to_time.getgm if
      s[:m].literal? && s[:m].object.respond_to?(:to_time)
  end.compact.max

  hdr = { 'User-Agent' => @ua, 'Connection' => 'close' }
  # conditional requests carry an HTTP-date ("... GMT"), not RFC 2822
  hdr['If-Modified-Since'] = ims.httpdate if ims

  # XXX this is a little too low-level, don't you think?
  http = Net::HTTP.new(url.hostname, url.port)
  http.continue_timeout = 10
  http.open_timeout     = 30
  http.read_timeout     = 10
  http.write_timeout    = 10
  http.use_ssl          = url.is_a?(URI::HTTPS)

  # the block form closes the connection even when the request raises
  resp = http.start { |conn| conn.request Net::HTTP::Get.new(url, hdr) }

  case resp
  when Net::HTTPSuccess
    Concurrent::Promises.fulfilled_future(resp)
  when Net::HTTPNotModified
    # must precede the redirection branch: 304 is a 3xx status too
    warn "Already seen #{url}"
    bailout
  when Net::HTTPRedirection
    if redir <= 0
      raise Net::HTTPClientException.new(
        "Too many redirects (#{redir})", resp)
    end

    dest = resp['location']
    # HTTPBadResponse is a plain StandardError: message only
    raise Net::HTTPBadResponse,
      "Redirect on #{url} missing Location header" unless dest

    dest = (url + dest).normalize

    @repo << [ru, RDF::SAK::CI.canonical, RDF::URI(dest.to_s)]

    if url == dest
      raise Net::HTTPClientException.new("Loop detected on #{url}", resp)
    end

    fetch(dest, redir - 1)
  else
    raise Net::HTTPClientException.new "Response failed #{url}", resp
  end
end

#ignored?(uri) ⇒ Boolean

Returns:

  • (Boolean)


512
513
514
515
516
517
# File 'lib/rdf/sak/urlrunner.rb', line 512

# Tell whether a URI's host falls under one of the ignored domains.
#
# A host matches when it is exactly one of the entries in @ignore or a
# subdomain of one.
#
# @param uri [#host] the URI to test
# @return [Boolean] always false when nothing is being ignored
def ignored? uri
  return false if @ignore.empty?

  # The alternatives are grouped so that both anchors apply to every
  # entry, and the pattern is built per call so it always reflects
  # this instance's list (a /o literal keeps the first list forever).
  domains = Regexp.union(@ignore.map(&:to_s))
  /(?:\A|\.)(?:#{domains})\z/.match? uri.host.to_s
end

#run(urls, shuffle: false) ⇒ Object



519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
# File 'lib/rdf/sak/urlrunner.rb', line 519

# Crawl a batch of URLs: normalize them, drop the ignored ones,
# enqueue the rest and wait for the resulting jobs.
#
# @param urls [Enumerable] the addresses to start from
# @param shuffle [Boolean] whether to randomize the order first
# @return [self]
def run urls, shuffle: false
  # ingest URLs and prune
  urls = urls.map { |u| URI(uri_pp u).normalize }.reject { |u| ignored? u }

  # optionally randomize
  urls.shuffle! if shuffle

  # enqueue answers nil for addresses it skips, so drop those before
  # waiting on the jobs that were actually scheduled
  urls.map { |url| enqueue url }.compact.each(&:wait)

  self
end

#seen?(uri) ⇒ Boolean

Returns:

  • (Boolean)


501
502
503
# File 'lib/rdf/sak/urlrunner.rb', line 501

# Report whether an address has already been recorded in @seen.
#
# @param uri [#to_s] the address to look up, keyed by its string form
# @return [Boolean]
def seen? uri
  key = uri.to_s
  @seen[key] ? true : false
end

#sponge(obj) ⇒ Object



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/rdf/sak/urlrunner.rb', line 431

# Extract links from a stored object and enqueue the ones that are
# eligible for traversal.
#
# The handler is looked up in SPONGE by the object's type and is run
# via instance_exec with the content and a base URI. XML content is
# parsed first; if no handler matches the declared type, the type
# reported by xml_type is tried, then the SVG handler.
#
# @param obj [Object, nil] a store object answering #[], #content and
#   #type
# @return [Object, nil] nil when there is no object or no handler
def sponge obj
  return unless obj

  # get rdf stuff
  ru = RDF::URI(obj[:"sha-256"].to_s)

  # subjects whose version was run through tidy to produce this object
  via_tidy = RDF::Query.new do
    pattern [:b, RDF::SAK::TFO.output, ru]
    pattern [:b, RDF::SAK::TFO.transform, TIDY_U]
    pattern [:b, RDF::SAK::TFO.input, :a]
    pattern [:s, RDF::Vocab::DC.hasVersion, :a]
  end.execute(@repo)

  # subjects that list this object as a version directly
  direct = RDF::Query.new do
    pattern [:s, RDF::Vocab::DC.hasVersion, ru]
  end.execute(@repo)

  # combine both result sets *before* filtering; without the grouping
  # the method chain binds to the second query only
  uri = (via_tidy + direct).uniq.map do |sol|
    u = sol[:s]
    u if u.uri? && u.scheme && %w[http https].include?(u.scheme.downcase)
  end.compact.first

  # fall back to the digest URI as the base when no web address is known
  uuri    = URI(uri ? uri_pp(uri.to_s) : ru.to_s)
  content = obj.content
  type    = obj.type
  handler = SPONGE[type]

  if /xml/.match? type
    content = Nokogiri.XML(content, uuri.to_s)
    unless handler
      type    = xml_type(content)
      handler = SPONGE[type] || SPONGE['image/svg+xml'] # svg is just xlink
    end
  end

  if handler
    instance_exec(content, uuri, &handler).compact.uniq.each do |link|
      enqueue link if traverse? link
    end
  end
end

#store(resp) ⇒ Object



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/rdf/sak/urlrunner.rb', line 330

# Put a response body into the content store and record its metadata
# in the repository.
#
# Date, Last-Modified, Content-Language and Content-Type are parsed
# defensively, keeping only the first value where a header arrives
# comma-joined. Last-Modified is shifted by the difference between the
# local clock and the server's Date header. Two statements are written
# for the response URI: dct:modified and dct:hasVersion (the latter is
# what #sponge looks up).
#
# @param resp [Object, nil] a response answering #[], #body and #uri
# @return [Object, nil] the stored object, or nil when there is no
#   response
def store resp
  return unless resp

  now = Time.now.getgm # we mint a single "now" to use everywhere

  # An HTTP-date contains exactly one comma, so anything from a second
  # comma on is a repeated value and is dropped. Yields nil for a
  # missing or unparseable header; the header string is not modified.
  http_date = lambda do |raw|
    next unless raw
    begin
      Time.httpdate(raw.gsub(/^([^,]*(?:,[^,]*)?)(?:\s*,.*)?$/, "\\1")).getgm
    rescue ArgumentError
      nil
    end
  end

  # rfc says server MUST send a date header, but does it?
  date  = http_date.call(resp['Date']) || now
  delta = now - date

  mtime = http_date.call(resp['Last-Modified'])
  mtime += delta if mtime

  # first language only; the separator has to be a regexp, a string
  # here would be matched literally and never split anything
  lang = resp['Content-Language']
  lang = lang.strip.split(/\s*,+\s*/).first if lang

  charset = nil
  type    = resp['Content-Type']
  if type
    type = type.split(/\s*,+\s*/).first || 'application/octet-stream'
    type, *params = type.strip.split(/\s*;\s*/).reject(&:empty?)
    params = params.map { |p| p.split(/\s*=\s*/, 2) }
      .reject { |p| p.length < 2 }.to_h.transform_keys(&:downcase)

    charset = params['charset'] if TOKEN.match? params['charset']
  end

  obj = @store.add resp.body, strict: false,
    type: type, charset: charset, language: lang, mtime: mtime

  s  = RDF::URI(resp.uri.to_s) # - the subject
  cs = RDF::Changeset.new      # - a receptacle for statements

  cs << [s, RDF::Vocab::DC.modified, obj.mtime.getgm]
  cs << [s, RDF::Vocab::DC.hasVersion, RDF::URI(obj[:"sha-256"].to_s)]

  cs.apply @repo

  obj
end

#tidy(obj) ⇒ Object



380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/rdf/sak/urlrunner.rb', line 380

# Run an HTML object through tidy and store the result as XHTML.
#
# If the repository already records a tidy transformation of this
# object and its output is still present (and not deleted) in the
# store, that earlier output is returned instead. Otherwise the content
# is cleaned, added to the store, and the transformation is recorded
# in the repository unless an input/output pair for it already exists.
#
# @param obj [Object, nil] a store object; anything whose type does
#   not match /html/i is not processed
# @return [Object, nil] the tidied store object, or nil when the input
#   is not HTML or tidy produced nothing
def tidy obj
  return unless obj && /html/i.match?(obj.type)

  # obtain original digest uri
  oldu = RDF::URI(obj[:"sha-256"].to_s)

  # first let's detect if this job has already been done
  RDF::Query.new do
    pattern [:a, RDF::SAK::TFO.input, oldu]
    pattern [:a, RDF::SAK::TFO.transform, TIDY_U]
    pattern [:a, RDF::SAK::TFO.output, :n]
  end.execute(@repo).map do |s|
    # can't completely trust what's in the repo so check then convert;
    # a term without a scheme is skipped rather than blowing up
    n = s[:n]
    URI(n.to_s) if n.uri? && n.scheme && n.scheme.downcase == 'ni'
  end.compact.each do |n|
    # just because it's in the rdf doesn't mean it's in the store
    out = @store.get n
    return out if out && !out.deleted? # don't return an empty record
  end

  # tidy the object and reinsert it back into the store as xhtml
  start = Time.now.getgm
  clean = TidyFFI::Tidy.clean(obj.content.read, TIDY_OPTS)
  return unless clean

  newobj = @store.add clean, mtime: obj.mtime,
    type: 'application/xhtml+xml', language: obj.language,
    charset: obj.charset, encoding: obj.encoding
  stop = Time.now.getgm
  newu = RDF::URI(newobj[:"sha-256"].to_s)

  # only describe the transformation once per input/output pair
  q = RDF::Query.new do
    pattern [:a, RDF::SAK::TFO.input, oldu]
    pattern [:a, RDF::SAK::TFO.output, newu]
  end

  if q.execute(@repo).empty?
    s  = RDF::URI(UUIDTools::UUID.random_create.to_uri)
    cs = RDF::Changeset.new
    cs << [s, RDF.type, RDF::SAK::TFO.Application]
    cs << [s, RDF::Vocab::PROV.startedAtTime, start]
    cs << [s, RDF::Vocab::PROV.endedAtTime, stop]
    cs << [s, RDF::SAK::TFO.transform, TIDY_U]
    cs << [s, RDF::SAK::TFO.input, oldu]
    cs << [s, RDF::SAK::TFO.output, newu]

    cs.apply @repo
  end

  newobj
end

#traverse?(uri) ⇒ Boolean

Returns:

  • (Boolean)


505
506
507
508
509
510
# File 'lib/rdf/sak/urlrunner.rb', line 505

# Tell whether links on a URI's host should be followed.
#
# A host matches when it is exactly one of the entries in @traverse or
# a subdomain of one.
#
# @param uri [#host] the URI to test
# @return [Boolean] always false when no hosts are listed
def traverse? uri
  return false if @traverse.empty?

  # The alternatives are grouped so that both anchors apply to every
  # entry, and the pattern is built per call so it always reflects
  # this instance's list (a /o literal keeps the first list forever).
  domains = Regexp.union(@traverse.map(&:to_s))
  /(?:\A|\.)(?:#{domains})\z/.match? uri.host.to_s
end