Class: Traject::Indexer

Inherits:
Object
Includes:
Macros::Basic, Macros::Marc21, QualifiedConstGet
Defined in:
lib/traject/indexer.rb,
lib/traject/indexer/settings.rb

Overview

This class does indexing for traject: Getting input records from a Reader class, mapping the input records to an output hash, and then sending the output hash off somewhere (usually Solr) with a Writer class.

Traject config files are instance_eval'd in an Indexer object, so self in a config file is an Indexer, and any Indexer methods can be called.

However, certain Indexer methods exist almost entirely for the purpose of being called in config files; these methods are part of the expected Domain-Specific Language ("DSL") for config files, and will ordinarily form the bulk or entirety of config files:

  • #settings
  • #to_field
  • #each_record
  • #after_processing
  • #logger (rarely used in config files, but in some cases to set up custom logging config)
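
For orientation, a hedged sketch of a small config file using these DSL methods might look like the following. The setting values, field names, and MARC tags are illustrative only; they are not prescribed by traject.

# Sketch of a traject config file; values below are examples, not defaults.
settings do
  provide "solr.url", "http://localhost:8983/solr/my_core"
end

to_field "id",    extract_marc("001", :first => true)
to_field "title", extract_marc("245abc")

each_record do |record, context|
  # skip records that mapped no title (field name is illustrative)
  context.skip!("no title") if context.output_hash["title"].nil?
end

after_processing do
  logger.info "Indexing complete."
end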

If accessing a Traject::Indexer programmatically (instead of via command line with config files), additional methods of note include:

# to process a stream of input records from configured Reader,
# to configured Writer:
indexer.process(io_stream)

# To map a single input record manually to an output_hash,
# ignoring Readers and Writers:
hash = indexer.map_record(record)

## Readers and Writers

The Indexer has a modularized architecture for readers and writers, for where source records come from (reader), and where output is sent to (writer).

A Reader is any class that:

  • Has a two-argument initializer taking an IO stream and a Settings hash.
  • Responds to the usual ruby #each, yielding a source record for each iteration. (Including Enumerable is probably a good idea too.)
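
As a hedged sketch (not shipped with traject), a class meeting this Reader contract might look like the following, assuming the input stream contains one record per line:

# Hypothetical reader: treats each line of the input stream as a record.
class LinePerRecordReader
  include Enumerable

  def initialize(io_stream, settings)
    @io       = io_stream
    @settings = settings
  end

  def each
    @io.each_line do |line|
      yield line.chomp
    end
  end
end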

The default reader is the Traject::Marc4JReader, whose behavior can be further customized by several settings in the Settings hash.

Alternate readers can be set directly with the #reader_class= method, or with the "reader_class_name" Setting, a String name of a class meeting the reader contract.

A Writer is any class that:

  • Has a one-argument initializer taking a Settings hash. (The logger is provided to the Writer in settings["logger"].)
  • Responds to a one-argument #put method, where the argument is a Traject::Indexer::Context containing an #output_hash of mapped keys/values; the writer should write them to the appropriate place.
  • Responds to a #close method, called when processing is done.
  • Optionally implements a #skipped_record_count method, returning an integer count of records that were skipped due to errors (and presumably logged).
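
As a hedged sketch (not shipped with traject), a class meeting this Writer contract might simply dump each output_hash to the provided logger:

# Hypothetical writer: logs each mapped record instead of sending it anywhere.
class LoggingWriter
  def initialize(settings)
    @settings = settings
    @logger   = settings["logger"]
  end

  def put(context)
    @logger.info context.output_hash.inspect
  end

  def close
    # nothing to clean up for this writer
  end

  def skipped_record_count
    0 # optional method; this writer never skips records
  end
end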

The default writer is the SolrJWriter, which uses the Java SolrJ library to write to Solr. A few other built-in writers are available, but it's anticipated that more will be created as plugins or local code for special purposes.

You can set an alternate writer by setting a Class object directly with the #writer_class method, or with the "writer_class_name" setting, a String name of a class meeting the Writer contract.
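
In a config file, selecting reader and writer classes by name might look like the sketch below; the class names shown are examples of classes meeting the contracts, so substitute whatever is appropriate for your setup:

settings do
  # illustrative choices; any classes meeting the Reader/Writer contracts work
  provide "reader_class_name", "Traject::MarcReader"
  provide "writer_class_name", "Traject::JsonWriter"
end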

Defined Under Namespace

Classes: AfterProcessingStep, ArityError, Context, EachRecordStep, NamingError, Settings, ToFieldStep

Constant Summary

Constants included from Macros::Marc21

Macros::Marc21::EXTRACT_ALL_MARC_VALID_OPTIONS, Macros::Marc21::EXTRACT_MARC_VALID_OPTIONS, Macros::Marc21::SERIALZED_MARC_VALID_OPTIONS

Instance Attribute Summary

Instance Method Summary

Methods included from Macros::Basic

#literal

Methods included from Macros::Marc21

apply_extraction_options, #extract_all_marc_values, #extract_marc, first!, #serialized_marc, trim_punctuation

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(arg_settings = {}) ⇒ Indexer

Takes an optional hash or Traject::Indexer::Settings object of settings.



# File 'lib/traject/indexer.rb', line 105

def initialize(arg_settings = {})
  @settings = Settings.new(arg_settings)
  @index_steps = []
  @after_processing_steps = []
end

Instance Attribute Details

#logger ⇒ Object



# File 'lib/traject/indexer.rb', line 166

def logger
  @logger ||= create_logger
end

#reader_class ⇒ Object



# File 'lib/traject/indexer.rb', line 394

def reader_class
  unless defined? @reader_class
    @reader_class = qualified_const_get(settings["reader_class_name"])
  end
  return @reader_class
end

#writer_class ⇒ Object



# File 'lib/traject/indexer.rb', line 401

def writer_class
  unless defined? @writer_class
    @writer_class = qualified_const_get(settings["writer_class_name"])
  end
  return @writer_class
end

Instance Method Details

#after_processing(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called once at the end of processing a stream of records.



# File 'lib/traject/indexer.rb', line 162

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
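
For illustration, a hedged sketch of config-file usage (the logged message is arbitrary):

after_processing do
  logger.info "All records processed."
end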

#create_logger ⇒ Object

Create logger according to settings



# File 'lib/traject/indexer.rb', line 182

def create_logger

  logger_level  = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
  when "STDERR"
    logger.adapter :stderr, level: logger_level, format: logger_format
  when "STDOUT"
    logger.adapter :stdout, level: logger_level, format: logger_format
  else
    logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end


  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end
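
For illustration, a hedged sketch of settings that influence logger creation, using the keys consulted above; paths and levels are examples only:

settings do
  provide "log.level",      "debug"
  provide "log.file",       "log/traject.log"
  provide "log.error_file", "log/traject_errors.log"
  provide "log.format",     "%d %5L %m"
end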

#each_record(aLambda = nil, &block) ⇒ Object

Part of DSL, register logic to be called for each record



# File 'lib/traject/indexer.rb', line 156

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first) )
end
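
For illustration, a hedged sketch of config-file usage; the field name and value are hypothetical:

each_record do |record, context|
  # add a constant value to every record's output
  (context.output_hash["record_source"] ||= []) << "marc"
end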

#id_string(record) ⇒ Object

Get a printable id from the record, for error logging. Maybe override this for a future XML version.



# File 'lib/traject/indexer.rb', line 290

def id_string(record)
  record && record['001'] && record['001'].value.to_s
end

#log_mapping_errors(context, index_step) ⇒ Object

Just a wrapper that captures and logs any unexpected errors raised in mapping, along with contextual information on the record and on the location in the source file of the mapping rule.

Re-raises the error at the moment.

log_mapping_errors(context, index_step) do
  all_sorts_of_stuff # that will have errors logged
end



# File 'lib/traject/indexer.rb', line 269

def log_mapping_errors(context, index_step)
  begin
    yield
  rescue Exception => e
    msg =  "Unexpected error on record id `#{id_string(context.source_record)}` at file position #{context.position}\n"
    msg += "    while executing #{index_step.inspect}\n"
    msg += Traject::Util.exception_to_log_message(e)

    logger.error msg
    begin
      logger.debug "Record: " + context.source_record.to_s
    rescue Exception => marc_to_s_exception
      logger.debug "(Could not log record, #{marc_to_s_exception})"
    end

    raise e
  end
end

#log_skip(context) ⇒ Object

Log that the current record is being skipped, using data in context.position and context.skipmessage



# File 'lib/traject/indexer.rb', line 390

def log_skip(context)
  logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
end

#logger_format ⇒ Object



# File 'lib/traject/indexer.rb', line 172

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
    when "false" then false
    when "" then nil
    else format
  end
end

#map_record(record) ⇒ Object

Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)

This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.



# File 'lib/traject/indexer.rb', line 220

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings)
  map_to_context!(context)
  return context.output_hash
end
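
For illustration, a hedged sketch of programmatic use, assuming a MARC record read with the ruby-marc gem and an already-configured indexer; the file name is an example:

require 'marc'

record = MARC::Reader.new("record.mrc").first
output = indexer.map_record(record)
# output is a hash of field name => array of values, per the configured to_field rules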

#map_to_context!(context) ⇒ Object

Maps a single record INTO the passed-in Traject::Indexer::Context.

Context must be passed with a #source_record and #settings, and optionally a #position.

Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.

Pass in a context with a set #position if you want that to be available to mapping routines.

Returns the context passed in, as a convenience for chaining etc.



# File 'lib/traject/indexer.rb', line 239

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    context.index_step = index_step
    accumulator = log_mapping_errors(context, index_step) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    if accumulator.size > 0
      accumulator.compact!
      (context.output_hash[index_step.field_name] ||= []).concat accumulator
    end

    context.index_step = nil
  end

  return context
end

#process(io_stream) ⇒ Object

Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.

Returns false as a signal to the command line that it should exit with a non-zero code (the reason, presumably, found in the logs). This particular mechanism starts simple and may grow more sophisticated, but SOME way to return non-zero to the command line is needed.



# File 'lib/traject/indexer.rb', line 303

def process(io_stream)
  settings.fill_in_defaults!

  count      =       0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Indexer#process with settings: #{settings.inspect}"

  reader = self.reader!(io_stream)
  writer = self.writer!

  thread_pool = Traject::ThreadPool.new(settings["processing_thread_pool"].to_i)

  logger.info "   Indexer with reader: #{reader.class.name} and writer: #{writer.class.name}"

  log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

  reader.each do |record; position|
    count += 1

    # have to use a block local var, so the changing `count` one
    # doesn't get caught in the closure. Weird, yeah.
    position = count

    thread_pool.raise_collected_exception!

    if settings["debug_ascii_progress"].to_s == "true"
      $stderr.write "." if count % settings["solrj_writer.batch_size"] == 0
    end

    if log_batch_size && (count % log_batch_size == 0)
      batch_rps = log_batch_size / (Time.now - batch_start_time)
      overall_rps = count / (Time.now - start_time)
      logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at id:#{id_string(record)}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
      batch_start_time = Time.now
    end

    # we have to use this weird lambda to properly "capture" the count, instead
    # of having it be bound to the original variable in a non-threadsafe way.
    # This is confusing, I might not be understanding things properly, but that's where i am.
    #thread_pool.maybe_in_thread_pool &make_lambda(count, record, writer)
    thread_pool.maybe_in_thread_pool(record, settings, position) do |record, settings, position|
      context = Context.new(:source_record => record, :settings => settings, :position => position)
      context.logger = logger
      map_to_context!(context)
      if context.skip?
        log_skip(context)
      else
        writer.put context
      end

    end

  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!


  writer.close if writer.respond_to?(:close)

  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue Exception => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end

  elapsed        = Time.now - start_time
  avg_rps        = (count / elapsed)
  logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
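
For illustration, a hedged sketch of programmatic use; the file name is an example, and the reader and writer used are whatever the indexer is configured with:

File.open("records.mrc", "rb") do |io|
  success = indexer.process(io)
  warn "some records were skipped or errored; see logs" unless success
end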

#reader!(io_stream) ⇒ Object

Instantiate a Traject Reader, using the class set in #reader_class, initialized with the io_stream passed in.



# File 'lib/traject/indexer.rb', line 410

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end

#settings(new_settings = nil, &block) ⇒ Object

Part of the config file DSL, for writing settings values.

The Indexer's settings consist of a hash-like Traject::Settings object. The settings hash is not nested; it is just one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, e.g. "log.file".

The settings method with no arguments returns that Settings object.

With a hash and/or block argument, it can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_eval'd in the context of the Traject::Settings object.

indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  provide "b", "new b"
end

indexer.settings #=> {"a" => "a", "b" => "new b"}

Note the #provide method is defined on Traject::Settings to write to a setting only if it was not previously set. You can also use #store to force overwriting even if there is an existing setting.

Even with arguments, Indexer#settings returns the Settings object (which is hash-like), so method calls can be chained.



# File 'lib/traject/indexer.rb', line 140

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval &block if block

  return @settings
end

#to_field(field_name, aLambda = nil, &block) ⇒ Object

Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field.



# File 'lib/traject/indexer.rb', line 151

def to_field(field_name, aLambda = nil, &block)
  @index_steps << ToFieldStep.new(field_name, aLambda, block, Traject::Util.extract_caller_location(caller.first) )
end
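
For illustration, two hedged sketches of config-file usage: the first uses the extract_marc macro from Macros::Marc21, the second a custom block. Field names and MARC tags are examples only:

to_field "title", extract_marc("245abc")

to_field "title_sort" do |record, accumulator, context|
  field = record['245']
  accumulator << field['a'].strip if field && field['a']
end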

#writer! ⇒ Object

Instantiate a Traject Writer, using the class set in #writer_class.



# File 'lib/traject/indexer.rb', line 415

def writer!
  return writer_class.new(settings.merge("logger" => logger))
end