Class: Iudex::DA::WorkPoller

Inherits:
Object
  • Object
show all
Includes:
Gravitext::HTMap, Filter::KeyHelper
Defined in:
lib/iudex-da/work_poller.rb

Overview

A SQL based WorkPoller

Constant Summary collapse

URL64_ORDER =

URL 64 lexicon, ASCII or “C” LOCALE ordered

"-0123456789ABCDEFGHIJKLMNOPQRSTU" +
"VWXYZ_abcdefghijklmnopqrstuvwxyz"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Filter::KeyHelper

lookup_key

Constructor Details

#initialize(data_source, mapper) ⇒ WorkPoller

Returns a new instance of WorkPoller.



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/iudex-da/work_poller.rb', line 186

# Create a poller that reads work through data_source using mapper.
#
# data_source:: kept and later handed to ContentReader.new by #reader
# mapper::      kept for #fields and #reader; its fields must include
#               the :url, :priority and :next_visit_after keys
#
# Raises RuntimeError, naming the first required key missing from
# mapper.fields.
def initialize( data_source, mapper )
  super()

  @domain_depth_coef   = nil
  @do_domain_group     = false
  @do_reserve          = false
  @do_discard          = true
  @instance            = nil

  @max_priority_urls   = nil
  @max_domain_urls     = 10_000
  @max_urls            = 50_000
  @max_discard_ratio   = 2.0/3.0
  @max_reserved_time_s = nil
  @last_none_reserved  = Time.now
  @max_retries         = 10
  @isolation_level     = 4
  @lock_mode           = nil
  @backoff             = 0.0

  @age_coef_1          = 0.2
  @age_coef_2          = 0.1

  @domain_union        = []

  @uhash_slice         = nil

  @log = RJack::SLF4J[ self.class ]

  keys( :url, :priority, :next_visit_after ).each do |k|
    unless mapper.fields.include?( k )
      # Report the key that is actually missing.
      raise "WorkPoller needs mapper with #{k.name} included."
    end
  end

  @mapper = mapper
  @data_source = data_source
end

Instance Attribute Details

#age_coef_1Object

First age coefficient. If set > 0.0, adjust priority by the equation:

priority + age_coef_1 * sqrt( age_coef_2 * age )

Where age is (now - next_visit_after). (Default: 0.2)



127
128
129
# File 'lib/iudex-da/work_poller.rb', line 127

# First age coefficient (initialized to 0.2). Read by #aged_priority?
# and interpolated into the priority expression built by #filter_query.
def age_coef_1
  @age_coef_1
end

#age_coef_2Object

Second age coefficient (default: 0.1)



130
131
132
# File 'lib/iudex-da/work_poller.rb', line 130

# Second age coefficient (initialized to 0.1). Read by #aged_priority?
# and interpolated inside the SQRT term built by #filter_query.
def age_coef_2
  @age_coef_2
end

#backoffObject

Maximum first-retry back-off delay in seconds (Float). Actual delay is a random value from 0..(value * 2^tries) for each retry. Default: 0.0 : no delay



184
185
186
# File 'lib/iudex-da/work_poller.rb', line 184

# Retry back-off in seconds (initialized to 0.0). When > 0.0, #reader
# passes it to the ContentReader multiplied by 1_000 and rounded.
def backoff
  @backoff
end

#do_discard=(value) ⇒ Object (writeonly)

If set true, discards old queue at every poll, even if do_reserve could make queue re-fill a safe operation. (Default: true)



81
82
83
# File 'lib/iudex-da/work_poller.rb', line 81

# Set the discard flag (initialized to true) returned by #discard?.
# While it is truthy, #shouldReplaceQueue always answers truthy.
def do_discard=(value)
  @do_discard = value
end

#do_domain_group=(value) ⇒ Object (writeonly)

If set true, provide the final work list ordered in domain, priority order (default: false)



72
73
74
# File 'lib/iudex-da/work_poller.rb', line 72

# Set the flag (initialized to false) returned by #domain_group?. When
# truthy, #generate_query wraps its result via #wrap_domain_group_query.
def do_domain_group=(value)
  @do_domain_group = value
end

#do_reserve=(value) ⇒ Object (writeonly)

If set true, UPDATE reserved date (and instance, if specified) (Default: false)



76
77
78
# File 'lib/iudex-da/work_poller.rb', line 76

# Set the flag (initialized to false) returned by #reserve?. When
# truthy, #generate_query selects only unreserved rows and wraps the
# statement via #wrap_with_update.
def do_reserve=(value)
  @do_reserve = value
end

#domain_depth_coefObject

If set > 0.0 group by domain and reduce priority for subsequent urls within a common (registration level) domain (coefficient of depth). This increases crawl throughput when many domains are available. (default: nil, off)



34
35
36
# File 'lib/iudex-da/work_poller.rb', line 34

# Domain depth coefficient (initialized to nil). #domain_depth? is
# truthy only when this is > 0.0; #wrap_domain_partition_query
# multiplies it by ( dpos - 1 ) to reduce priority.
def domain_depth_coef
  @domain_depth_coef
end

#domain_unionObject

A table of option rows as defined below. A nil/unspecified domain and type row applies to all domains/types not covered by another row. Without such a row, work is limited to the explicit domains/types listed.

Options

:domain

The registration-level, normalized lower-case domain value.

:type

An (upper-case) TYPE value to be AND’d with a domain, or which may appear on its own, applying to all unconfigured domains.

:max

The maximum number of visit urls to obtain in one poll (instead of the top level #max_urls.) A zero value efficiently excludes this domain/type.

Also a [ domain, max ] alternative syntax is currently supported but deprecated.



158
159
160
# File 'lib/iudex-da/work_poller.rb', line 158

# Table of option rows (initialized to []). When non-empty,
# #generate_query builds one sub-query per row from its :domain, :type
# and :max entries.
def domain_union
  @domain_union
end

#instanceObject

String uniquely identifying this worker instance. Only used here with do_reserve.



119
120
121
# File 'lib/iudex-da/work_poller.rb', line 119

# Worker instance identifier (initialized to nil). When set it is
# written by #wrap_with_update and matched by #instance_unreserve.
def instance
  @instance
end

#isolation_levelObject

The transaction isolation level to use for work polling. See java.sql.Connection constants. (Default: TRANSACTION_REPEATABLE_READ (4))



173
174
175
# File 'lib/iudex-da/work_poller.rb', line 173

# Transaction isolation level (initialized to 4); copied onto the
# ContentReader built by #reader.
def isolation_level
  @isolation_level
end

#lock_modeObject

URLS table explicit lock mode (String) for work polling, i.e. ‘EXCLUSIVE’. Try this if deadlocks/contention are otherwise causing work poll starvation or excessive retries. (Default: nil -> implicit locking via isolation level only)



179
180
181
# File 'lib/iudex-da/work_poller.rb', line 179

# Explicit lock mode String, nil unless assigned. #reader copies it
# onto the ContentReader only when it is set.
def lock_mode
  @lock_mode
end

#max_discard_ratioObject

The maximum ratio of current to max_urls where the old queue will be discarded as a safety to avoid starvation (Default: 0.667)



85
86
87
# File 'lib/iudex-da/work_poller.rb', line 85

# Ratio threshold (initialized to 2.0/3.0). #shouldReplaceQueue answers
# true once order_count / max_urls exceeds it.
def max_discard_ratio
  @max_discard_ratio
end

#max_domain_urlsObject Also known as: max_host_urls

If #domain_depth_coef is set, this sets maximum urls for any single (registration level) domain (default: 10_000)



52
53
54
# File 'lib/iudex-da/work_poller.rb', line 52

# Per-domain url cap (initialized to 10_000); used as the upper bound
# on dpos in #wrap_domain_partition_query.
def max_domain_urls
  @max_domain_urls
end

#max_priority_urlsObject

A secondary limit on the number of urls to consider, taking the N high basic priority urls. This is only ever applied when #domain_depth_coef is set. (default: nil, off)



68
69
70
# File 'lib/iudex-da/work_poller.rb', line 68

# Secondary url limit (initialized to nil). #generate_query_inner
# passes it to #filter_query as the limit only when #domain_depth?.
def max_priority_urls
  @max_priority_urls
end

#max_reserved_time_sObject (readonly)

Returns the value of attribute max_reserved_time_s.



103
104
105
# File 'lib/iudex-da/work_poller.rb', line 103

# Maximum reserved time in seconds (initialized to nil), as stored by
# #max_reserved_time=. Compared with #next_reserve_time in
# #shouldReplaceQueue.
def max_reserved_time_s
  @max_reserved_time_s
end

#max_retriesObject

Max-retries not including the initial try (Default: 10)



168
169
170
# File 'lib/iudex-da/work_poller.rb', line 168

# Retry limit (initialized to 10); copied onto the ContentReader built
# by #reader.
def max_retries
  @max_retries
end

#max_urlsObject

The limit of urls to obtain in a single poll (across all domains). May be overridden by #domain_union= (default: 50_000)



63
64
65
# File 'lib/iudex-da/work_poller.rb', line 63

# Overall url limit per poll (initialized to 50_000). #generate_query
# requests max_urls minus the urls already queued; #shouldReplaceQueue
# uses it as the ratio denominator.
def max_urls
  @max_urls
end

#uhash_sliceObject

An array containing a zero-based position and a total number of evenly divided segments within the range of possible uhash values. If set only work with uhashes in the designated range will be polled. Note that the uhash is independent of domain, being a hash on the entire URL. (default: nil, off)



165
166
167
# File 'lib/iudex-da/work_poller.rb', line 165

# Optional [ position, segments ] pair (initialized to nil). When set,
# #generate_query splats it into #url64_range to bound uhash.
def uhash_slice
  @uhash_slice
end

Instance Method Details

#aged_priority?Boolean

Returns:

  • (Boolean)


132
133
134
135
# File 'lib/iudex-da/work_poller.rb', line 132

# Truthy only when both age coefficients are set and greater than 0.0,
# i.e. when priority should be adjusted by age.
def aged_priority?
  c1 = age_coef_1
  c2 = age_coef_2
  c1 && c1 > 0.0 && c2 && c2 > 0.0
end

#and_list(l) ⇒ Object



500
501
502
# File 'lib/iudex-da/work_poller.rb', line 500

# Join the non-nil entries of l into a single " AND " separated String.
def and_list( l )
  terms = l.reject { |term| term.nil? }
  terms.join( " AND " )
end

#clist(l) ⇒ Object



496
497
498
# File 'lib/iudex-da/work_poller.rb', line 496

# Render the non-nil entries of l as a ", " separated list String.
def clist( l )
  l.compact * ', '
end

#discard(visit_queue) ⇒ Object

Override GenericWorkPollStrategy: when do_reserve is enabled, unreserve the orders held in the old VisitQueue as it is discarded.



259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/iudex-da/work_poller.rb', line 259

# When reserving, collect every order from each host queue of
# visit_queue and unreserve them through the reader, logging the count.
# Does nothing when not reserving or when there are no orders.
# An SQLException is logged rather than propagated.
def discard( visit_queue )
  return unless reserve? && visit_queue.order_count > 0

  orders = []
  visit_queue.hosts.each { |hq| orders.concat( hq.orders.to_a ) }
  return if orders.empty?

  n = reader.unreserve( orders )
  @log.info { "Unreserved #{n} orders on discard" }
rescue SQLException => x
  @log.error( "On discard: ", x )
end

#discard?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/iudex-da/work_poller.rb', line 113

# Current value of the do_discard flag (initialized to true).
def discard?
  @do_discard
end

#domain_depth?Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/iudex-da/work_poller.rb', line 36

# Truthy when a domain depth coefficient is set and greater than 0.0.
def domain_depth?
  coef = domain_depth_coef
  coef && coef > 0.0
end

#domain_group?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/iudex-da/work_poller.rb', line 105

# Current value of the do_domain_group flag (initialized to false).
def domain_group?
  @do_domain_group
end

#domain_union?Boolean

Returns:

  • (Boolean)


313
314
315
# File 'lib/iudex-da/work_poller.rb', line 313

# True when at least one domain_union row is configured.
def domain_union?
  !@domain_union.empty?
end

#fields(*ksyms) ⇒ Object



491
492
493
494
# File 'lib/iudex-da/work_poller.rb', line 491

# Field names to select, as Symbols: the mapper's field names first,
# followed by any extra ksyms not already present. ksyms may be nested
# and may contain nils, which are dropped.
def fields( *ksyms )
  mapped = @mapper.fields.map { |key| key.name.to_sym }
  extras = ksyms.flatten.compact.map( &:to_sym )
  mapped | extras
end

#filter_query(flds, max, criteria) ⇒ Object



415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/iudex-da/work_poller.rb', line 415

# Build the innermost SELECT over the urls table.
#
# flds::     column names/expressions to select; not modified
# max::      optional row limit; when given, rows are ordered by
#            priority descending and limited to max
# criteria:: WHERE terms, combined with AND
#
# When #aged_priority? holds, the :priority column (or a new trailing
# column if :priority is absent) is replaced by the age-adjusted
# priority expression.
def filter_query( flds, max, criteria )
  columns = flds

  if aged_priority?
    columns = flds.dup
    slot = columns.index( :priority ) || columns.size
    columns[ slot ] = <<-SQL.strip
      ( priority +
        #{age_coef_1}::REAL *
              SQRT( #{age_coef_2}::REAL *
                    EXTRACT( EPOCH FROM ( now() - next_visit_after ) ) )::REAL
      ) AS priority
    SQL
  end

  statement = <<-SQL
    SELECT #{clist columns}
    FROM urls
    WHERE #{and_list criteria}
  SQL

  return statement unless max

  statement + <<-SQL
    ORDER BY priority DESC
    LIMIT #{max}
  SQL
end

#generate_query(current_urls) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/iudex-da/work_poller.rb', line 317

# Build the complete poll statement and return it as a String with all
# whitespace runs collapsed to single spaces.
#
# current_urls:: number of urls already queued; only the remainder up
#                to max_urls is requested.
def generate_query( current_urls )
  # Base criteria shared by every (sub-)query: only urls that are due.
  criteria = [ "next_visit_after <= now()" ]

  # When reserving, skip rows that are already reserved.
  criteria << "reserved IS NULL" if reserve?

  # Optionally restrict to one slice of the uhash range; either bound
  # may be nil (open-ended) at the first or last position.
  if uhash_slice
    min, max = url64_range( *uhash_slice )
    criteria << "uhash > ( '#{min}' COLLATE \"C\" )" if min
    criteria << "uhash < ( '#{max}' COLLATE \"C\" )" if max
  end

  unless domain_union?
    query = generate_query_inner( criteria, ( max_urls - current_urls ) )
  else
    # One sub-query per domain_union row, combined with UNION ALL.
    subqueries = []
    @domain_union.each do | opts |
      # Work on a copy so the configured row is not modified.
      opts = opts.dup
      # Scale the row's :max by the unfilled fraction of max_urls.
      opts[ :max ] = ( opts[ :max ] * ( max_urls - current_urls ) /
                       max_urls.to_f ).floor
      next if opts[ :max ] == 0

      c = criteria.dup
      if opts[ :domain ].nil?
        # Row without a domain: exclude every explicitly listed domain.
        c += @domain_union.map { |r| r[ :domain ] }.
                           compact.
                           uniq.
                           map { |nd| "domain != '#{nd}'" }
      else
        c << "domain = '#{opts[ :domain ]}'"
      end

      if opts[ :type ].nil?
        # Row without a type: exclude the types listed by other rows
        # having this same :domain value (including nil).
        c += @domain_union.select { |r| r[ :domain ] == opts[ :domain ] }.
                           map { |r| r[ :type ] }.
                           compact.
                           uniq.
                           map { |nt| "type != '#{nt}'" }
      elsif opts[ :type ]
        c << "type = '#{opts[ :type ]}'"
      end

      subqueries << generate_query_inner( c, opts[ :max ] )
    end
    # A single sub-query needs no parentheses or UNION.
    if subqueries.size == 1
      query = subqueries.first
    else
      query = "(" + subqueries.join( ") UNION ALL (" ) + ")"
    end
  end

  # Outer wrappers: reservation UPDATE first, then domain grouping.
  query = wrap_with_update( fields, query ) if reserve?

  query = wrap_domain_group_query( fields, query ) if domain_group?

  # Collapse the heredoc whitespace into a single-line statement.
  query = query.gsub( /\s+/, ' ').strip

  query
end

#generate_query_inner(criteria, max_urls) ⇒ Object



376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
# File 'lib/iudex-da/work_poller.rb', line 376

# Build one ordered, limited SELECT for the given criteria.
#
# criteria:: WHERE terms passed on to #filter_query
# max_urls:: row LIMIT for this (sub-)query
#
# With #domain_depth? the filter query is wrapped by
# #wrap_domain_partition_query and ordered by adj_priority; otherwise
# it is ordered by priority.
def generate_query_inner( criteria, max_urls )
  depth = domain_depth?

  select_flds = fields( ( :domain if depth || domain_group? ),
                        ( :uhash if reserve? ) )
  inner = filter_query( select_flds,
                        ( max_priority_urls if depth ),
                        criteria )

  order_col = :priority
  if depth
    outer_flds = fields( ( :domain if domain_group? ) )
    inner = wrap_domain_partition_query( outer_flds, inner )
    order_col = :adj_priority
  end

  inner + <<-SQL
    ORDER BY #{order_col} DESC
    LIMIT #{max_urls}
  SQL
end

#host_depth_divisorObject

Deprecated, use #domain_depth_coef (the reciprocal)



41
42
43
# File 'lib/iudex-da/work_poller.rb', line 41

# Deprecated reciprocal view of #domain_depth_coef.
#
# Returns 1.0 / domain_depth_coef, or nil when no coefficient is set
# (the initial state), so that reading the deprecated attribute on an
# unconfigured poller does not raise.
def host_depth_divisor
  coef = domain_depth_coef
  coef && 1.0 / coef
end

#host_depth_divisor=(dv) ⇒ Object

Deprecated, use #domain_depth_coef= (reciprocal)



46
47
48
# File 'lib/iudex-da/work_poller.rb', line 46

# Deprecated writer: stores the reciprocal of dv as the domain depth
# coefficient. A nil dv clears the coefficient (nil is its initial,
# "off" value) instead of raising.
def host_depth_divisor=( dv )
  @domain_depth_coef = dv && 1.0 / dv
end

#instance_unreserveObject

Unreserve any orders that are reserved by the current instance. No-op unless do_reserve and instance are set.



275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/iudex-da/work_poller.rb', line 275

# Clear the reserved marker on every url row reserved by this worker
# instance and return the count reported by the reader. Returns nil
# without touching the database unless reserving with an instance set.
# An SQLException is logged rather than propagated.
def instance_unreserve
  return unless reserve? && instance

  sql = <<-SQL
      UPDATE urls
      SET reserved = NULL
      WHERE reserved IS NOT NULL AND
            instance = '#{instance}'
  SQL
  n = reader.update( sql )
  @log.info { "Unreserved #{n} orders for instance #{instance}" }
  n
rescue SQLException => x
  @log.error( "On instance_unreserve: ", x )
end

#logObject

Override GenericWorkPollStrategy



225
226
227
# File 'lib/iudex-da/work_poller.rb', line 225

# Expose the java_logger of the logger obtained from RJack::SLF4J in
# the constructor.
def log
  @log.java_logger
end

#max_reserved_timeObject

The maximum amount of time in milliseconds that the oldest order can remain reserved before a discard is required. This is only relevant when do_reserve is true and do_discard is set false, and typically would be set as a multiple of max_poll_interval (ms). Note that max_poll_interval is interpreted as the worst case next discard opportunity for this purpose. The next poll made to an empty queue, either by prior discard or completion, resets the time tracking. (Default: nil, off)



95
96
97
# File 'lib/iudex-da/work_poller.rb', line 95

# Maximum reserved time in milliseconds: the stored seconds value times
# 1000.0, rounded. Yields the stored value itself when it is unset.
def max_reserved_time
  secs = @max_reserved_time_s
  secs && ( secs * 1000.0 ).round
end

#max_reserved_time=(ms) ⇒ Object



99
100
101
# File 'lib/iudex-da/work_poller.rb', line 99

# Set the maximum reserved time from a value in milliseconds; it is
# stored in seconds. A nil ms switches the limit off again (nil is the
# initial, "off" value) instead of raising.
def max_reserved_time=( ms )
  @max_reserved_time_s = ms && ms / 1000.0
end

#next_reserve_time(now = Time.now) ⇒ Object



253
254
255
# File 'lib/iudex-da/work_poller.rb', line 253

# Seconds elapsed between the last recorded empty-queue poll and now,
# plus max_poll_interval converted from milliseconds to seconds.
def next_reserve_time( now = Time.now )
  elapsed_s = now - @last_none_reserved
  elapsed_s + max_poll_interval / 1000.0
end

#poll(current_urls = 0) ⇒ Object

Poll work and return as List<UniMap>. Raises SQLException.



239
240
241
242
243
244
# File 'lib/iudex-da/work_poller.rb', line 239

# Generate the poll query for current_urls already queued, log it at
# debug level and run it through the reader with retries. When a
# maximum reserved time is set and the queue is empty, the reserve
# time tracking is reset first.
def poll( current_urls = 0 )
  if max_reserved_time_s && current_urls == 0
    @last_none_reserved = Time.now
  end

  sql = generate_query( current_urls )
  @log.debug { "Poll query: #{sql}" }
  reader.select_with_retry( sql )
end

#pollWorkImpl(visit_queue) ⇒ Object

Override GenericWorkPollStrategy



230
231
232
233
234
235
# File 'lib/iudex-da/work_poller.rb', line 230

# Poll for new work sized against the queue's current order count and
# add any result to visit_queue. An SQLException is logged rather than
# propagated.
def pollWorkImpl( visit_queue )
  begin
    polled = poll( visit_queue.order_count )
    visit_queue.add_all( polled ) if polled
  rescue SQLException => x
    @log.error( "On poll: ", x )
  end
end

#readerObject



290
291
292
293
294
295
296
297
298
# File 'lib/iudex-da/work_poller.rb', line 290

# Lazily build, configure and memoize the ContentReader used for all
# database access. The lock mode is applied only when set; the back-off
# only when > 0.0, multiplied by 1_000 and rounded.
def reader
  return @reader if @reader

  r = ContentReader.new( @data_source, @mapper )
  r.priority_adjusted = aged_priority?
  r.max_retries       = max_retries
  r.isolation_level   = isolation_level

  mode = lock_mode
  r.lock_mode = mode if mode

  delay_s = backoff
  r.backoff = ( delay_s * 1_000 ).round if delay_s > 0.0

  @reader = r
end

#reserve?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/iudex-da/work_poller.rb', line 109

# Current value of the do_reserve flag (initialized to false).
def reserve?
  @do_reserve
end

#shouldReplaceQueue(visit_queue) ⇒ Object

Override GenericWorkPollStrategy



247
248
249
250
251
# File 'lib/iudex-da/work_poller.rb', line 247

# Decide whether the existing visit_queue should be replaced rather
# than topped up. Truthy when not reserving, when discard is forced,
# when the queue holds more than max_discard_ratio of max_urls, or
# when a maximum reserved time is set and would be exceeded.
def shouldReplaceQueue( visit_queue )
  return true unless reserve?

  forced = discard?
  return forced if forced

  fill_ratio = visit_queue.order_count.to_f / max_urls
  return true if fill_ratio > max_discard_ratio

  limit_s = max_reserved_time_s
  limit_s && next_reserve_time > limit_s
end

#url64_range(pos, segments) ⇒ Object

Given a zero-based position within some number of segments, returns [ min, max ] bounds where min will be nil at pos=0, and max will be nil at pos=segments-1. Non nil values are uhash prefixes that can be used as selection criteria.



477
478
479
480
481
482
483
484
485
486
487
488
489
# File 'lib/iudex-da/work_poller.rb', line 477

# Compute the [ min, max ] two-character uhash prefix bounds of slice
# pos (zero-based) out of segments equal slices of the 64 * 64 prefix
# space. min is nil for the first slice and max is nil for the last.
#
# Raises RuntimeError unless 0 <= pos < segments.
def url64_range( pos, segments )
  unless pos >= 0 && segments > pos
    raise "Invalid url64_range: 0 <= #{pos} < #{segments}"
  end

  step = 64 * 64 / segments.to_f
  bounds = [ ( ( step * pos ).round if pos > 0 ),
             ( ( step * ( pos + 1 ) ).round if ( pos + 1 ) < segments ) ]

  bounds.map do |idx|
    next unless idx
    # Split the index into its two base-64 digits.
    first, second = idx.divmod( 64 )
    URL64_ORDER[ first ].chr + URL64_ORDER[ second ].chr
  end
end

#wrap_domain_group_query(flds, sub) ⇒ Object



461
462
463
464
465
466
467
# File 'lib/iudex-da/work_poller.rb', line 461

# Wrap sub in an outer SELECT of flds that orders the final rows by
# domain, then by priority descending.
def wrap_domain_group_query( flds, sub )
  columns = clist( flds )
  [ "    SELECT #{columns}",
    "    FROM ( #{sub} ) AS subDG",
    "    ORDER BY domain, priority DESC",
    "" ].join( "\n" )
end

#wrap_domain_partition_query(flds, sub) ⇒ Object



398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/iudex-da/work_poller.rb', line 398

# Wrap sub so that rows are numbered per domain by descending priority
# (dpos), limited to max_domain_urls rows per domain, and given an
# adj_priority reduced by domain_depth_coef for each prior row of the
# same domain.
def wrap_domain_partition_query( flds, sub )
  columns = clist( flds )
  coef    = domain_depth_coef
  cap     = max_domain_urls

  <<-SQL
    SELECT #{columns}
    FROM ( SELECT #{columns},
           ( priority - ( #{coef}::REAL * ( dpos - 1 ) )
           )::REAL AS adj_priority
           FROM ( SELECT #{columns},
                         row_number() OVER (
                           PARTITION BY domain
                           ORDER BY priority DESC ) AS dpos
                  FROM ( #{sub} ) AS subP
                ) AS subH
           WHERE dpos <= #{cap}
         ) AS subA
  SQL
end

#wrap_with_update(flds, sub) ⇒ Object



443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# File 'lib/iudex-da/work_poller.rb', line 443

# Wrap sub in a WITH statement that marks every selected row as
# reserved now (and by this instance, when one is set) while still
# returning flds from the selected work rows.
def wrap_with_update( flds, sub )
  assignments = [ "reserved = now()" ]
  assignments << "instance = '#{instance}'" if instance

  # Use ..FOR UPDATE unless not supported by query specific
  # options with PostgreSQL <= 9.1
  lockable = !( domain_depth? || domain_union? )
  work = lockable ? sub + " FOR UPDATE" : sub

  <<-SQL
    WITH work AS ( #{work} ),
    reserve AS (
      UPDATE urls
      SET #{clist assignments}
      WHERE uhash IN ( SELECT uhash FROM work ) )
    SELECT #{clist flds} FROM work
  SQL
end