Class: Traject::SolrJWriter

Inherits: Object
Includes: QualifiedConstGet
Defined in: lib/traject/solrj_writer.rb

Overview

Writes to Solr using SolrJ, specifically the SolrJ HttpSolrServer.

After you call #close, you can check #skipped_record_count for an integer count of skipped records.

For fatal errors that raise: async processing with thread_pool means you may not get the raise immediately after calling #put; it may come on a FUTURE #put or on #close. You should get it eventually, though.
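The deferred-raise behavior can be sketched in pure Ruby. This is a hypothetical stand-in (the class name ErrorCollectingPool and its methods are made up for illustration, not Traject's actual Traject::ThreadPool), showing how an exception from a background thread is collected and re-raised on a later check:

```ruby
# Hypothetical sketch of deferred async exceptions: a background thread
# stores any exception in a thread-safe queue, and the exception surfaces
# only when the caller next asks for collected errors (as on #put or #close).
class ErrorCollectingPool
  def initialize
    @errors = Queue.new # thread-safe queue of collected exceptions
  end

  # Run work on a background thread; capture rather than lose any exception.
  def submit
    Thread.new do
      begin
        yield
      rescue => e
        @errors << e
      end
    end
  end

  # First collected error raises here, in the CALLING thread.
  def raise_collected_exception!
    raise @errors.pop unless @errors.empty?
  end
end

pool = ErrorCollectingPool.new
pool.submit { raise "solr add failed" }.join
begin
  pool.raise_collected_exception! # the async error surfaces here, not where it happened
rescue => e
  puts e.message # prints "solr add failed"
end
```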

Settings

  • solr.url: Your Solr URL (required).

  • solrj_writer.server_class_name: Defaults to "HttpSolrServer". You can specify another SolrServer subclass, but it must take a one-argument URL constructor; otherwise, subclass this writer and override instantiate_solr_server!.

  • solrj.jar_dir: Custom directory containing all of the SolrJ jars. All jars in this directory will be loaded. Otherwise, we load our own packaged SolrJ jars. This setting can't really be used differently in the same app instance, since jars are loaded globally.

  • solrj_writer.parser_class_name: A String name of a class in the package org.apache.solr.client.solrj.impl. If present, we'll instantiate one with a zero-arg constructor and pass it to setParser on the SolrServer instance. NOTE: to contact a Solr 1.x server with the recent version of SolrJ used by default, set this to "XMLResponseParser".

  • solrj_writer.commit_on_close: If true (or the string 'true'), send a commit to Solr at the end of #process.

  • solrj_writer.batch_size: If non-nil and greater than 1, send documents to Solr in batches of solrj_writer.batch_size. If nil or 1, each document is sent to Solr in its own HTTP transaction. Defaults to 100, which seems to be a sweet spot.

  • solrj_writer.thread_pool: Defaults to 1. A thread pool is used for submitting docs to Solr. Set to 0 or nil to disable threading. Even set to 1, there will still be a single background thread doing the adds. For very fast Solr servers and very fast indexing processes, it may make sense to increase this value to throw documents at Solr as fast as it can catch them.

Example

settings do
  provide "writer_class_name", "Traject::SolrJWriter"

  # This is just regular ruby, so don't be afraid to have conditionals!
  # Switch on hostname, for test and production server differences
  if Socket.gethostname =~ /devhost/
    provide "solr.url", "http://my.dev.machine:9033/catalog"
  else
    provide "solr.url", "http://my.production.machine:9033/catalog"
  end

  provide "solrj_writer.parser_class_name", "BinaryResponseParser" # for Solr 4.x
  # provide "solrj_writer.parser_class_name", "XMLResponseParser" # For solr 1.x or 3.x

  provide "solrj_writer.commit_on_close", "true"
end

Defined Under Namespace

Classes: UpdatePackage

Instance Attribute Summary

Instance Method Summary

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(argSettings) ⇒ SolrJWriter

Returns a new instance of SolrJWriter.



# File 'lib/traject/solrj_writer.rb', line 96

def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)
  settings_check!(settings)

  ensure_solrj_loaded!

  solr_server # init

  @batched_queue = java.util.concurrent.LinkedBlockingQueue.new

  # when multi-threaded exceptions raised in threads are held here
  # we need a HIGH performance queue here to try and avoid slowing things down,
  # since we need to check it frequently.
  @async_exception_queue = java.util.concurrent.ConcurrentLinkedQueue.new

  # Store error count in an AtomicInteger, so multi threads can increment
  # it safely, if we're threaded.
  @skipped_record_incrementer = java.util.concurrent.atomic.AtomicInteger.new(0)

  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context.
  @thread_pool = Traject::ThreadPool.new( @settings["solrj_writer.thread_pool"].to_i )

  @debug_ascii_progress = (@settings["debug_ascii_progress"].to_s == "true")

  logger.info("   #{self.class.name} writing to '#{settings['solr.url']}'")
end

Instance Attribute Details

#batched_queueObject (readonly)

Returns the value of attribute batched_queue.



# File 'lib/traject/solrj_writer.rb', line 94

def batched_queue
  @batched_queue
end

#settingsObject (readonly)

Returns the value of attribute settings.



92
93
94
# File 'lib/traject/solrj_writer.rb', line 92

def settings
  @settings
end

#solr_serverObject



# File 'lib/traject/solrj_writer.rb', line 321

def solr_server
  @solr_server ||= instantiate_solr_server!
end

Instance Method Details

#add_one_document_package(package) ⇒ Object

Adds a single SolrInputDocument, passed in as an UpdatePackage combo of SolrInputDocument and context.

Rescues exceptions thrown by SolrServer.add, logs them, and re-raises them if they are deemed fatal and should stop indexing. Only intended to be used on a SINGLE document add. If we get an exception on a multi-doc batch add, we need to recover differently.



# File 'lib/traject/solrj_writer.rb', line 227

def add_one_document_package(package)
  begin
    solr_server.add(package.solr_document)
  # Honestly not sure what the difference is between those types, but SolrJ raises both
  rescue org.apache.solr.common.SolrException, org.apache.solr.client.solrj.SolrServerException  => e
    id        = package.context.source_record && package.context.source_record['001'] && package.context.source_record['001'].value
    id_str    = id ? "001:#{id}" : ""

    position  = package.context.position
    position_str = position ? "at file position #{position} (starting at 1)" : ""

    logger.error("Could not index record #{id_str} #{position_str}\n" + Traject::Util.exception_to_log_message(e) )
    logger.debug(package.context.source_record.to_s)

    @skipped_record_incrementer.getAndIncrement() # AtomicInteger, thread-safe increment.

    if fatal_exception?(e)
      logger.fatal("SolrJ exception judged fatal, raising...")
      raise e
    end
  end
end

#batch_add_document_packages(current_batch) ⇒ Object

Takes an array of UpdatePackage tuples (SolrInputDocument plus context) and batch-adds them to Solr.

Catches errors in the batch add, logs them, and re-tries the documents individually.

Is thread-safe, because SolrServer is thread-safe and we aren't referencing any other shared state. It is important that the CALLER passes in a doc array that is not shared state, extracting it from the shared-state batched_queue under a mutex.



# File 'lib/traject/solrj_writer.rb', line 202

def batch_add_document_packages(current_batch)
  begin
    a = current_batch.collect {|package| package.solr_document }
    solr_server.add( a )

    $stderr.write "%" if @debug_ascii_progress
  rescue Exception => e
    # Error in batch, none of the docs got added, let's try to re-add
    # em all individually, so those that CAN get added get added, and those
    # that can't get individually logged.
    logger.warn "Error encountered in batch solr add, will re-try documents individually, at a performance penalty...\n" + Traject::Util.exception_to_log_message(e)
    current_batch.each do |package|
      add_one_document_package(package)
    end
  end
end

#closeObject



# File 'lib/traject/solrj_writer.rb', line 277

def close
  @thread_pool.raise_collected_exception!

  # Any leftovers in batch buffer? Send em to the threadpool too.
  if batched_queue.length > 0
    packages = []
    batched_queue.drain_to(packages)

    # we do it in the thread pool for consistency, and so
    # it goes to the end of the queue behind any outstanding
    # work in the pool.
    @thread_pool.maybe_in_thread_pool { batch_add_document_packages( packages ) }
  end

  # Wait for shutdown, and time it.
  logger.debug "SolrJWriter: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    logger.warn "Waited #{elapsed} seconds for all SolrJWriter threads, you may want to increase solrj_writer.thread_pool (currently #{@settings["solrj_writer.thread_pool"]})"
  end
  logger.debug "SolrJWriter: Thread pool shutdown complete"
  logger.warn "SolrJWriter: #{skipped_record_count} skipped records" if skipped_record_count > 0

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  if settings["solrj_writer.commit_on_close"].to_s == "true"
    logger.info "SolrJWriter: Sending commit to solr..."
    solr_server.commit
  end

  solr_server.shutdown
  @solr_server = nil
end

#ensure_solrj_loaded!Object

Loads SolrJ if not already loaded, by loading all jars found in settings["solrj.jar_dir"].



# File 'lib/traject/solrj_writer.rb', line 126

def ensure_solrj_loaded!
  unless defined?(HttpSolrServer) && defined?(SolrInputDocument)
    Traject::Util.require_solrj_jars(settings)
  end

  # And for now, SILENCE SolrJ logging
  org.apache.log4j.Logger.getRootLogger().addAppender(org.apache.log4j.varia.NullAppender.new)
end

#fatal_exception?(e) ⇒ Boolean

If an exception is encountered talking to Solr, is it one we should entirely give up on? SolrJ doesn't use a useful exception class hierarchy; we have to look into its details and guess.

Returns:

  • (Boolean)


# File 'lib/traject/solrj_writer.rb', line 257

def fatal_exception?(e)
  root_cause = e.respond_to?(:getRootCause) && e.getRootCause

  # Various kinds of inability to actually talk to the
  # server look like this:
  if root_cause.kind_of? java.io.IOException
    return true
  end

  # Consider Solr server returning HTTP 500 Internal Server Error to be fatal.
  # This can mean, for instance, that disk space is exhausted on solr server.
  if e.kind_of?(Java::OrgApacheSolrCommon::SolrException) && e.code == 500
    return true
  end

  return false
end

#hash_to_solr_document(hash) ⇒ Object



# File 'lib/traject/solrj_writer.rb', line 183

def hash_to_solr_document(hash)
  doc = SolrInputDocument.new
  hash.each_pair do |key, value_array|
    value_array.each do |value|
      doc.addField( key, value )
    end
  end
  return doc
end
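The input hash comes from Traject's context.output_hash, which maps each field name to an ARRAY of values, even for single-valued fields. Since the real SolrInputDocument lives in the SolrJ jars, here is a pure-Ruby sketch with a made-up stand-in class (FakeSolrInputDocument is hypothetical) to illustrate the flattening:

```ruby
# Pure-Ruby stand-in for SolrJ's SolrInputDocument, purely to illustrate
# the input shape hash_to_solr_document expects: field name => array of values.
class FakeSolrInputDocument
  attr_reader :fields

  def initialize
    @fields = [] # accumulates [key, value] pairs, one per addField call
  end

  def addField(key, value)
    @fields << [key, value]
  end
end

def hash_to_solr_document(hash)
  doc = FakeSolrInputDocument.new
  hash.each_pair do |key, value_array|
    # Each value in the array becomes its own addField call, so multi-valued
    # Solr fields get one entry per value.
    value_array.each { |value| doc.addField(key, value) }
  end
  doc
end

doc = hash_to_solr_document(
  "id"      => ["bib_000123"],
  "subject" => ["Birds", "Ornithology"]
)
doc.fields
# => [["id", "bib_000123"], ["subject", "Birds"], ["subject", "Ornithology"]]
```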

#instantiate_solr_server!Object

Instantiates a Solr server of class settings["solrj_writer.server_class_name"] (default "HttpSolrServer") and initializes it with settings["solr.url"].



# File 'lib/traject/solrj_writer.rb', line 328

def instantiate_solr_server!
  server_class  = qualified_const_get( settings["solrj_writer.server_class_name"] || "HttpSolrServer" )
  server        = server_class.new( settings["solr.url"].to_s )

  if parser_name = settings["solrj_writer.parser_class_name"]
    #parser = org.apache.solr.client.solrj.impl.const_get(parser_name).new
    parser = Java::JavaClass.for_name("org.apache.solr.client.solrj.impl.#{parser_name}").ruby_class.new
    server.setParser( parser )
  end

  server
end

#loggerObject



# File 'lib/traject/solrj_writer.rb', line 250

def logger
  settings["logger"] ||=  Yell.new(STDERR, :level => "gt.fatal") # null logger
end

#put(context) ⇒ Object

This method IS thread-safe; it can be called concurrently by multiple threads.

Why? If not using batched adds, we just use the SolrServer, which is already thread-safe itself.

If we are using batched adds, we surround all access to our shared-state batch queue with a mutex -- just a naive implementation. Performance might improve with a more sophisticated java.util.concurrent data structure (a blocking queue, etc.); I did try a Java ArrayBlockingQueue and a LinkedBlockingQueue in place of our own mutex, and did not see consistently different performance. We may want to skip the mutex entirely when multiple mapping threads aren't being used.

This class does not at present use any threads itself; all work is done in the calling thread, including actual HTTP transactions to Solr via the SolrJ SolrServer. If using batches, not every #put results in an HTTP transaction, but when it does, it happens in the calling thread, synchronously.
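The naive mutex-guarded batching described above can be sketched in pure Ruby. This is an illustrative stand-in only (the class name BatchingBuffer is made up; the actual code uses a java.util.concurrent queue): accumulate packages under a mutex, and when the batch is full, hand the caller a drained array that is no longer shared state.

```ruby
# Sketch of mutex-guarded batch accumulation: callers add packages one at a
# time; when the batch fills, the whole array is handed back (drained) so the
# caller can submit it without touching shared state.
class BatchingBuffer
  def initialize(batch_size)
    @batch_size = batch_size
    @queue      = []
    @mutex      = Mutex.new
  end

  # Add a package; returns a full batch when one is ready, else nil.
  def add(package)
    @mutex.synchronize do
      @queue << package
      if @queue.size >= @batch_size
        ready  = @queue
        @queue = []   # start a fresh shared queue
        ready         # caller now owns this array exclusively
      end
    end
  end
end

buffer = BatchingBuffer.new(3)
buffer.add(:doc1) # => nil
buffer.add(:doc2) # => nil
buffer.add(:doc3) # => [:doc1, :doc2, :doc3] -- a full batch, ready to send
```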



# File 'lib/traject/solrj_writer.rb', line 152

def put(context)
  @thread_pool.raise_collected_exception!

  # package the SolrInputDocument along with the context, so we have
  # the context for error reporting when we actually add.

  package = UpdatePackage.new(hash_to_solr_document(context.output_hash), context)

  if settings["solrj_writer.batch_size"].to_i > 1
    ready_batch = []

    batched_queue.add(package)
    if batched_queue.size >= settings["solrj_writer.batch_size"].to_i
      batched_queue.drain_to(ready_batch)
    end

    if ready_batch.length > 0
      if @debug_ascii_progress
        $stderr.write("^")
        if @thread_pool.queue && (@thread_pool.queue.size >= @thread_pool.queue_capacity)
          $stderr.write "!"
        end
      end

      @thread_pool.maybe_in_thread_pool { batch_add_document_packages(ready_batch) }
    end
  else # non-batched add, add one at a time.
    @thread_pool.maybe_in_thread_pool { add_one_document_package(package) }
  end
end

#settings_check!(settings) ⇒ Object



# File 'lib/traject/solrj_writer.rb', line 341

def settings_check!(settings)
  unless settings.has_key?("solr.url") && ! settings["solr.url"].nil?
    raise ArgumentError.new("SolrJWriter requires a 'solr.url' solr url in settings")
  end

  unless settings["solr.url"] =~ /^#{URI::regexp}$/
    raise ArgumentError.new("SolrJWriter requires a 'solr.url' setting that looks like a URL, not: `#{settings['solr.url']}`")
  end
end

#skipped_record_countObject

Returns the count of skipped records encountered. It is most accurate to call this after #close, when it will include the full count, even under an async thread_pool.



# File 'lib/traject/solrj_writer.rb', line 316

def skipped_record_count
  @skipped_record_incrementer.get
end