Class: BulkInsert::Worker
- Inherits: Object
- Inheritance chain: Object → BulkInsert::Worker
- Defined in: lib/bulk_insert/worker.rb
Instance Attribute Summary collapse
-
#adapter_name ⇒ Object
Returns the value of attribute adapter_name.
-
#after_save_callback ⇒ Object
Returns the value of attribute after_save_callback.
-
#before_save_callback ⇒ Object
Returns the value of attribute before_save_callback.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#ignore ⇒ Object
readonly
Returns the value of attribute ignore.
-
#result_sets ⇒ Object
readonly
Returns the value of attribute result_sets.
-
#set_size ⇒ Object
Returns the value of attribute set_size.
-
#update_duplicates ⇒ Object
readonly
Returns the value of attribute update_duplicates.
Instance Method Summary collapse
- #add(values) ⇒ Object
- #add_all(rows) ⇒ Object
- #after_save(&block) ⇒ Object
- #before_save(&block) ⇒ Object
- #compose_insert_query ⇒ Object
- #execute_query ⇒ Object
-
#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker
constructor
A new instance of Worker.
- #insert_sql_statement ⇒ Object
- #pending? ⇒ Boolean
- #pending_count ⇒ Object
- #save! ⇒ Object
Constructor Details
#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/bulk_insert/worker.rb', line 12
# Sets up a worker that buffers rows for a single table.
#
# @param connection [Object] database connection; it is asked for
#   adapter_name, columns, quote_table_name and quote_column_name here
# @param table_name [String, Symbol] table to insert into (stored quoted)
# @param primary_key [Object] stored as-is in @primary_key
# @param column_names [Array] names of the columns to insert, in order
# @param set_size [Integer] stored in @set_size (default 500)
# @param ignore [Boolean] stored in @ignore
# @param update_duplicates [Boolean] stored in @update_duplicates
# @param return_primary_keys [Boolean] stored in @return_primary_keys
def initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false)
  @statement_adapter = StatementAdapters.adapter_for(connection)

  @connection = connection
  @set_size = set_size
  @adapter_name = connection.adapter_name

  # INSERT IGNORE only fails inserts with duplicate keys or unallowed nulls not the whole set of inserts
  @ignore = ignore
  @update_duplicates = update_duplicates
  @return_primary_keys = return_primary_keys

  # Index the table's column objects by name so the requested names can be
  # resolved below; a name with no matching column resolves to nil.
  columns_by_name = connection.columns(table_name).each_with_object({}) do |col, lookup|
    lookup[col.name] = col
  end

  @primary_key = primary_key
  @columns = column_names.map { |name| columns_by_name[name.to_s] }
  @table_name = connection.quote_table_name(table_name)
  quoted_names = column_names.map { |name| connection.quote_column_name(name) }
  @column_names = quoted_names.join(",")

  @before_save_callback = nil
  @after_save_callback = nil

  @result_sets = []
  @set = []
end
Instance Attribute Details
#adapter_name ⇒ Object
Returns the value of attribute adapter_name.
9 10 11 |
# File 'lib/bulk_insert/worker.rb', line 9
# Reader for the adapter_name attribute.
#
# @return [Object] the current value of @adapter_name
def adapter_name
  @adapter_name
end
#after_save_callback ⇒ Object
Returns the value of attribute after_save_callback.
8 9 10 |
# File 'lib/bulk_insert/worker.rb', line 8
# Reader for the after_save_callback attribute.
#
# @return [Object] the current value of @after_save_callback
def after_save_callback
  @after_save_callback
end
#before_save_callback ⇒ Object
Returns the value of attribute before_save_callback.
7 8 9 |
# File 'lib/bulk_insert/worker.rb', line 7
# Reader for the before_save_callback attribute.
#
# @return [Object] the current value of @before_save_callback
def before_save_callback
  @before_save_callback
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
5 6 7 |
# File 'lib/bulk_insert/worker.rb', line 5
# Reader for the connection attribute (read-only).
#
# @return [Object] the current value of @connection
def connection
  @connection
end
#ignore ⇒ Object (readonly)
Returns the value of attribute ignore.
10 11 12 |
# File 'lib/bulk_insert/worker.rb', line 10
# Reader for the ignore attribute (read-only).
#
# @return [Object] the current value of @ignore
def ignore
  @ignore
end
#result_sets ⇒ Object (readonly)
Returns the value of attribute result_sets.
10 11 12 |
# File 'lib/bulk_insert/worker.rb', line 10
# Reader for the result_sets attribute (read-only).
#
# @return [Object] the current value of @result_sets
def result_sets
  @result_sets
end
#set_size ⇒ Object
Returns the value of attribute set_size.
6 7 8 |
# File 'lib/bulk_insert/worker.rb', line 6
# Reader for the set_size attribute.
#
# @return [Object] the current value of @set_size
def set_size
  @set_size
end
#update_duplicates ⇒ Object (readonly)
Returns the value of attribute update_duplicates.
10 11 12 |
# File 'lib/bulk_insert/worker.rb', line 10
# Reader for the update_duplicates attribute (read-only).
#
# @return [Object] the current value of @update_duplicates
def update_duplicates
  @update_duplicates
end
Instance Method Details
#add(values) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/bulk_insert/worker.rb', line 47
# Buffers one row for insertion, flushing the buffer first (via save!) when
# it already holds set_size rows.
#
# For every configured column the value is taken from +values+ when it was
# supplied; otherwise it falls back to the column default, then to a
# timestamp placeholder for created_at/updated_at, and finally to nil.
#
# @param values [Hash, Array] a Hash keyed by column name (converted with
#   with_indifferent_access) or an Array of values in column order
# @return [self]
def add(values)
  save! if @set.length >= set_size

  from_hash = values.is_a?(Hash)
  values = values.with_indifferent_access if from_hash

  mapped = @columns.map.with_index do |column, index|
    supplied = from_hash ? values.key?(column.name) : index < values.length

    if supplied
      from_hash ? values[column.name] : values[index]
    elsif !column.default.nil?
      # Only a nil default means "no default". A blank-but-real default such
      # as false or "" must still be used instead of inserting NULL.
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      :__timestamp_placeholder
    end
  end

  @set.push(mapped)
  self
end
#add_all(rows) ⇒ Object
70 71 72 73 |
# File 'lib/bulk_insert/worker.rb', line 70
# Adds every row of +rows+ through #add, in iteration order.
#
# @param rows [#each] rows to buffer; each one is passed to #add unchanged
# @return [self]
def add_all(rows)
  rows.each do |entry|
    add(entry)
  end

  self
end
#after_save(&block) ⇒ Object
79 80 81 |
# File 'lib/bulk_insert/worker.rb', line 79
# Stores the given block in @after_save_callback; save! invokes it with no
# arguments after the query has been executed.
#
# @return [Proc, nil] the stored block (nil when no block was given)
def after_save(&block)
  @after_save_callback = block
end
#before_save(&block) ⇒ Object
75 76 77 |
# File 'lib/bulk_insert/worker.rb', line 75
# Stores the given block in @before_save_callback; save! invokes it with the
# buffered rows before the query is executed.
#
# @return [Proc, nil] the stored block (nil when no block was given)
def before_save(&block)
  @before_save_callback = block
end
#compose_insert_query ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/bulk_insert/worker.rb', line 112
# Builds the multi-row INSERT statement for the rows currently buffered in
# @set. Timestamp placeholders are replaced with a single Time.now value
# (kept in @now), every value is quoted through the connection, and the
# statement adapter's conflict / primary-key-return clauses are appended.
#
# @return [String, false] the SQL text, or false when no rows are buffered
def compose_insert_query
  sql = insert_sql_statement
  @now = Time.now

  # Compare the numeric major version. Comparing version *strings* is
  # lexicographic, so "10.0.0" >= "5.0.0" would wrongly be false.
  serialize_values = ActiveRecord::VERSION::MAJOR >= 5

  rows = @set.map do |row|
    values = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if serialize_values
        # Columns that could not be resolved are nil; their values are
        # quoted without type serialization.
        if column
          type = @connection.lookup_cast_type_from_column(column)
          value = type.serialize(value)
        end
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end

    "(#{values.join(',')})"
  end

  return false if rows.empty?

  sql << rows.join(",")
  sql << @statement_adapter.on_conflict_statement(@columns, ignore, update_duplicates)
  sql << @statement_adapter.primary_key_return_statement(@primary_key) if @return_primary_keys
  sql
end
#execute_query ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/bulk_insert/worker.rb', line 94
# Composes the INSERT for the buffered rows and runs it on the connection.
# Does nothing when compose_insert_query yields no statement.
#
# When @return_primary_keys is set, the result of exec_query is appended to
# @result_sets.
#
# @raise [ArgumentError] when @return_primary_keys is requested on the
#   MySQL statement adapter with ActiveRecord older than 5
def execute_query
  query = compose_insert_query
  return unless query

  # Return primary key support broke mysql compatibility
  # with rails < 5 mysql adapter. (see issue #41)
  # The major version is compared numerically: a string comparison would
  # classify "10.0.0" as older than "5.0.0".
  legacy_mysql = ActiveRecord::VERSION::MAJOR < 5 &&
                 @statement_adapter.is_a?(StatementAdapters::MySQLAdapter)

  if legacy_mysql
    # raise an exception for unsupported return_primary_keys
    raise ArgumentError, "BulkInsert does not support @return_primary_keys for mysql and rails < 5" if @return_primary_keys

    # restore v1.6 query execution
    @connection.execute(query)
  else
    result_set = @connection.exec_query(query)
    @result_sets.push(result_set) if @return_primary_keys
  end
end
#insert_sql_statement ⇒ Object
145 146 147 148 |
# File 'lib/bulk_insert/worker.rb', line 145
# Builds the leading part of the INSERT statement, up to and including
# "VALUES ". The statement adapter's insert-ignore keyword is interpolated
# when @ignore is set; otherwise that slot is left empty.
#
# @return [String] the statement prefix
def insert_sql_statement
  ignore_clause =
    if @ignore
      @statement_adapter.insert_ignore_statement
    else
      ''
    end

  "INSERT #{ignore_clause} INTO #{@table_name} (#{@column_names}) VALUES "
end
#pending? ⇒ Boolean
39 40 41 |
# File 'lib/bulk_insert/worker.rb', line 39
# Tells whether the row buffer (@set) holds anything to save.
#
# @return [Boolean] the result of @set.any?
def pending?
  @set.any?
end
#pending_count ⇒ Object
43 44 45 |
# File 'lib/bulk_insert/worker.rb', line 43
# Number of rows currently held in the buffer (@set).
#
# @return [Integer]
def pending_count
  @set.length
end
#save! ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/bulk_insert/worker.rb', line 83
# Flushes the buffered rows: runs the before-save callback with the buffer,
# executes the query, runs the after-save callback, then empties the buffer.
# Nothing happens when there are no pending rows.
#
# @return [self]
def save!
  return self unless pending?

  @before_save_callback.call(@set) if @before_save_callback
  execute_query
  @after_save_callback.call if @after_save_callback

  # Cleared in place only after the query and both callbacks have run.
  @set.clear
  self
end