Class: Traject::Indexer

Inherits:
Object
Extended by:
QualifiedConstGet
Includes:
Macros::Basic, Macros::Transformation, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/step.rb,
lib/traject/indexer/context.rb,
lib/traject/indexer/settings.rb,
lib/traject/indexer/marc_indexer.rb,
lib/traject/indexer/nokogiri_indexer.rb

Overview

The main indexing class for traject: reads records from a configured Reader, maps each record to an output hash according to registered indexing steps (to_field, each_record), and sends results to a configured Writer. Format-specific subclasses such as MarcIndexer and NokogiriIndexer provide defaults suited to their source record types.

Direct Known Subclasses

MarcIndexer, NokogiriIndexer

Defined Under Namespace

Classes: AfterProcessingStep, ConfigLoadError, Context, EachRecordStep, MarcIndexer, NokogiriIndexer, Settings, ToFieldStep

Constant Summary

CompletedStateError =
Class.new(StandardError)
ArityError =
Class.new(ArgumentError)
NamingError =
Class.new(ArgumentError)

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from QualifiedConstGet

qualified_const_get

Methods included from Macros::Transformation

#append, #default, #delete_if, #first_only, #gsub, #prepend, #select, #split, #strip, #transform, #translation_map, #unique

Methods included from Macros::Basic

#literal

Constructor Details

#initialize(arg_settings = {}, &block) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings, and optionally a block which is instance_eval'd in the indexer, intended for configuration similar to what would go in a config file.



# File 'lib/traject/indexer.rb', line 176

def initialize(arg_settings = {}, &block)
  @writer_class           = nil
  @completed              = false
  @settings               = Settings.new(arg_settings).with_defaults(self.class.default_settings)
  @index_steps            = []
  @after_processing_steps = []

  self.class.apply_class_configure_block(self)
  instance_eval(&block) if block
end
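The constructor idiom above (a settings hash layered over class defaults, plus an optional block instance_eval'd for configuration) can be sketched in plain Ruby. SketchIndexer, its DEFAULTS, and its provide method are illustrative stand-ins, not traject's real classes:

```ruby
class SketchIndexer
  DEFAULTS = { "writer.batch_size" => 100 }.freeze

  attr_reader :settings

  # settings hash is merged over defaults; optional block is
  # instance_eval'd, so it can call configuration DSL methods
  def initialize(arg_settings = {}, &block)
    @settings = DEFAULTS.merge(arg_settings)
    instance_eval(&block) if block
  end

  # stand-in for traject's Settings#provide: only set if not already set
  def provide(key, value)
    @settings[key] = value unless @settings.key?(key)
  end
end

indexer = SketchIndexer.new("log.level" => "debug") do
  provide "writer.batch_size", 5    # already defaulted, so ignored
  provide "reader.type", "binary"   # unset, so applied
end

indexer.settings
# => {"writer.batch_size"=>100, "log.level"=>"debug", "reader.type"=>"binary"}
```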

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 363

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 752

def reader_class
  unless defined? @reader_class
    reader_class_name = settings["reader_class_name"]

    @reader_class = qualified_const_get(reader_class_name)
  end
  return @reader_class
end

#writer ⇒ Object



# File 'lib/traject/indexer.rb', line 777

def writer
  @writer ||= settings["writer"] || writer!
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 761

def writer_class
  writer.class
end

Class Method Details

.apply_class_configure_block(instance) ⇒ Object



# File 'lib/traject/indexer.rb', line 212

def self.apply_class_configure_block(instance)
  # Make sure we inherit from superclass that has a class-level ivar @class_configure_block
  if self.superclass.respond_to?(:apply_class_configure_block)
    self.superclass.apply_class_configure_block(instance)
  end
  if @class_configure_blocks && !@class_configure_blocks.empty?
    @class_configure_blocks.each do |block|
      instance.configure(&block)
    end
  end
end

.configure(&block) ⇒ Object

Class level configure block(s) accepted too, and applied at instantiation before instance-level configuration.

EXPERIMENTAL; implementation may change in ways that affect some uses. https://github.com/traject/traject/pull/213

Note that settings set by 'provide' in a subclass cannot really be overridden by 'provide' in a further subclass. Use self.default_settings instead, with a call to super.

You can call this .configure multiple times; blocks are added to a list, and will be used to initialize an instance in order.

The main downside of this workaround implementation is performance: even though the blocks are defined at load time on the class, they are all executed on every instantiation.



# File 'lib/traject/indexer.rb', line 208

def self.configure(&block)
  (@class_configure_blocks ||= []) << block
end
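The accumulate-then-apply mechanism (together with apply_class_configure_block above) can be demonstrated with a self-contained plain-Ruby sketch; the class names here are illustrative:

```ruby
class SketchBase
  def self.configure(&block)
    (@class_configure_blocks ||= []) << block
  end

  # walk up the inheritance chain first, so superclass blocks
  # run before this class's own blocks
  def self.apply_class_configure_block(instance)
    if superclass.respond_to?(:apply_class_configure_block)
      superclass.apply_class_configure_block(instance)
    end
    (@class_configure_blocks || []).each { |b| instance.instance_eval(&b) }
  end

  attr_reader :log

  def initialize
    @log = []
    self.class.apply_class_configure_block(self)
  end

  def record(msg)
    @log << msg
  end
end

class SketchChild < SketchBase; end

SketchBase.configure  { record "base block" }
SketchChild.configure { record "child block" }

SketchChild.new.log  # => ["base block", "child block"]
SketchBase.new.log   # => ["base block"]
```

Note the cost mentioned above: every SketchChild.new re-runs both blocks, even though they were registered once at load time.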

.default_settings ⇒ Object

Hash is frozen to avoid inheritance-mutability confusion.



# File 'lib/traject/indexer.rb', line 282

def self.default_settings
  @default_settings ||= {
      # Writer defaults
      "writer_class_name"       => "Traject::SolrJsonWriter",
      "solr_writer.batch_size"  => 100,
      "solr_writer.thread_pool" => 1,

      # Threading and logging
      "processing_thread_pool"  => Traject::Indexer::Settings.default_processing_thread_pool,
      "log.batch_size.severity" => "info",

      # how to post-process the accumulator
      Traject::Indexer::ToFieldStep::ALLOW_NIL_VALUES => false,
      Traject::Indexer::ToFieldStep::ALLOW_DUPLICATE_VALUES  => true,
      Traject::Indexer::ToFieldStep::ALLOW_EMPTY_FIELDS => false
  }.freeze
end
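The frozen hash also illustrates why the class-level configure notes recommend overriding default_settings with a call to super: merge returns a fresh hash, so each class gets its own defaults without mutating the parent's. A plain-Ruby sketch, with illustrative class names and keys:

```ruby
class SketchBaseIndexer
  def self.default_settings
    @default_settings ||= {
      "writer_class_name" => "SolrJsonWriter",
      "batch_size"        => 100
    }.freeze
  end
end

class SketchMarcIndexer < SketchBaseIndexer
  # super.merge builds a new hash; the parent's frozen defaults are untouched
  def self.default_settings
    @default_settings ||= super.merge(
      "reader_class_name" => "MarcReader",
      "batch_size"        => 50
    ).freeze
  end
end

SketchMarcIndexer.default_settings["batch_size"]  # => 50
SketchBaseIndexer.default_settings["batch_size"]  # => 100
```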

.legacy_marc_mode! ⇒ Object



# File 'lib/traject/indexer.rb', line 327

def self.legacy_marc_mode!
  @@legacy_marc_mode = true
  # include legacy Marc macros
  include Traject::Macros::Marc21

  # Reader defaults
  legacy_settings = {
    "reader_class_name"       => "Traject::MarcReader",
    "marc_source.type"        => "binary",
  }

  default_settings.merge!(legacy_settings)

  self
end

Instance Method Details

#after_processing(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called once at the end of processing a stream of records.



# File 'lib/traject/indexer.rb', line 359

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end

#complete ⇒ Object

Closes the writer (which may flush/save/finalize buffered records), and calls run_after_processing_steps



# File 'lib/traject/indexer.rb', line 637

def complete
  writer.close if writer.respond_to?(:close)
  run_after_processing_steps

  # after an indexer has been completed, it is not really usable anymore,
  # as the writer has been closed.
  @completed = true
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/traject/indexer.rb', line 621

def completed?
  @completed
end

#configure(&block) ⇒ Object

Right now just does an instance_eval, but encouraged in case we change the underlying implementation later, and to make intent more clear.



# File 'lib/traject/indexer.rb', line 189

def configure(&block)
  instance_eval(&block)
end

#create_logger ⇒ Object

Create logger according to settings



# File 'lib/traject/indexer.rb', line 383

def create_logger
  if settings["logger"]
    # none of the other settings matter, we just got a logger
    return settings["logger"]
  end

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger        = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
    when "STDERR"
      logger.adapter :stderr, level: logger_level, format: logger_format
    when "STDOUT"
      logger.adapter :stdout, level: logger_level, format: logger_format
    else
      logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end

#each_record(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called for each record



# File 'lib/traject/indexer.rb', line 353

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
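In a configuration file, each_record steps commonly do side effects or conditional skips. A sketch (the "245" field test is an assumption about a MARC-style record; context.skip! marks the record so no output is written for it):

```ruby
each_record do |record, context|
  # skip records that have no 245 (title) field
  context.skip!("no title field") unless record["245"]
end
```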

#load_config_file(file_path) ⇒ Object

Pass a string file path, a Pathname, or a File object, for a config file to load into indexer.

Can raise:

  • Errno::ENOENT or Errno::EACCES if file path is not accessible
  • Traject::Indexer::ConfigLoadError if exception is raised evaluating the config. A ConfigLoadError has information in it about original exception, and exactly what config file and line number triggered it.


# File 'lib/traject/indexer.rb', line 234

def load_config_file(file_path)
  File.open(file_path) do |file|
    begin
      self.instance_eval(file.read, file_path.to_s)
    rescue ScriptError, StandardError => e
      raise ConfigLoadError.new(file_path.to_s, e)
    end
  end
end
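The evaluate-and-wrap pattern above can be sketched standalone; ConfigLoadErrorish and Loadable below are illustrative stand-ins for Traject's classes:

```ruby
require "tempfile"

# illustrative stand-in for Traject::Indexer::ConfigLoadError
class ConfigLoadErrorish < StandardError
  def initialize(path, original)
    super("Error loading configuration file #{path}: #{original.class} #{original.message}")
  end
end

class Loadable
  attr_reader :settings

  def initialize
    @settings = {}
  end

  def provide(key, value)
    @settings[key] = value
  end

  # instance_eval with the path as second argument, so backtraces point
  # into the config file; rescue ScriptError too, to catch SyntaxError
  # raised by a malformed config
  def load_config_file(path)
    File.open(path) do |file|
      begin
        instance_eval(file.read, path.to_s)
      rescue ScriptError, StandardError => e
        raise ConfigLoadErrorish.new(path.to_s, e)
      end
    end
  end
end

Tempfile.create(["traject_config", ".rb"]) do |f|
  f.write(%q{provide "log.level", "debug"})
  f.flush
  loadable = Loadable.new
  loadable.load_config_file(f.path)
  loadable.settings  # => {"log.level"=>"debug"}
end
```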

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 748

def log_skip(context)
  logger.debug "Skipped record #{context.record_inspect}: #{context.skipmessage}"
end

#logger_format ⇒ Object



# File 'lib/traject/indexer.rb', line 370

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
             when "false" then
               false
             when "" then
               nil
             else
               format
           end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

If the record is marked skip as part of processing, this will return nil.

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 429

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings, :source_record_id_proc => source_record_id_proc, :logger => logger)
  map_to_context!(context)
  return context.output_hash unless context.skip?
end

#map_to_context!(context) ⇒ Object

Maps a single record INTO the second argument, a Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in as second arg, as a convenience for chaining etc.



# File 'lib/traject/indexer.rb', line 464

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    # Set the index step for error reporting
    context.index_step = index_step
    handle_mapping_errors(context) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    # And unset the index step now that we're finished
    context.index_step = nil
  end

  return context
end
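The division of labor between map_record and map_to_context! can be sketched standalone: registered steps mutate a context's output_hash, map_to_context! stops early once skipped, and map_record returns the hash or nil. SketchContext and SketchMapper are illustrative stand-ins, not traject's real classes:

```ruby
SketchContext = Struct.new(:source_record, :output_hash, :skipped) do
  def skip!; self.skipped = true; end
  def skip?; !!skipped; end
end

class SketchMapper
  def initialize
    @steps = []
  end

  # register a step that accumulates values into one output field
  def to_field(name, &block)
    @steps << lambda do |context|
      accumulator = []
      block.call(context.source_record, accumulator)
      (context.output_hash[name] ||= []).concat(accumulator) unless accumulator.empty?
    end
  end

  # register a step with access to the whole context (it may skip!)
  def each_record(&block)
    @steps << lambda { |context| block.call(context.source_record, context) }
  end

  # mutates and returns the passed-in context, stopping once skipped
  def map_to_context!(context)
    @steps.each do |step|
      break if context.skip?
      step.call(context)
    end
    context
  end

  # convenience wrapper: fresh context in, output_hash (or nil) out
  def map_record(record)
    context = SketchContext.new(record, {}, false)
    map_to_context!(context)
    context.output_hash unless context.skip?
  end
end

mapper = SketchMapper.new
mapper.to_field("title") { |rec, acc| acc << rec[:title] if rec[:title] }
mapper.each_record { |rec, context| context.skip! if rec[:title].nil? }

mapper.map_record(title: "Ulysses")   # => {"title"=>["Ulysses"]}
mapper.map_record(author: "unknown")  # => nil
```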

#process(io_stream_or_array) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

You can instead give it an array of streams.

returns 'false' as a signal to command line to return non-zero exit code for some reason (reason found in logs, presumably). This particular mechanism is open to complexification, starting simple. We do need SOME way to return non-zero to command line.

Parameters:

  • io_stream_or_array (#read, Array<#read>)


# File 'lib/traject/indexer.rb', line 531

def process(io_stream_or_array)
  check_uncompleted

  settings.fill_in_defaults!

  count      = 0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Traject::Indexer#process with settings: #{settings.inspect}"

  processing_threads = settings["processing_thread_pool"].to_i
  thread_pool        = Traject::ThreadPool.new(processing_threads)

  logger.info "   Traject::Indexer with #{processing_threads} processing threads, reader: #{reader_class.name} and writer: #{writer.class.name}"

  #io_stream can now be an array of io_streams.
  (io_stream_or_array.kind_of?(Array) ? io_stream_or_array : [io_stream_or_array]).each do |io_stream|
    reader = self.reader!(io_stream)
    input_name = Traject::Util.io_name(io_stream)
    position_in_input = 0

    log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

    reader.each do |record; safe_count, safe_position_in_input |
      count    += 1
      position_in_input += 1

      # have to use a block local var, so the changing `count` one
      # doesn't get caught in the closure. Don't totally get it, but
      # I think it's so.
      safe_count, safe_position_in_input = count, position_in_input

      thread_pool.raise_collected_exception!

      if settings["debug_ascii_progress"].to_s == "true"
        $stderr.write "." if count % settings["solr_writer.batch_size"].to_i == 0
      end

      context = Context.new(
          :source_record => record,
          :source_record_id_proc => source_record_id_proc,
          :settings      => settings,
          :position      => safe_count,
          :input_name    => input_name,
          :position_in_input => safe_position_in_input,
          :logger        => logger
      )


      if log_batch_size && (count % log_batch_size == 0)
        batch_rps   = log_batch_size / (Time.now - batch_start_time)
        overall_rps = count / (Time.now - start_time)
        logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at: #{context.record_inspect}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
        batch_start_time = Time.now
      end

      # We pass context in a block arg to properly 'capture' it, so
      # we don't accidentally share the local var under closure between
      # threads.
      thread_pool.maybe_in_thread_pool(context) do |t_context|
        map_to_context!(t_context)
        if context.skip?
          log_skip(t_context)
        else
          writer.put t_context
        end
      end
    end
  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!

  complete

  elapsed = Time.now - start_time
  avg_rps = (count / elapsed)
  logger.info "finished Traject::Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Traject::Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
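The safe_count block-local variables and the maybe_in_thread_pool(context) block argument in the loop above both address the same pitfall: a thread's closure capturing a loop variable that keeps changing. A self-contained demonstration (no traject required):

```ruby
count   = 0
threads = 3.times.map do
  count += 1
  captured = count          # block-local: each iteration gets its own binding
  Thread.new { captured }   # closure sees a stable per-iteration value
end

threads.map(&:value).sort  # => [1, 2, 3]

# By contrast, `Thread.new { count }` would close over the single shared
# `count` variable, and each thread would read whatever value it happens
# to hold when the thread runs -- often 3 for all of them.
```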

#process_record(record) ⇒ Object Also known as: <<

Takes a single record, maps it, and sends it to the instance-configured writer. No threading, no logging, no error handling. Respects skipped records by not adding them. Returns the Traject::Indexer::Context.

Aliased as #<<



# File 'lib/traject/indexer.rb', line 440

def process_record(record)
  check_uncompleted

  context = Context.new(:source_record => record, :settings => settings, :source_record_id_proc =>  source_record_id_proc, :logger => logger)
  map_to_context!(context)
  writer.put( context ) unless context.skip?

  return context
end

#process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil) ⇒ Object

A light-weight process method meant for programmatic use, generally intended for only a "few" (not millions of) records.

It does not use instance-configured reader or writer, instead taking a source/reader and destination/writer as arguments to this call.

The reader can be anything that has an #each returning source records. This includes an ordinary array of source records, or any traject Reader.

The writer can be anything with a #put method taking a Traject::Indexer::Context. For convenience, see the Traject::ArrayWriter that just collects output in an array.

Return value of process_with is the writer passed as second arg, for your convenience.

This does much less than the full #process method, to be more flexible and make fewer assumptions:

  • Will never use any additional threads (unless writer does). Wrap in your own threading if desired.
  • Will not do any standard logging or progress bars, regardless of indexer settings. Log yourself if desired.
  • Will not call any after_processing steps. Call yourself with indexer.run_after_processing_steps as desired.
  • WILL by default call #close on the writer, IF the writer has a #close method. Pass close_writer: false to not do so.
  • Exceptions will just raise out, unless you pass in a rescue_with: option, whose value is a proc/lambda that will receive two args, context and exception. If the rescue proc doesn't re-raise, process_with will continue to process subsequent records.

Examples:

array_writer_instance = indexer.process_with([record1, record2], Traject::ArrayWriter.new)

With a block, in addition to or instead of a writer.


indexer.process_with([record]) do |context|
  do_something_with(context.output_hash)
end

Parameters:

  • source (#each)
  • destination (#put) (defaults to: nil)
  • close_writer (defaults to: true)

    whether the destination should have #close called on it, if it responds to #close.

  • rescue_with (Proc) (defaults to: nil)

    to call on errors, taking two args: A Traject::Indexer::Context and an exception. If nil (default), exceptions will be raised out. If set, you can raise or handle otherwise if you like.

  • on_skipped (Proc) (defaults to: nil)

    will be called for any skipped records, with one arg Traject::Indexer::Context



# File 'lib/traject/indexer.rb', line 700

def process_with(source, destination = nil, close_writer: true, rescue_with: nil, on_skipped: nil)
  unless destination || block_given?
    raise ArgumentError, "Need either a second arg writer/destination, or a block"
  end

  settings.fill_in_defaults!

  position = 0
  input_name = Traject::Util.io_name(source)
  source.each do |record |
    begin
      position += 1

      context = Context.new(
          :source_record          => record,
          :source_record_id_proc  => source_record_id_proc,
          :settings               => settings,
          :position               => position,
          :position_in_input      => (position if input_name),
          :logger                 => logger
      )

      map_to_context!(context)

      if context.skip?
        on_skipped.call(context) if on_skipped
      else
        destination.put(context) if destination
        yield(context) if block_given?
      end
    rescue StandardError => e
      if rescue_with
        rescue_with.call(context, e)
      else
        raise e
      end
    end
  end

  if close_writer && destination.respond_to?(:close)
    destination.close
  end

  return destination
end

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using the class set in #reader_class, initialized with the io_stream passed in.



# File 'lib/traject/indexer.rb', line 767

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#run_after_processing_stepsObject



# File 'lib/traject/indexer.rb', line 646

def run_after_processing_steps
  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue StandardError => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Settings object. The settings hash is not nested: it is one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, e.g. log.file.

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_eval'd in the context of the Traject::Settings object.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  provide "b", "new b"
end

indexer.settings #=> {"a" => "a", "b" => "new b"}

Note the #provide method is defined on Traject::Settings to write to a setting only if it was previously unset. You can also use #store to force overwriting an existing setting.

Even with arguments, Indexer#settings returns the Settings object itself, so method calls can be chained.



# File 'lib/traject/indexer.rb', line 273

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval(&block) if block_given?

  return @settings
end

#source_record_id_proc ⇒ Object

Sub-classes should override to return a proc object that takes one arg, a source record, and returns an identifier for it that can be used in logged messages. This differs depending on input record format, which is why we leave it to sub-classes.



# File 'lib/traject/indexer.rb', line 313

def source_record_id_proc
  if defined?(@@legacy_marc_mode) && @@legacy_marc_mode
    return @source_record_id_proc ||= lambda do |source_marc_record|
      if ( source_marc_record &&
           source_marc_record.kind_of?(MARC::Record) &&
           source_marc_record['001'] )
        source_marc_record['001'].value
      end
    end
  end

  @source_record_id_proc ||= lambda { |source| nil }
end
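A subclass override might look like the following sketch; the class names and the hash-with-"id" record shape are illustrative assumptions, not a real traject record format:

```ruby
class SketchIndexerBase
  # base behavior: no useful ID for unknown record types
  def source_record_id_proc
    @source_record_id_proc ||= lambda { |source| nil }
  end
end

class HashRecordIndexer < SketchIndexerBase
  # for source records that are plain hashes with an "id" key
  def source_record_id_proc
    @source_record_id_proc ||= lambda { |source| source && source["id"] }
  end
end

HashRecordIndexer.new.source_record_id_proc.call("id" => "rec-0001")  # => "rec-0001"
SketchIndexerBase.new.source_record_id_proc.call("id" => "rec-0001")  # => nil
```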

#to_field(field_name, *procs, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field. The first field_name argument can be a single string, or an array of multiple strings -- in the latter case, the processed values will be added to each field mentioned.



# File 'lib/traject/indexer.rb', line 348

def to_field(field_name, *procs, &block)
  @index_steps << ToFieldStep.new(field_name, procs, block, Traject::Util.extract_caller_location(caller.first))
end
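In a traject configuration file this typically looks like the following sketch. The extract_marc macro comes from the Marc21 macros and is only available in a MARC indexer; the field names and subfield choices here are assumptions for illustration:

```ruby
# single field name, with a macro-generated lambda
to_field "title", extract_marc("245a")

# an array of field names: each processed value is added to every listed field
to_field ["author_display", "author_facet"] do |record, accumulator|
  accumulator << record["100"]["a"] if record["100"]
end
```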

#writer! ⇒ Object

Instantiate a Traject Writer, using the class set in #writer_class.



# File 'lib/traject/indexer.rb', line 772

def writer!
  writer_class = @writer_class || qualified_const_get(settings["writer_class_name"])
  writer_class.new(settings.merge("logger" => logger))
end