Class: Traject::SolrJsonWriter

Inherits:
Object
Includes:
QualifiedConstGet
Defined in:
lib/traject/solr_json_writer.rb

Overview

Write to Solr using the JSON interface; only works for Solr >= 3.2

This should work under both MRI and JRuby, with JRuby getting much better performance due to the threading model.

Solr updates are by default sent with no commit params. This maximizes performance, and is especially recommended for bulk/batch indexing -- use Solr auto commit in your Solr configuration instead, possibly combined with the commit_on_close setting here.

However, if you want the writer to send commit=true, softCommit=true, commitWithin=1000, or any other URL parameters valid for Solr update handlers, you can configure this with the solr_writer.solr_update_args setting. See: https://lucene.apache.org/solr/guide/7_0/near-real-time-searching.html#passing-commit-and-commitwithin-parameters-as-part-of-the-url Eg:

settings do
  provide "solr_writer.solr_update_args", { commitWithin: 1000 }
end

(Because this value is a hash, it is infeasible to set or override on the command line; if this is annoying for you, let us know.)

solr_update_args will apply to batch and individual update requests, but not to the commit sent when commit_on_close is set. For that, set solr_writer.commit_solr_update_args instead (or pass an argument to #commit if calling it manually), as sketched below.
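For example, a sketch of a config that sends commitWithin on every update but a full commit on close (values are illustrative, not recommendations):

settings do
  provide "solr_writer.solr_update_args", { commitWithin: 1000 }
  provide "solr_writer.commit_on_close", "true"
  # applies only to the commit sent on close (or manual #commit calls)
  provide "solr_writer.commit_solr_update_args", { "commit" => "true" }
end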

Relevant settings (a combined example follows the list)

  • solr.url (optional if solr.update_url is set) The URL to the solr core to index into. (Can include embedded HTTP basic auth as eg http://user:pass@host/solr)

  • solr.update_url: The actual update url. If unset, we'll first see if "#{solr.url}/update/json" exists, and if not use "#{solr.url}/update". (Can include embedded HTTP basic auth as eg http://user:pass@host/solr)

  • solr_writer.batch_size: How big a batch to send to solr. Default is 100. My tests indicate that this setting doesn't change overall index speed by a ton.

  • solr_writer.thread_pool: How many threads to use for the writer. Default is 1. Likely useful even under MRI, since threads will be waiting on Solr for some time.

  • solr_writer.max_skipped: How many records may be skipped due to errors before we bail out with a fatal error? Set to -1 for unlimited skips. Default is 0: raise and abort on the first record that could not be added to Solr.

  • solr_writer.skippable_exceptions: List of classes that will be rescued internal to SolrJsonWriter, and handled with max_skipped logic. Defaults to [HTTPClient::TimeoutError, SocketError, Errno::ECONNREFUSED, Traject::SolrJsonWriter::BadHttpResponse]

  • solr_writer.solr_update_args: A hash of query params to send to the solr update url. Will be sent with every update request. Eg { softCommit: true } or { commitWithin: 1000 }. See also solr_writer.commit_solr_update_args

  • solr_writer.commit_on_close: Set to true (or "true") if you want to commit at the end of the indexing run. (Old "solrj_writer.commit_on_close" supported for backwards compat only.)

  • solr_writer.commit_solr_update_args: A hash of query params to send when committing. Will be used for the automatic commit_on_close commit, as well as any manual calls to #commit. If set, it must include { "commit" => "true" } or { "softCommit" => "true" } if you actually want commits to happen when SolrJsonWriter tries to commit! But it can be used to switch to soft commits (hard commits are the default), or to specify additional params like optimize etc.

  • solr_writer.http_timeout: Value in seconds, will be set on the httpclient as connect/receive/send timeout. No way to set them individually at present. Default nil, use HTTPClient defaults (60 for connect/receive, 120 for send).

  • solr_writer.commit_timeout: If commit_on_close, how long to wait for Solr before giving up as a timeout (http client receive_timeout). Default 10 minutes. Solr can be slow at commits. Overrides solr_writer.http_timeout for the commit request.

  • solr_json_writer.http_client: Mainly intended for testing; set your own HTTPClient or mock object to be used for HTTP.

  • solr_json_writer.use_packaged_certs: Unlikely to be needed; set to true for legacy behavior, using the packaged HTTPClient gem ssl certs. https://github.com/nahi/httpclient/issues/445
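Putting several of these together, a hypothetical configuration might look like the following (URL and values are illustrative):

settings do
  provide "solr.url", "http://localhost:8983/solr/my_core"
  provide "solr_writer.batch_size", 250
  provide "solr_writer.thread_pool", 2
  provide "solr_writer.max_skipped", 100
  provide "solr_writer.commit_on_close", "true"
end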

Defined Under Namespace

Classes: BadHttpResponse, MaxSkippedRecordsExceeded

Constant Summary

URI_REGEXP = URI::Parser.new.make_regexp.freeze
DEFAULT_MAX_SKIPPED = 0
DEFAULT_BATCH_SIZE = 100

Instance Attribute Summary

Instance Method Summary

Methods included from QualifiedConstGet

#qualified_const_get

Constructor Details

#initialize(argSettings) ⇒ SolrJsonWriter

Returns a new instance of SolrJsonWriter.



# File 'lib/traject/solr_json_writer.rb', line 106

def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)


  # Set max errors
  @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
  if @max_skipped < 0
    @max_skipped = nil
  end


  # Figure out where to send updates, and if with basic auth
  @solr_update_url, basic_auth_user, basic_auth_password = self.determine_solr_update_url

  @http_client = if @settings["solr_json_writer.http_client"]
    @settings["solr_json_writer.http_client"]
  else
    client = HTTPClient.new

    # By default we'll use the host OS SSL certs, but you can use
    # setting solr_json_writer.use_packaged_certs to true or "true"
    # to go back to previous behavior if you have a perverse reason to.
    # https://github.com/nahi/httpclient/issues/445
    unless @settings["solr_json_writer.use_packaged_certs"].to_s == "true"
      client.ssl_config.set_default_paths
    end

    if @settings["solr_writer.http_timeout"]
      client.connect_timeout = client.receive_timeout = client.send_timeout = @settings["solr_writer.http_timeout"]
    end

    if basic_auth_user || basic_auth_password
      client.set_auth(@solr_update_url, basic_auth_user, basic_auth_password)
    end

    client
  end

  @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
  @batch_size = 1 if @batch_size < 1

  # Store error count in an AtomicInteger, so multi threads can increment
  # it safely, if we're threaded.
  @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)


  # How many threads to use for the writer?
  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context.
  @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

  @batched_queue         = Queue.new
  @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

  # old setting solrj_writer supported for backwards compat, as we make
  # this the new default writer.
  @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"


  @solr_update_args = settings["solr_writer.solr_update_args"]
  @commit_solr_update_args = settings["solr_writer.commit_solr_update_args"]

  logger.info("   #{self.class.name} writing to '#{@solr_update_url}' #{"(with HTTP basic auth) " if basic_auth_user || basic_auth_password}in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
end
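While the writer is normally constructed for you from indexer settings, it can be instantiated directly; a minimal sketch (URL and values are illustrative):

writer = Traject::SolrJsonWriter.new(
  "solr.url"                    => "http://localhost:8983/solr/my_core",
  "solr_writer.batch_size"      => 50,
  "solr_writer.commit_on_close" => "true"
)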

Instance Attribute Details

#batched_queue ⇒ Object (readonly)

A queue to hold documents before sending to solr



# File 'lib/traject/solr_json_writer.rb', line 104

def batched_queue
  @batched_queue
end

#settings ⇒ Object (readonly)

The passed-in settings



# File 'lib/traject/solr_json_writer.rb', line 101

def settings
  @settings
end

#thread_pool_size ⇒ Object (readonly)

The configured thread pool size



# File 'lib/traject/solr_json_writer.rb', line 101

def thread_pool_size
  @thread_pool_size
end

Instance Method Details

#check_solr_update_url(url) ⇒ Object

If we've got a solr.update_url, make sure it's ok



# File 'lib/traject/solr_json_writer.rb', line 412

def check_solr_update_url(url)
  unless /^#{URI_REGEXP}$/.match(url)
    raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
  end
  url
end

#close ⇒ Object

On close, we need to (a) raise any exceptions we might have, (b) send off the last (possibly empty) batch, and (c) commit if instructed to do so via the solr_writer.commit_on_close setting.



# File 'lib/traject/solr_json_writer.rb', line 311

def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  batch = Traject::Util.drain_queue(@batched_queue)
  if batch.length > 0
    @thread_pool.maybe_in_thread_pool { send_batch(batch) }
  end

  if @thread_pool_size && @thread_pool_size > 0
    # Wait for shutdown, and time it.
    logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
    elapsed = @thread_pool.shutdown_and_wait
    if elapsed > 60
      logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
    end
    logger.debug "#{self.class.name}: Thread pool shutdown complete"
    logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0
  end

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  # Commit if we're supposed to
  if @commit_on_close
    commit
  end
end

#commit(query_params = nil) ⇒ Object

Send a commit

Called automatically by the commit_on_close setting, but can also be called manually.

If the setting solr_writer.commit_solr_update_args is set, it will be used by default. That setting needs { commit: true } or { softCommit: true } if you want it to actually do a commit!

The optional query_params argument gives the actual args to send; you must be sure to include commit: true or softCommit: true for it to actually commit! But you may want to include other params too, like optimize etc. The query_params argument replaces the solr_writer.commit_solr_update_args setting; they are not merged.

Examples:

@writer.commit

@writer.commit(softCommit: true)

@writer.commit(commit: true, optimize: true, waitFlush: false)

Parameters:

  • query_params (Hash) (defaults to: nil)

    optional query params to send to the solr update handler. Default: { "commit" => "true" }



# File 'lib/traject/solr_json_writer.rb', line 362

def commit(query_params = nil)
  query_params ||= @commit_solr_update_args || {"commit" => "true"}
  logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

  original_timeout = @http_client.receive_timeout

  @http_client.receive_timeout = (settings["commit_timeout"] || (10 * 60)).to_i

  resp = @http_client.get(solr_update_url_with_query(query_params))
  unless resp.status == 200
    raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
  end

  @http_client.receive_timeout = original_timeout
end

#delete(id) ⇒ Object

Very beginning of a delete implementation. POSTs a delete request to solr for the id given in the argument (the value of the Solr uniqueKey field, usually the id field).

Right now, does it inline and immediately, no use of background threads or batching. This could change.

Right now, if unsuccessful for any reason, it will raise immediately: any of the skippable_exceptions (timeouts, network errors) will propagate right out of here.

Will use solr_writer.solr_update_args settings.

There is no built-in way to direct a record to be deleted from an indexing config file at the moment; this is just a loose method on the writer.



# File 'lib/traject/solr_json_writer.rb', line 285

def delete(id)
  logger.debug("#{self.class.name}: Sending delete to Solr for #{id}")

  json_package = {delete: id}
  resp = @http_client.post solr_update_url_with_query(@solr_update_args), JSON.generate(json_package), "Content-type" => "application/json"
  if resp.status != 200
    raise RuntimeError.new("Could not delete #{id.inspect}, http response #{resp.status}: #{resp.body}")
  end
end
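For example, assuming a document whose Solr uniqueKey value is "record_123" (the identifier is illustrative):

@writer.delete("record_123")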

#delete_all! ⇒ Object

Send a delete all query.

This method takes no params and will not automatically commit the deletes.

Examples:

@writer.delete_all!



# File 'lib/traject/solr_json_writer.rb', line 299

def delete_all!
  delete(query: "*:*")
end

#derive_solr_update_url_from_solr_url(url) ⇒ Object



# File 'lib/traject/solr_json_writer.rb', line 419

def derive_solr_update_url_from_solr_url(url)
  # Nil? Then we bail
  if url.nil?
    raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
  end

  # Not a URL? Bail
  unless  /^#{URI_REGEXP}$/.match(url)
    raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
  end

  # Assume the /update/json handler
  return [url.chomp('/'), 'update', 'json'].join('/')
end
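So, for example, a (hypothetical) solr.url of http://localhost:8983/solr/my_core derives as:

derive_solr_update_url_from_solr_url("http://localhost:8983/solr/my_core")
# => "http://localhost:8983/solr/my_core/update/json"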

#determine_solr_update_url ⇒ Object

Relatively complex logic to determine whether we have a valid URL and what it is, and whether we have basic auth info.

Empties out any user and password embedded in the returned URI, to help avoid logging them.



# File 'lib/traject/solr_json_writer.rb', line 393

def determine_solr_update_url
  url = if settings['solr.update_url']
    check_solr_update_url(settings['solr.update_url'])
  else
    derive_solr_update_url_from_solr_url(settings['solr.url'])
  end

  parsed_uri                            = URI.parse(url)
  user_from_uri, password_from_uri      = parsed_uri.user, parsed_uri.password
  parsed_uri.user, parsed_uri.password  = nil, nil

  basic_auth_user     = @settings["solr_writer.basic_auth_user"] || user_from_uri
  basic_auth_password = @settings["solr_writer.basic_auth_password"] || password_from_uri

  return [parsed_uri.to_s, basic_auth_user, basic_auth_password]
end
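For example, with basic auth embedded in a (hypothetical) solr.url of http://user:secret@localhost:8983/solr/my_core, this returns the credential-free URL plus the extracted credentials:

determine_solr_update_url
# => ["http://localhost:8983/solr/my_core/update/json", "user", "secret"]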

#flush ⇒ Object

Not part of standard writer API.

If we are batching adds and have some not-yet-written ones queued up -- flush them all to solr.

This should be thread-safe to call, but the write takes place in the caller's thread; no threading is done for you here, regardless of the solr_writer.thread_pool setting.



# File 'lib/traject/solr_json_writer.rb', line 191

def flush
  send_batch( Traject::Util.drain_queue(@batched_queue) )
end
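Used standalone, a hypothetical put/flush/close sequence might look like this (writer construction and context are illustrative):

writer = Traject::SolrJsonWriter.new("solr.url" => "http://localhost:8983/solr/my_core")
writer.put(context)  # queue a Traject::Indexer::Context
writer.flush         # send anything still queued, in the calling thread
writer.close         # drain the queue, wait on threads, commit if configured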

#logger ⇒ Object

Get the logger from the settings, or default to an effectively null logger



# File 'lib/traject/solr_json_writer.rb', line 304

def logger
  settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
end

#put(context) ⇒ Object

Add a single context to the queue, ready to be sent to solr



# File 'lib/traject/solr_json_writer.rb', line 173

def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue << context
  if @batched_queue.size >= @batch_size
    batch = Traject::Util.drain_queue(@batched_queue)
    @thread_pool.maybe_in_thread_pool(batch) {|batch_arg| send_batch(batch_arg) }
  end
end

#send_batch(batch) ⇒ Object

Send the given batch of contexts. If something goes wrong, send them one at a time.

Parameters:

  • batch (Array<Traject::Indexer::Context>)

# File 'lib/traject/solr_json_writer.rb', line 208

def send_batch(batch)
  return if batch.empty?

  logger.debug("#{self.class.name}: sending batch of #{batch.size} to Solr")

  json_package = JSON.generate(batch.map { |c| c.output_hash })

  begin
    resp = @http_client.post solr_update_url_with_query(@solr_update_args), json_package, "Content-type" => "application/json"
  rescue StandardError => exception
  end

  if exception || resp.status != 200
    error_message = exception ?
      Traject::Util.exception_to_log_message(exception) :
      "Solr response: #{resp.status}: #{resp.body}"

    logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"

    batch.each do |c|
      send_single(c)
    end
  end
end

#send_single(c) ⇒ Object

Send a single context to Solr, logging an error if need be

Parameters:

  • c (Traject::Indexer::Context)

# File 'lib/traject/solr_json_writer.rb', line 236

def send_single(c)
  logger.debug("#{self.class.name}: sending single record to Solr: #{c.output_hash}")

  json_package = JSON.generate([c.output_hash])
  begin
    post_url = solr_update_url_with_query(@solr_update_args)
    resp = @http_client.post post_url, json_package, "Content-type" => "application/json"

    unless resp.status == 200
      raise BadHttpResponse.new("Unexpected HTTP response status #{resp.status} from POST #{post_url}", resp)
    end

    # Catch Timeouts and network errors -- as well as non-200 http responses --
    # as skipped records, but otherwise allow unexpected errors to propagate up.
  rescue *skippable_exceptions => exception
    msg = if exception.kind_of?(BadHttpResponse)
      "Solr error response: #{exception.response.status}: #{exception.response.body}"
    else
      Traject::Util.exception_to_log_message(exception)
    end

    logger.error "Could not add record #{c.record_inspect}: #{msg}"
    logger.debug("\t" + exception.backtrace.join("\n\t")) if exception
    logger.debug(c.source_record.to_s) if c.source_record

    @skipped_record_incrementer.increment
    if @max_skipped and skipped_record_count > @max_skipped
      # re-raising in rescue means the last encountered error will be available as #cause
      # on raised exception, a feature in ruby 2.1+.
      raise MaxSkippedRecordsExceeded.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting: #{exception.message}")
    end
  end
end

#skipped_record_count ⇒ Object

Return count of encountered skipped records. Most accurate to call it after #close, in which case it should include full count, even under async thread_pool.



# File 'lib/traject/solr_json_writer.rb', line 382

def skipped_record_count
  @skipped_record_incrementer.value
end

#solr_update_url_with_query(query_params) ⇒ Object

The configured update url, with either the @solr_update_args setting or the passed-in query_params added to it.



# File 'lib/traject/solr_json_writer.rb', line 197

def solr_update_url_with_query(query_params)
  if query_params
    @solr_update_url + '?' + URI.encode_www_form(query_params)
  else
    @solr_update_url
  end
end
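For example, with a (hypothetical) configured update url of http://localhost:8983/solr/my_core/update/json:

solr_update_url_with_query(commitWithin: 1000)
# => "http://localhost:8983/solr/my_core/update/json?commitWithin=1000"

solr_update_url_with_query(nil)
# => "http://localhost:8983/solr/my_core/update/json"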