Class: Traject::Indexer
- Inherits: Object
- Includes: Macros::Basic, Macros::Marc21, QualifiedConstGet
- Defined in: lib/traject/indexer.rb,
  lib/traject/indexer/settings.rb
Overview
This class does indexing for traject: Getting input records from a Reader class, mapping the input records to an output hash, and then sending the output hash off somewhere (usually Solr) with a Writer class.
Traject config files are instance_eval'd in an Indexer object, so self in a config file is an Indexer, and any Indexer methods can be called.
However, certain Indexer methods exist almost entirely for the purpose of being called in config files; these methods are part of the expected Domain-Specific Language ("DSL") for config files, and will ordinarily form the bulk or entirety of config files:
- #settings
- #to_field
- #each_record
- #after_processing
- #logger (rarely used in config files, but in some cases to set up custom logging config)
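Taken together, these DSL methods typically look something like the following in a config file. This is only an illustrative sketch: the MARC tag, field name, and settings values are assumptions for demonstration, not defaults.

```ruby
# Illustrative traject config file. Config files are instance_eval'd
# in an Indexer, so each call below is an Indexer method.
settings do
  provide "log.file", "traject.log" # hypothetical log destination
end

to_field "title", extract_marc("245a")

each_record do |record, context|
  # arbitrary per-record logic goes here
end

after_processing do
  logger.info "processing complete"
end
```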
If accessing a Traject::Indexer programmatically (instead of via command line with config files), additional methods of note include:
# To process a stream of input records from the configured Reader,
# sending output to the configured Writer:
indexer.process(io_stream)

# To map a single input record manually to an output_hash,
# ignoring Readers and Writers:
hash = indexer.map_record(record)
## Readers and Writers
The Indexer has a modularized architecture for readers and writers, for where source records come from (reader), and where output is sent to (writer).
A Reader is any class that:
1) Has a two-argument initializer taking an IO stream and a Settings hash.
2) Responds to the usual Ruby #each, returning a source record from each #each. (Including Enumerable is probably a good idea too.)
The default reader is the Traject::Marc4JReader, whose behavior is further customized by several settings in the Settings hash.
Alternate readers can be set directly with the #reader_class= method, or with the "reader_class_name" Setting, a String name of a class meeting the reader contract.
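As an illustrative sketch of that contract (the class name and the one-record-per-line format here are assumptions for demonstration, not anything Traject ships with):

```ruby
require 'stringio'

# Hypothetical reader meeting the Reader contract: a two-argument
# initializer (io_stream, settings) and an #each yielding source records.
# Here each line of input is treated as one "record".
class LinePerRecordReader
  include Enumerable

  def initialize(io_stream, settings)
    @io       = io_stream
    @settings = settings
  end

  def each
    @io.each_line do |line|
      yield line.chomp
    end
  end
end

reader = LinePerRecordReader.new(StringIO.new("rec1\nrec2\n"), {})
reader.to_a # => ["rec1", "rec2"]
```

Such a class could then be registered with indexer.reader_class = LinePerRecordReader, or by name via the "reader_class_name" setting.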
A Writer is any class that:
1) Has a one-argument initializer taking a Settings hash. (The logger is provided to the Writer in settings["logger"].)
2) Responds to a one-argument #put method, where the argument is a Traject::Indexer::Context containing an #output_hash of mapped keys/values; the writer should write them to the appropriate place.
3) Responds to a #close method, called when we're done.
4) Optionally implements a #skipped_record_count method, returning an integer count of records that were skipped due to errors (and presumably logged).
The default writer is the SolrJWriter, using Java SolrJ to write to Solr. A few other built-in writers are available, but it's anticipated that more will be created as plugins or local code for special purposes.
You can set alternate writers by setting a Class object directly with the #writer_class method, or by the 'writer_class_name' Setting, with a String name of a class meeting the Writer contract.
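A minimal in-memory sketch of that contract (ArrayWriter is an invented name for illustration; real writers would send output to Solr, a file, etc.):

```ruby
# Hypothetical writer meeting the Writer contract: one-argument
# initializer (settings), #put(context), and #close.
class ArrayWriter
  attr_reader :results

  def initialize(settings)
    @settings = settings
    @results  = []
  end

  # context responds to #output_hash, per the Writer contract
  def put(context)
    @results << context.output_hash
  end

  def close
    # nothing to flush for an in-memory writer
  end
end
```

It could be installed with indexer.writer_class = ArrayWriter, or by name via the 'writer_class_name' setting; this sort of writer is handy in tests for inspecting mapped output.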
Defined Under Namespace
Classes: AfterProcessingStep, ArityError, Context, EachRecordStep, NamingError, Settings, ToFieldStep
Constant Summary
Constants included from Macros::Marc21
Macros::Marc21::EXTRACT_ALL_MARC_VALID_OPTIONS, Macros::Marc21::EXTRACT_MARC_VALID_OPTIONS, Macros::Marc21::SERIALZED_MARC_VALID_OPTIONS
Instance Attribute Summary
Instance Method Summary
- #after_processing(aLambda = nil, &block) ⇒ Object
  Part of DSL, register logic to be called once at the end of processing a stream of records.
- #create_logger ⇒ Object
  Create logger according to settings.
- #each_record(aLambda = nil, &block) ⇒ Object
  Part of DSL, register logic to be called for each record.
- #id_string(record) ⇒ Object
  Get a printable id from record for error logging.
- #initialize(arg_settings = {}) ⇒ Indexer (constructor)
  Optional hash or Traject::Indexer::Settings object of settings.
- #log_mapping_errors(context, index_step) ⇒ Object
  Just a wrapper that captures and records any unexpected errors raised in mapping, along with contextual information on the record and the location in the source file of the mapping rule.
- #log_skip(context) ⇒ Object
  Log that the current record is being skipped, using data in context.position and context.skipmessage.
- #logger_format ⇒ Object
- #map_record(record) ⇒ Object
  Processes a single record according to indexing rules set up in this indexer.
- #map_to_context!(context) ⇒ Object
  Maps a single record INTO the second argument, a Traject::Indexer::Context.
- #process(io_stream) ⇒ Object
  Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to the configured Writer.
- #reader!(io_stream) ⇒ Object
  Instantiate a Traject Reader, using the class set in #reader_class, initialized with the io_stream passed in.
- #settings(new_settings = nil, &block) ⇒ Object
  Part of the config file DSL, for writing settings values.
- #to_field(field_name, aLambda = nil, &block) ⇒ Object
  Part of DSL, used to define an indexing mapping.
- #writer! ⇒ Object
  Instantiate a Traject Writer, using the class set in #writer_class.
Methods included from Macros::Basic
Methods included from Macros::Marc21
apply_extraction_options, #extract_all_marc_values, #extract_marc, first!, #serialized_marc, trim_punctuation
Methods included from QualifiedConstGet
Constructor Details
Instance Attribute Details
#logger ⇒ Object
# File 'lib/traject/indexer.rb', line 166

def logger
  @logger ||= create_logger
end
#reader_class ⇒ Object
# File 'lib/traject/indexer.rb', line 394

def reader_class
  unless defined? @reader_class
    @reader_class = qualified_const_get(settings["reader_class_name"])
  end
  return @reader_class
end
#writer_class ⇒ Object
# File 'lib/traject/indexer.rb', line 401

def writer_class
  unless defined? @writer_class
    @writer_class = qualified_const_get(settings["writer_class_name"])
  end
  return @writer_class
end
Instance Method Details
#after_processing(aLambda = nil, &block) ⇒ Object
Part of DSL, register logic to be called once at the end of processing a stream of records.
# File 'lib/traject/indexer.rb', line 162

def after_processing(aLambda = nil, &block)
  @after_processing_steps << AfterProcessingStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
#create_logger ⇒ Object
Create logger according to settings
# File 'lib/traject/indexer.rb', line 182

def create_logger
  logger_level = settings["log.level"] || "info"

  # log everything to STDERR or specified logfile
  logger = Yell::Logger.new(:null)
  logger.format = logger_format
  logger.level  = logger_level

  logger_destination = settings["log.file"] || "STDERR"
  # We intentionally repeat the logger_level
  # on the adapter, so it will stay there if overall level
  # is changed.
  case logger_destination
  when "STDERR"
    logger.adapter :stderr, level: logger_level, format: logger_format
  when "STDOUT"
    logger.adapter :stdout, level: logger_level, format: logger_format
  else
    logger.adapter :file, logger_destination, level: logger_level, format: logger_format
  end

  # ADDITIONALLY log error and higher to....
  if settings["log.error_file"]
    logger.adapter :file, settings["log.error_file"], :level => 'gte.error'
  end

  return logger
end
#each_record(aLambda = nil, &block) ⇒ Object
Part of DSL, register logic to be called for each record
# File 'lib/traject/indexer.rb', line 156

def each_record(aLambda = nil, &block)
  @index_steps << EachRecordStep.new(aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
#id_string(record) ⇒ Object
get a printable id from record for error logging. Maybe override this for a future XML version.
# File 'lib/traject/indexer.rb', line 290

def id_string(record)
  record && record['001'] && record['001'].value.to_s
end
#log_mapping_errors(context, index_step) ⇒ Object
just a wrapper that captures and records any unexpected errors raised in mapping, along with contextual information on record and location in source file of mapping rule.
Re-raises error at the moment.
log_mapping_errors(context, index_step) do
  all_sorts_of_stuff # that will have errors logged
end
# File 'lib/traject/indexer.rb', line 269

def log_mapping_errors(context, index_step)
  begin
    yield
  rescue Exception => e
    msg =  "Unexpected error on record id `#{id_string(context.source_record)}` at file position #{context.position}\n"
    msg += "    while executing #{index_step.inspect}\n"
    msg += Traject::Util.exception_to_log_message(e)

    logger.error msg
    begin
      logger.debug "Record: " + context.source_record.to_s
    rescue Exception => marc_to_s_exception
      logger.debug "(Could not log record, #{marc_to_s_exception})"
    end

    raise e
  end
end
#log_skip(context) ⇒ Object
Log that the current record is being skipped, using data in context.position and context.skipmessage
# File 'lib/traject/indexer.rb', line 390

def log_skip(context)
  logger.debug "Skipped record #{context.position}: #{context.skipmessage}"
end
#logger_format ⇒ Object
# File 'lib/traject/indexer.rb', line 172

def logger_format
  format = settings["log.format"] || "%d %5L %m"
  format = case format
    when "false" then false
    when "" then nil
    else format
  end
end
#map_record(record) ⇒ Object
Processes a single record according to indexing rules set up in this indexer. Returns the output hash (a hash whose keys are string fields, and values are arrays of one or more values in that field)
This is a convenience shortcut for #map_to_context! -- use that one if you want to provide additional context like position, and/or get back the full context.
# File 'lib/traject/indexer.rb', line 220

def map_record(record)
  context = Context.new(:source_record => record, :settings => settings)
  map_to_context!(context)
  return context.output_hash
end
#map_to_context!(context) ⇒ Object
Maps a single record INTO the second argument, a Traject::Indexer::Context.
Context must be passed with a #source_record and #settings, and optionally a #position.
Context will be mutated by this method, most significantly by adding an #output_hash, a hash from fieldname to array of values in that field.
Pass in a context with a set #position if you want that to be available to mapping routines.
Returns the context passed in as second arg, as a convenience for chaining etc.
# File 'lib/traject/indexer.rb', line 239

def map_to_context!(context)
  @index_steps.each do |index_step|
    # Don't bother if we're skipping this record
    break if context.skip?

    context.index_step = index_step
    accumulator = log_mapping_errors(context, index_step) do
      index_step.execute(context) # will always return [] for an each_record step
    end

    if accumulator.size > 0
      accumulator.compact!
      (context.output_hash[index_step.field_name] ||= []).concat accumulator
    end

    context.index_step = nil
  end

  return context
end
#process(io_stream) ⇒ Object
Processes a stream of records, reading from the configured Reader, mapping according to configured mapping rules, and then writing to configured Writer.
Returns 'false' as a signal to the command line to return a non-zero exit code for some reason (reason found in logs, presumably). This particular mechanism is open to complexification, starting simple. We do need SOME way to return non-zero to the command line.
# File 'lib/traject/indexer.rb', line 303

def process(io_stream)
  settings.fill_in_defaults!

  count      = 0
  start_time = batch_start_time = Time.now
  logger.debug "beginning Indexer#process with settings: #{settings.inspect}"

  reader = self.reader!(io_stream)
  writer = self.writer!

  thread_pool = Traject::ThreadPool.new(settings["processing_thread_pool"].to_i)

  logger.info "   Indexer with reader: #{reader.class.name} and writer: #{writer.class.name}"

  log_batch_size = settings["log.batch_size"] && settings["log.batch_size"].to_i

  reader.each do |record; position|
    count += 1

    # have to use a block local var, so the changing `count` one
    # doesn't get caught in the closure. Weird, yeah.
    position = count

    thread_pool.raise_collected_exception!

    if settings["debug_ascii_progress"].to_s == "true"
      $stderr.write "." if count % settings["solrj_writer.batch_size"] == 0
    end

    if log_batch_size && (count % log_batch_size == 0)
      batch_rps   = log_batch_size / (Time.now - batch_start_time)
      overall_rps = count / (Time.now - start_time)
      logger.send(settings["log.batch_size.severity"].downcase.to_sym, "Traject::Indexer#process, read #{count} records at id:#{id_string(record)}; #{'%.0f' % batch_rps}/s this batch, #{'%.0f' % overall_rps}/s overall")
      batch_start_time = Time.now
    end

    # we have to use this weird lambda to properly "capture" the count, instead
    # of having it be bound to the original variable in a non-threadsafe way.
    # This is confusing, I might not be understanding things properly, but that's where i am.
    #thread_pool.maybe_in_thread_pool &make_lambda(count, record, writer)
    thread_pool.maybe_in_thread_pool(record, settings, position) do |record, settings, position|
      context = Context.new(:source_record => record, :settings => settings, :position => position)
      context.logger = logger
      map_to_context!(context)
      if context.skip?
        log_skip(context)
      else
        writer.put context
      end
    end
  end
  $stderr.write "\n" if settings["debug_ascii_progress"].to_s == "true"

  logger.debug "Shutting down #processing mapper threadpool..."
  thread_pool.shutdown_and_wait
  logger.debug "#processing mapper threadpool shutdown complete."

  thread_pool.raise_collected_exception!

  writer.close if writer.respond_to?(:close)

  @after_processing_steps.each do |step|
    begin
      step.execute
    rescue Exception => e
      logger.fatal("Unexpected exception #{e} when executing #{step}")
      raise e
    end
  end

  elapsed = Time.now - start_time
  avg_rps = (count / elapsed)
  logger.info "finished Indexer#process: #{count} records in #{'%.3f' % elapsed} seconds; #{'%.1f' % avg_rps} records/second overall."

  if writer.respond_to?(:skipped_record_count) && writer.skipped_record_count > 0
    logger.error "Indexer#process returning 'false' due to #{writer.skipped_record_count} skipped records."
    return false
  end

  return true
end
#reader!(io_stream) ⇒ Object
Instantiate a Traject Reader, using class set in #reader_class, initialized with io_stream passed in
# File 'lib/traject/indexer.rb', line 410

def reader!(io_stream)
  return reader_class.new(io_stream, settings.merge("logger" => logger))
end
#settings(new_settings = nil, &block) ⇒ Object
Part of the config file DSL, for writing settings values.
The Indexer's settings consist of a hash-like Traject::Settings object. The settings hash is not nested hashes, just one level of configuration settings. Keys are always strings, and by convention use "." for namespacing, e.g. log.file
The settings method with no arguments returns that Settings object.
With a hash and/or block argument, it can be used to set new key/values. Each call merges onto the existing settings hash. The block is instance_eval'd in the context of the Traject::Settings object.
indexer.settings("a" => "a", "b" => "b")

indexer.settings do
  provide "b", "new b"
end

indexer.settings #=> {"a" => "a", "b" => "new b"}
Note the #provide method is defined on Traject::Settings to write to a setting only if it was not previously set. You can also use #store to force overwriting even if the setting already exists.
Even with arguments, Indexer#settings returns the Settings object itself, so method calls can be chained.
# File 'lib/traject/indexer.rb', line 140

def settings(new_settings = nil, &block)
  @settings.merge!(new_settings) if new_settings

  @settings.instance_eval &block if block

  return @settings
end
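The provide-vs-store distinction can be sketched with a plain Hash stand-in (Traject::Settings itself is not loaded here; provide_if_unset and store_always are invented names for illustration):

```ruby
# Sketch of the semantics described above, using a plain Hash.
settings = { "a" => "a", "b" => "b" }

# like #provide: write only when the key is not already set
def provide_if_unset(settings, key, value)
  settings[key] = value unless settings.key?(key)
end

# like #store: always overwrite
def store_always(settings, key, value)
  settings[key] = value
end

provide_if_unset(settings, "b", "new b") # no effect, "b" already set
store_always(settings, "b", "new b")     # overwrites
provide_if_unset(settings, "c", "c")     # sets, "c" was absent

settings # => {"a" => "a", "b" => "new b", "c" => "c"}
```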
#to_field(field_name, aLambda = nil, &block) ⇒ Object
Part of DSL, used to define an indexing mapping. Register logic to be called for each record, and generate values for a particular output field.
# File 'lib/traject/indexer.rb', line 151

def to_field(field_name, aLambda = nil, &block)
  @index_steps << ToFieldStep.new(field_name, aLambda, block, Traject::Util.extract_caller_location(caller.first))
end
#writer! ⇒ Object
Instantiate a Traject Writer, using the class set in #writer_class
# File 'lib/traject/indexer.rb', line 415

def writer!
  return writer_class.new(settings.merge("logger" => logger))
end