Class: Solr::CursorStream

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/solr/cursorstream.rb,
lib/solr/cursorstream/version.rb

Overview

Fetch results from a Solr filter query via Solr's cursor streaming. See solr.apache.org/guide/8_6/pagination-of-results.html#fetching-a-large-number-of-sorted-results-cursors

Note that accessors for query, filters, and so on are provided for ease of configuration only. Changing any of them in the middle of a job invalidates the cursor and leaves the results undefined. To run a different query, create another CursorStream object.

Defined Under Namespace

Classes: Error, Response

Constant Summary

VERSION = "0.2.0"

Constructor Details

#initialize(url:, handler: "select", query: "*:*", filters: ["*:*"], sort: "id asc", batch_size: 100, fields: [], logger: nil, adapter: :httpx) {|_self| ... } ⇒ CursorStream

Returns a new instance of CursorStream.

Parameters:

  • url (String)

    URL to the solr core (e.g., my.machine.com/solr/mycore)

  • handler (String) (defaults to: "select")

    The specific handler to target.

  • query (String) (defaults to: "*:*")

    The main Solr query, sent as the q parameter.

  • filters (Array<String>) (defaults to: ["*:*"])

    Array of filter queries to apply.

  • sort (String) (defaults to: "id asc")

    A valid Solr sort string. It MUST include the unique key field, as required by the Solr cursor documentation.

  • batch_size (Integer) (defaults to: 100)

    How many results to fetch at a time (for efficiency)

  • fields (Array<String>) (defaults to: [])

    The solr fields to return.

  • logger (Logger, #info) (defaults to: nil)

    A logger or logger-like object. When set to `nil`, no logging is done.

  • adapter (Symbol) (defaults to: :httpx)

    A valid Faraday adapter. If not using the default httpx, it is up to the programmer to make whatever `require` calls are necessary.

Yields:

  • (_self)

Yield Parameters:

  • _self (Solr::CursorStream)

    The newly created object, yielded for further configuration.



# File 'lib/solr/cursorstream.rb', line 31

def initialize(url:, handler: "select", query: "*:*", filters: ["*:*"], sort: "id asc", batch_size: 100, fields: [], logger: nil, adapter: :httpx)
  @url = url.gsub(/\/\Z/, "")
  @query = query
  @handler = handler
  @filters = filters
  @sort = sort
  @batch_size = batch_size
  @fields = fields
  @logger = logger
  @adapter = adapter

  @current_cursor = "*"
  yield self if block_given?
end

Instance Attribute Details

#batch_size ⇒ Object

Returns the value of attribute batch_size.



# File 'lib/solr/cursorstream.rb', line 20

def batch_size
  @batch_size
end

#fields ⇒ Object

Returns the value of attribute fields.



# File 'lib/solr/cursorstream.rb', line 20

def fields
  @fields
end

#filters ⇒ Object

Returns the value of attribute filters.



# File 'lib/solr/cursorstream.rb', line 20

def filters
  @filters
end

#handler ⇒ Object

Returns the value of attribute handler.



# File 'lib/solr/cursorstream.rb', line 20

def handler
  @handler
end

#logger ⇒ Object

Returns the value of attribute logger.



# File 'lib/solr/cursorstream.rb', line 20

def logger
  @logger
end

#query ⇒ Object

Returns the value of attribute query.



# File 'lib/solr/cursorstream.rb', line 20

def query
  @query
end

#sort ⇒ Object

Returns the value of attribute sort.



# File 'lib/solr/cursorstream.rb', line 20

def sort
  @sort
end

#url ⇒ Object

Returns the value of attribute url.



# File 'lib/solr/cursorstream.rb', line 20

def url
  @url
end

Class Method Details

.connection(adapter: :httpx) ⇒ Faraday::Connection

Build up a Faraday connection.

Parameters:

  • adapter (Symbol) (defaults to: :httpx)

    Which Faraday adapter to use. If not :httpx, you must have loaded the necessary adapter already.

Returns:

  • (Faraday::Connection)

    A faraday connection object.



# File 'lib/solr/cursorstream.rb', line 67

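def self.connection(adapter: :httpx)
  # The httpx adapter is loaded on demand; any other adapter must already be required.
  require "httpx/adapters/faraday" if adapter == :httpx
  # FlatParamsEncoder sends array values as repeated keys (fq=a&fq=b).
  Faraday.new(request: {params_encoder: Faraday::FlatParamsEncoder}) do |builder|
    builder.use Faraday::Response::RaiseError
    builder.request :url_encoded
    builder.request :retry
    builder.response :json
    # Use the keyword argument; @adapter is not set at the class level.
    builder.adapter adapter
  end
end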
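The flat params encoder matters because a Solr request repeats the fq key once per filter query. As a rough illustration that does not need Faraday at all, the standard library's URI.encode_www_form produces the same repeated-key shape. The filter values below are made up for the example.

```ruby
require "uri"

# Hypothetical filter queries, for illustration only.
filters = ["type:book", "year:[2000 TO *]"]

# Array values are emitted as repeated keys (fq=...&fq=...) rather than
# the bracketed fq[]=... form that a nested encoder would produce.
query_string = URI.encode_www_form(q: "*:*", fq: filters, rows: 100)

puts query_string
```

Each filter appears as its own fq pair in the query string, which is the form Solr reads as multiple filter queries.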

Instance Method Details

#connection(adapter: @adapter) ⇒ Object

See Also:

  • .connection



# File 'lib/solr/cursorstream.rb', line 79

def connection(adapter: @adapter)
  # Memoize the connection, honoring the adapter passed by the caller.
  @connection ||= self.class.connection(adapter: adapter)
end

#default_params ⇒ Hash

Returns Default solr params derived from instance variables.

Returns:

  • (Hash)

    Default solr params derived from instance variables



# File 'lib/solr/cursorstream.rb', line 97

def default_params
  field_list = Array(fields).join(",")
  params = {q: @query, wt: :json, rows: batch_size, sort: @sort, fq: filters, fl: field_list}
  # Hash#reject returns a new hash, so its result must be the return value.
  params.reject { |_k, v| [nil, "", []].include?(v) }
end
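The filtering step can be tried in isolation. The following is a minimal sketch, assuming a standalone helper named build_params (hypothetical, not part of the gem) that builds the same kind of hash and drops empty values.

```ruby
# Hypothetical standalone helper mirroring the shape of the params hash.
def build_params(query:, sort:, rows:, filters: [], fields: [])
  params = {
    q: query,
    wt: :json,
    rows: rows,
    sort: sort,
    fq: filters,
    fl: Array(fields).join(",")
  }
  # Hash#reject returns a new hash and leaves the receiver untouched,
  # so the filtered hash has to be the method's return value.
  params.reject { |_key, value| [nil, "", []].include?(value) }
end

params = build_params(query: "*:*", sort: "id asc", rows: 100)
with_fields = build_params(query: "*:*", sort: "id asc", rows: 100, fields: %w[id title])

puts params.inspect
puts with_fields.inspect
```

With no filters and no fields, the fq and fl keys are absent from the result, so Solr falls back to its own defaults for them.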

#each ⇒ Object

Iterate through the documents in the stream. Behind the scenes, these will be fetched in batches of `batch_size` for efficiency.

Yield Returns:

  • (Hash)

    A single solr document from the stream



# File 'lib/solr/cursorstream.rb', line 54

def each
  return enum_for(:each) unless block_given?
  verify_we_have_everything!
  while solr_has_more?
    cursor_response = get_page
    cursor_response.docs.each { |d| yield d }
  end
end
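Because the class includes Enumerable and #each returns an enumerator when no block is given, the usual collection methods work on a stream. The pattern can be sketched with a stand-in class; PagedNumbers is hypothetical and only imitates the batching, with no Solr involved.

```ruby
# Hypothetical stand-in for a paged source; not the gem's class.
class PagedNumbers
  include Enumerable

  def initialize(pages)
    @pages = pages
  end

  def each
    # Without a block, hand back an Enumerator so callers can iterate externally.
    return enum_for(:each) unless block_given?

    @pages.each do |page|
      page.each { |item| yield item }
    end
  end
end

stream = PagedNumbers.new([[1, 2], [3, 4], [5]])

first_three = stream.first(3)   # Enumerable#first stops iterating early
evens = stream.select(&:even?)  # any Enumerable method is available
enum = stream.each              # no block given, so an Enumerator is returned
```

The same calls should apply to a CursorStream, with the caveat that each pass over a real stream issues HTTP requests.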

#get_page ⇒ Response

Get a single "page" (`batch_size` documents) from Solr. Feeds into #each.

Returns:

  • (Response)


# File 'lib/solr/cursorstream.rb', line 87

def get_page
  params = {cursorMark: @current_cursor}.merge default_params
  r = connection.get(solr_url, params)
  resp = Response.new(r)
  @last_cursor = @current_cursor
  @current_cursor = resp.cursor
  resp
end
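The cursor bookkeeping in #get_page pairs with #solr_has_more? to decide when to stop: the stream ends once Solr hands back the same cursor mark it was sent. A minimal in-memory sketch of that loop follows. The fake_page helper and its integer-based cursor are assumptions made for the example; real Solr cursor marks are opaque strings.

```ruby
# Hypothetical fake of one cursor request against an in-memory document list.
# When nothing is left it echoes the cursor it was given, which is how the
# end of the stream is signalled.
def fake_page(all_docs, cursor, rows)
  start = cursor == "*" ? 0 : Integer(cursor)
  docs = all_docs[start, rows] || []
  next_cursor = docs.empty? ? cursor : (start + docs.length).to_s
  {docs: docs, next_cursor: next_cursor}
end

# Seven documents, already sorted by their unique id.
all_docs = (1..7).map { |i| {"id" => format("doc%02d", i)} }

last_cursor = nil
current_cursor = "*" # "*" asks for the first page
fetched = []
requests = 0

# Keep fetching until the cursor stops moving.
while last_cursor != current_cursor
  page = fake_page(all_docs, current_cursor, 3)
  requests += 1
  fetched.concat(page[:docs])
  last_cursor = current_cursor
  current_cursor = page[:next_cursor]
end

puts "fetched #{fetched.length} docs in #{requests} requests"
```

Note that the loop needs one extra, empty request to discover that the cursor has stopped advancing.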

#http_request_retry_block ⇒ Object

Returns Lambda that runs every time the connection needs to retry due to http error.

Returns:

  • Lambda that runs every time the connection needs to retry due to http error



# File 'lib/solr/cursorstream.rb', line 120

def http_request_retry_block
  ->(env:, options:, retries_remaining:, exception:, will_retry_in:) do
    # TODO: log that a retry happened
  end
end
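The TODO in the listing above could be filled in along these lines. This is only a sketch: logging_retry_block is a hypothetical helper, the keyword names are copied from the lambda above, and whether the retry middleware passes exactly those keywords depends on the installed version, so treat the signature as an assumption.

```ruby
require "logger"
require "stringio"

# Hypothetical helper: build a retry callback that reports to a logger.
# A nil logger is tolerated, matching the "no logging" default.
def logging_retry_block(logger)
  ->(env:, options:, retries_remaining:, exception:, will_retry_in:) do
    logger&.info(
      "Retrying after #{exception.class}; " \
      "#{retries_remaining} retries left, next attempt in #{will_retry_in}s"
    )
  end
end

# Exercise the callback by hand, capturing the log in memory.
log_output = StringIO.new
block = logging_retry_block(Logger.new(log_output))
block.call(env: nil, options: nil, retries_remaining: 2,
           exception: RuntimeError.new("boom"), will_retry_in: 0.5)

# With no logger the callback does nothing and returns nil.
silent_result = logging_retry_block(nil).call(
  env: nil, options: nil, retries_remaining: 1,
  exception: RuntimeError.new("boom"), will_retry_in: 1
)
```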

#solr_has_more? ⇒ Boolean

Determine whether Solr has another page of results.

Returns:

  • (Boolean)


# File 'lib/solr/cursorstream.rb', line 114

def solr_has_more?
  @last_cursor != @current_cursor
end

#solr_url ⇒ String

Returns the Solr URL built from the passed url and the handler.

Returns:

  • (String)

    The Solr URL built from the passed url and the handler



# File 'lib/solr/cursorstream.rb', line 47

def solr_url
  url + "/" + handler
end
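Together with the trailing-slash cleanup in the constructor, this means a core URL works with or without a final slash. A small sketch, assuming a hypothetical helper build_solr_url and a made-up local core URL:

```ruby
# Hypothetical helper combining the constructor's normalization with #solr_url.
def build_solr_url(url, handler)
  # Strip one trailing slash so the join below never yields "//".
  url.gsub(/\/\Z/, "") + "/" + handler
end

with_slash = build_solr_url("http://localhost:8983/solr/mycore/", "select")
without_slash = build_solr_url("http://localhost:8983/solr/mycore", "select")

puts with_slash
puts without_slash
```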

#verify_we_have_everything! ⇒ Object

Make sure we have everything we need for a successful stream

Raises:

  • (Error)

    if handler, filters, or batch_size is nil



# File 'lib/solr/cursorstream.rb', line 106

def verify_we_have_everything!
  # Collect the names of required settings that are still nil.
  missing = {handler: @handler, filters: @filters, batch_size: @batch_size}.select { |_k, v| v.nil? }.keys
  raise Error.new("Solr::CursorStream missing value for #{missing.join(", ")}") unless missing.empty?
end
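The check follows a common pattern: gather the required settings into a hash, select the nil ones, and name them in the error. A minimal standalone sketch, assuming a hypothetical verify_present! helper that raises ArgumentError instead of the gem's Error class:

```ruby
# Hypothetical helper: raise if any required setting is nil.
def verify_present!(settings)
  missing = settings.select { |_name, value| value.nil? }.keys
  raise ArgumentError, "missing value for #{missing.join(", ")}" unless missing.empty?
  true
end

# All settings present: the check passes.
ok = verify_present!(handler: "select", filters: ["*:*"], batch_size: 100)

# Two settings missing: the error names both of them.
message =
  begin
    verify_present!(handler: nil, filters: ["*:*"], batch_size: nil)
  rescue ArgumentError => e
    e.message
  end

puts message
```

Only nil counts as missing here, so an empty filters array would still pass the check.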