Class: Traject::SequelWriter
- Inherits:
-
Object
- Object
- Traject::SequelWriter
- Defined in:
- lib/traject/sequel_writer.rb
Instance Attribute Summary collapse
-
#db_table ⇒ Object
readonly
Sequel table/relation object.
-
#sequel_db ⇒ Object
readonly
Sequel db connection object.
Instance Method Summary collapse
- #after_send_batch(&block) ⇒ Object
- #close ⇒ Object
- #hash_to_array(column_names, hash) ⇒ Object
-
#hashes_to_arrays(column_names, list_of_hashes) ⇒ Object
Turn an array of hashes into an array of arrays; each inner array holds one hash's values for column_names, in that order.
-
#initialize(argSettings) ⇒ SequelWriter
constructor
A new instance of SequelWriter.
-
#logger ⇒ Object
Get the logger from the settings, or default to an effectively null logger.
-
#output_value_to_column_value(v) ⇒ Object
Traject context.output_hash values are arrays.
- #put(context) ⇒ Object
- #send_batch(batch) ⇒ Object
- #send_single(context) ⇒ Object
Constructor Details
#initialize(argSettings) ⇒ SequelWriter
Returns a new instance of SequelWriter.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/traject/sequel_writer.rb', line 17
# Build a writer from traject settings.
#
# Required settings:
# * exactly one of `sequel_writer.connection_string` (passed to Sequel.connect)
#   or `sequel_writer.database` (a Sequel database object to use directly)
# * `sequel_writer.table_name`
#
# Optional settings: `sequel_writer.columns`, `sequel_writer.thread_pool`
# (default 1), `sequel_writer.batch_size` (default 100),
# `sequel_writer.after_send_batch` (callback or list of callbacks) and
# `sequel_writer.internal_delimiter` (default ",").
#
# @param argSettings [Hash] settings, wrapped in Traject::Indexer::Settings
# @raise [ArgumentError] if neither or both connection settings are given,
#   or if `sequel_writer.table_name` is missing
def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)

  unless (!!@settings["sequel_writer.connection_string"]) ^ (!!@settings["sequel_writer.database"])
    raise ArgumentError, "Exactly one of either setting `sequel_writer.connection_string` or `sequel_writer.database` is required"
  end

  unless @settings["sequel_writer.table_name"]
    raise ArgumentError, "setting `sequel_writer.table_name` is required"
  end

  # Only disconnect on #close when this writer opened the connection itself;
  # a database object supplied through the settings belongs to the caller.
  @sequel_db = @settings["sequel_writer.database"]
  @disconnect_on_close = false
  unless @sequel_db
    @sequel_db = Sequel.connect(@settings["sequel_writer.connection_string"])
    @disconnect_on_close = true
  end

  @db_table = @sequel_db[@settings["sequel_writer.table_name"].to_sym]

  # Which keys to send to columns? Can be set explicitly with sequel_writer.columns,
  # or we'll use all non-PK columns introspected from the db schema.
  @column_names = @settings["sequel_writer.columns"]
  unless @column_names
    schema = @sequel_db.schema(@db_table.first_source_table)
    @column_names = schema.reject { |_column, info| info[:primary_key] == true }.map(&:first)
  end
  @column_names = @column_names.map(&:to_sym).freeze

  # How many threads to use for the writer?
  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context. Default to 1, for waiting on DB I/O.
  @thread_pool_size = (@settings["sequel_writer.thread_pool"] || 1).to_i

  @batch_size = (@settings["sequel_writer.batch_size"] || 100).to_i

  @batched_queue = Queue.new
  @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

  # Copy so #after_send_batch appends to this writer's own list instead of
  # mutating (or failing on a frozen) array held in the settings.
  @after_send_batch_callbacks = Array(@settings["sequel_writer.after_send_batch"] || []).dup

  @internal_delimiter = @settings["sequel_writer.internal_delimiter"] || ","
end
Instance Attribute Details
#db_table ⇒ Object (readonly)
Sequel table/relation object
15 16 17 |
# File 'lib/traject/sequel_writer.rb', line 15
# Reader for the Sequel table/relation (dataset) object built in #initialize
# from `sequel_writer.table_name`; batches and single rows are inserted into it.
#
# @return [Object] the table/relation object
def db_table
  @db_table
end
#sequel_db ⇒ Object (readonly)
Sequel db connection object
12 13 14 |
# File 'lib/traject/sequel_writer.rb', line 12
# Reader for the Sequel database connection object. It is either the
# `sequel_writer.database` setting or the result of Sequel.connect on
# `sequel_writer.connection_string`.
#
# @return [Object] the database connection object
def sequel_db
  @sequel_db
end
Instance Method Details
#after_send_batch(&block) ⇒ Object
170 171 172 |
# File 'lib/traject/sequel_writer.rb', line 170
# Register a callback to run after each batch is sent. Callbacks are invoked
# as `callback.call(batch, writer)`.
#
# @yieldparam batch [Array] the contexts that were just written
# @yieldparam writer [Object] this writer
# @return [Array] the list of registered callbacks
def after_send_batch(&block)
  @after_send_batch_callbacks.push(block)
end
#close ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/traject/sequel_writer.rb', line 80
# Flush queued records, wait for the thread pool to finish, and disconnect
# the database if this writer is responsible for doing so.
#
# Any exception collected from worker threads is raised both before the final
# flush and again after the pool has shut down, to surface failures from
# batches that were still running.
def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  batch = Traject::Util.drain_queue(@batched_queue)
  @thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }

  # Wait for shutdown, and time it.
  logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    # Report the effective pool size this writer was built with
    # (from sequel_writer.thread_pool, defaulting to 1).
    logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase sequel_writer.thread_pool (currently #{@thread_pool_size})"
  end
  logger.debug "#{self.class.name}: Thread pool shutdown complete"

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  @sequel_db.disconnect if @disconnect_on_close
end
#hash_to_array(column_names, hash) ⇒ Object
141 142 143 144 145 |
# File 'lib/traject/sequel_writer.rb', line 141
# Convert one output hash into a row array ordered like column_names.
# Columns are looked up by their string name (output hash keys are strings),
# and each value goes through #output_value_to_column_value.
#
# @param column_names [Array] column names (symbols or strings)
# @param hash [Hash] a context's output_hash
# @return [Array] one value per column, in column_names order
def hash_to_array(column_names, hash)
  column_names.map { |column| output_value_to_column_value(hash[column.to_s]) }
end
#hashes_to_arrays(column_names, list_of_hashes) ⇒ Object
Turn an array of hashes into an array of arrays; each inner array holds one hash's values for column_names, in that order.
135 136 137 138 139 |
# File 'lib/traject/sequel_writer.rb', line 135
# Convert a list of output hashes into rows. Each row holds that hash's
# values for column_names, in that order (see #hash_to_array).
#
# @param column_names [Array] column names (symbols or strings)
# @param list_of_hashes [Array<Hash>] output hashes, one per record
# @return [Array<Array>] one row array per input hash
def hashes_to_arrays(column_names, list_of_hashes)
  list_of_hashes.map { |row_hash| hash_to_array(column_names, row_hash) }
end
#logger ⇒ Object
Get the logger from the settings, or default to an effectively null logger
66 67 68 |
# File 'lib/traject/sequel_writer.rb', line 66
# Logger from the "logger" setting. When none is configured, a Yell logger on
# STDERR at level "gt.fatal" (effectively a null logger) is created. It is
# stored back into the settings, so later calls reuse it.
def logger
  configured = @settings["logger"]
  return configured if configured

  @settings["logger"] = Yell.new(STDERR, :level => "gt.fatal") # null logger
end
#output_value_to_column_value(v) ⇒ Object
Traject context.output_hash values are arrays. Turn them into suitable column values, joining multiple strings with the internal delimiter if needed.
Single (non-array) values are also accepted, even though this is not the traject standard; they are passed through unchanged.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/traject/sequel_writer.rb', line 152
# Turn a traject output value (normally an array) into a single column value.
#
# * non-array values are returned unchanged
# * an empty array becomes nil
# * a one-element array becomes that element
# * a multi-element array whose first element is a String is joined with the
#   `sequel_writer.internal_delimiter` setting
#
# @raise [ArgumentError] for a multi-element array whose first element is not a String
def output_value_to_column_value(v)
  return v unless v.kind_of?(Array)

  case v.length
  when 0 then nil
  when 1 then v.first
  else
    unless v.first.kind_of?(String)
      # Not a string? Um, raise for now?
      raise ArgumentError, "Traject::SequelWriter, multiple non-String values provided: #{v}"
    end
    v.join(@internal_delimiter)
  end
end
#put(context) ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/traject/sequel_writer.rb', line 70
# Queue a traject context for writing. When the queue reaches the configured
# batch size, everything queued is drained and sent via #send_batch through
# the thread pool.
#
# Raises any exception already collected from worker threads before queueing.
def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue.push(context)
  return if @batched_queue.size < @batch_size

  # Batch is full: hand everything queued so far to the pool.
  ready = Traject::Util.drain_queue(@batched_queue)
  @thread_pool.maybe_in_thread_pool(ready) { |batch_arg| send_batch(batch_arg) }
end
#send_batch(batch) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/traject/sequel_writer.rb', line 105
# Insert a batch of traject contexts with one multi-row import, then run
# every registered after_send_batch callback as `callback.call(batch, self)`.
#
# If the import raises Sequel::DatabaseError or Sequel::PoolTimeout, a warning
# is logged and each context is retried with #send_single. #send_single
# re-raises database errors, so the first failing record stops the retry.
#
# @param batch [Array] traject contexts (each responding to #output_hash)
def send_batch(batch)
  output_hashes = batch.map(&:output_hash)
  rows = hashes_to_arrays(@column_names, output_hashes)

  begin
    db_table.import @column_names, rows
  rescue Sequel::DatabaseError, Sequel::PoolTimeout => batch_exception
    # We rescue PoolTimeout too, because we're mysteriously getting those, they are maybe dropped DB connections?
    # Try them each one by one, mostly so we can get a reasonable error message with particular record.
    # NOTE(review): the message reads a 'system_id' output field, which assumes the indexing config produces one.
    logger.warn("SequelWriter: error (#{batch_exception}) inserting batch of #{rows.count} starting from system_id #{batch.first.output_hash['system_id']}, retrying individually...")
    batch.each { |context| send_single(context) }
  end

  @after_send_batch_callbacks.each { |callback| callback.call(batch, self) }
end
#send_single(context) ⇒ Object
125 126 127 128 129 130 |
# File 'lib/traject/sequel_writer.rb', line 125
# Insert a single traject context as one row.
#
# On Sequel::DatabaseError the offending output hash is logged at error
# level and the same exception is re-raised.
#
# @param context [Object] a traject context responding to #output_hash
# @return whatever the table's #insert returns
def send_single(context)
  begin
    table = db_table
    table.insert(@column_names, hash_to_array(@column_names, context.output_hash))
  rescue Sequel::DatabaseError => e
    logger.error("SequelWriter: Could not insert row: #{context.output_hash}: #{e}")
    raise
  end
end