Class: Traject::SequelWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/traject/sequel_writer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(argSettings) ⇒ SequelWriter

Returns a new instance of SequelWriter.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/traject/sequel_writer.rb', line 17

# Builds a writer from a traject settings hash: resolves the Sequel database
# and target table, works out which columns to write, and sets up the batch
# queue and thread pool used by #put / #close.
#
# Settings read (string keys):
# * `sequel_writer.connection_string` or `sequel_writer.database` -- exactly
#   one must be given; the former is passed to `Sequel.connect`, the latter is
#   used as the database object directly.
# * `sequel_writer.table_name` -- required; converted to a symbol.
# * `sequel_writer.columns` -- optional explicit column list; when absent, all
#   columns the schema does not flag as primary key are used.
# * `sequel_writer.thread_pool` -- defaults to 1.
# * `sequel_writer.batch_size` -- defaults to 100.
# * `sequel_writer.after_send_batch` -- optional callback or list of callbacks.
# * `sequel_writer.internal_delimiter` -- defaults to ",".
#
# Raises ArgumentError when the connection settings or the table name are
# missing (or both connection settings are supplied).
def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)

  unless (!! @settings["sequel_writer.connection_string"]) ^ (!! @settings["sequel_writer.database"])
    raise ArgumentError, "Exactly one of either setting `sequel_writer.connection_string` or `sequel_writer.database` is required"
  end
  unless @settings["sequel_writer.table_name"]
    raise ArgumentError, "setting `sequel_writer.table_name` is required"
  end

  # Ownership rule: only disconnect in #close when this writer opened the
  # connection itself. A database object handed in by the caller is left
  # connected, since the caller may still be using it.
  @sequel_db = @settings["sequel_writer.database"]
  @disconnect_on_close = false
  unless @sequel_db
    @sequel_db = Sequel.connect(@settings["sequel_writer.connection_string"])
    @disconnect_on_close = true
  end

  @db_table = @sequel_db[@settings["sequel_writer.table_name"].to_sym]

  # Which keys to send to columns? Can be set explicitly with sequel_writer.columns,
  # or we'll use all non-PK columns introspected from the db schema.
  @column_names = @settings["sequel_writer.columns"]

  unless @column_names
    schema_pairs = @sequel_db.schema(@db_table.first_source_table)
    @column_names = schema_pairs.reject { |_column, info| info[:primary_key] == true }.map(&:first)
  end
  @column_names = @column_names.map(&:to_sym).freeze

  # How many threads to use for the writer?
  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context. Default to 1, for waiting on DB I/O.
  @thread_pool_size = (@settings["sequel_writer.thread_pool"] || 1).to_i

  @batch_size = (@settings["sequel_writer.batch_size"] || 100).to_i

  @batched_queue = Queue.new
  @thread_pool   = Traject::ThreadPool.new(@thread_pool_size)

  @after_send_batch_callbacks = Array(@settings["sequel_writer.after_send_batch"] || [])

  @internal_delimiter = @settings["sequel_writer.internal_delimiter"] || ","
end

Instance Attribute Details

#db_table ⇒ Object (readonly)

Sequel table/relation object



15
16
17
# File 'lib/traject/sequel_writer.rb', line 15

# Read-only accessor for the Sequel table/relation object this writer inserts
# into. It is built in the constructor by indexing the database object with
# the `sequel_writer.table_name` setting (as a symbol).
def db_table
  @db_table
end

#sequel_db ⇒ Object (readonly)

Sequel db connection object



12
13
14
# File 'lib/traject/sequel_writer.rb', line 12

# Read-only accessor for the Sequel db connection object. The constructor
# takes it from the `sequel_writer.database` setting when present, otherwise
# it opens one with `Sequel.connect` on `sequel_writer.connection_string`.
def sequel_db
  @sequel_db
end

Instance Method Details

#after_send_batch(&block) ⇒ Object



170
171
172
# File 'lib/traject/sequel_writer.rb', line 170

# Registers a callback to run after each batch has been sent. #send_batch
# invokes every registered callback as `callback.call(batch, self)`.
#
# Returns the list of registered callbacks.
# Raises ArgumentError when called without a block: a nil entry in the list
# would otherwise only fail later, when a batch is sent.
def after_send_batch(&block)
  raise ArgumentError, "after_send_batch requires a block" unless block

  @after_send_batch_callbacks << block
end

#close ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/traject/sequel_writer.rb', line 80

# Flushes whatever is still queued, shuts the thread pool down (waiting for
# in-flight work), re-raises any exception collected from the pool, and
# finally disconnects the database when @disconnect_on_close is set.
#
# Raises whatever @thread_pool.raise_collected_exception! raises.
def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  # Nothing is dispatched when the queue is already empty, so callbacks are
  # not invoked for a batch that contains no records.
  batch = Traject::Util.drain_queue(@batched_queue)
  unless batch.empty?
    @thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }
  end

  # Wait for shutdown, and time it.
  logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    # Report the pool size actually in effect (this writer's own setting,
    # including the default), not another writer's setting key.
    logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase sequel_writer.thread_pool (currently #{@thread_pool_size})"
  end
  logger.debug "#{self.class.name}: Thread pool shutdown complete"

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  @sequel_db.disconnect if @disconnect_on_close
end

#hash_to_array(column_names, hash) ⇒ Object



141
142
143
144
145
# File 'lib/traject/sequel_writer.rb', line 141

# Builds one row: for each entry of column_names (in order) looks up the
# value stored in `hash` under the column's string name and converts it with
# #output_value_to_column_value.
def hash_to_array(column_names, hash)
  column_names.map do |column|
    raw_value = hash[column.to_s]
    output_value_to_column_value(raw_value)
  end
end

#hashes_to_arrays(column_names, list_of_hashes) ⇒ Object

Turn an array of hashes into an array of arrays, with each inner array holding one hash's values for column_names, in that order.



135
136
137
138
139
# File 'lib/traject/sequel_writer.rb', line 135

# Converts a list of hashes into a list of rows, each row being the result of
# #hash_to_array for that hash with the given column_names.
def hashes_to_arrays(column_names, list_of_hashes)
  list_of_hashes.map { |row_hash| hash_to_array(column_names, row_hash) }
end

#logger ⇒ Object

Get the logger from the settings, or default to an effectively null logger



66
67
68
# File 'lib/traject/sequel_writer.rb', line 66

# Returns the logger stored under the "logger" setting. When none is set, a
# Yell logger on STDERR with level "gt.fatal" (effectively a null logger) is
# created, stored back into the settings, and returned.
def logger
  configured = @settings["logger"]
  return configured if configured

  @settings["logger"] = Yell.new(STDERR, :level => "gt.fatal") # null logger
end

#output_value_to_column_value(v) ⇒ Object

Traject context.output_hash values are arrays. Turn them into good column values, joining strings if needed.

Single values also accepted, even though not traject standard, they will be passed through unchanged.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/traject/sequel_writer.rb', line 152

# Converts one output value into a single column value.
#
# * Non-array values are returned unchanged.
# * An empty array becomes nil; a one-element array becomes that element.
# * A longer array whose first element is a String is joined with
#   @internal_delimiter.
#
# Raises ArgumentError for a multi-element array whose first element is not
# a String.
def output_value_to_column_value(v)
  return v unless v.kind_of?(Array)

  case v.length
  when 0 then nil
  when 1 then v.first
  else
    # Only the first element is inspected to decide whether joining is allowed.
    unless v.first.kind_of?(String)
      raise ArgumentError, "Traject::SequelWriter, multiple non-String values provided: #{v}"
    end
    v.join(@internal_delimiter)
  end
end

#put(context) ⇒ Object



70
71
72
73
74
75
76
77
78
# File 'lib/traject/sequel_writer.rb', line 70

# Queues one context for writing. Once the queue holds at least @batch_size
# entries it is drained and the resulting batch is handed to the thread pool,
# which runs #send_batch on it.
#
# Re-raises, up front, any exception already collected by the thread pool.
def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue << context
  return if @batched_queue.size < @batch_size

  full_batch = Traject::Util.drain_queue(@batched_queue)
  @thread_pool.maybe_in_thread_pool(full_batch) { |records| send_batch(records) }
end

#send_batch(batch) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/traject/sequel_writer.rb', line 105

# Writes one batch of contexts to the table with a single multi-row import.
# If the import raises Sequel::DatabaseError or Sequel::PoolTimeout, a warning
# is logged and every context is retried on its own via #send_single, mostly
# so the failing record shows up in a useful error message. Afterwards each
# registered after_send_batch callback is called with (batch, self).
def send_batch(batch)
  output_hashes = batch.map { |context| context.output_hash }
  rows = hashes_to_arrays(@column_names, output_hashes)

  begin
    db_table.import(@column_names, rows)
  rescue Sequel::DatabaseError, Sequel::PoolTimeout => import_error
    # PoolTimeout is rescued as well; the original author saw these
    # unexpectedly and suspected dropped DB connections.
    logger.warn("SequelWriter: error (#{import_error}) inserting batch of #{rows.count} starting from system_id #{batch.first.output_hash['system_id']}, retrying individually...")

    batch.each { |context| send_single(context) }
  end

  @after_send_batch_callbacks.each { |callback| callback.call(batch, self) }
end

#send_single(context) ⇒ Object



125
126
127
128
129
130
# File 'lib/traject/sequel_writer.rb', line 125

# Inserts the row for a single context. When the insert raises
# Sequel::DatabaseError the offending output_hash is logged at error level
# and the same exception is re-raised to the caller.
def send_single(context)
  begin
    row_values = hash_to_array(@column_names, context.output_hash)
    db_table.insert(@column_names, row_values)
  rescue Sequel::DatabaseError => insert_error
    logger.error("SequelWriter: Could not insert row: #{context.output_hash}: #{insert_error}")
    raise insert_error
  end
end