Class: BulkInsert::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/bulk_insert/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, table_name, primary_key, column_names, set_size = 500, ignore = false, update_duplicates = false, return_primary_keys = false) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/bulk_insert/worker.rb', line 12

# Sets up a worker that buffers rows for +table_name+ and later writes them
# out as multi-row INSERT statements.
#
# @param connection [Object] database connection; must respond to
#   +adapter_name+, +columns+, +quote_table_name+ and +quote_column_name+
# @param table_name [String, Symbol] table to insert into (stored quoted)
# @param primary_key [Object] stored as-is; later handed to the statement
#   adapter when primary keys are to be returned
# @param column_names [Array<String, Symbol>] columns to populate, in order
# @param set_size [Integer] buffered row count at which #add flushes
# @param ignore [Boolean] whether to use the adapter's "insert ignore" form
# @param update_duplicates [Boolean] forwarded to the adapter's conflict clause
# @param return_primary_keys [Boolean] whether result sets are collected
def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
  @statement_adapter = StatementAdapters.adapter_for(connection)
  @connection = connection
  @adapter_name = connection.adapter_name

  # Batching and conflict-handling options.
  @set_size = set_size
  # With INSERT IGNORE only the rows that hit duplicate keys or disallowed
  # NULLs are rejected, rather than the whole set of inserts.
  @ignore = ignore
  @update_duplicates = update_duplicates
  @return_primary_keys = return_primary_keys

  # Index the table's column objects by name so the requested column names
  # can be resolved in the caller's order (unknown names resolve to nil).
  columns_by_name = connection.columns(table_name).each_with_object({}) do |column, lookup|
    lookup[column.name] = column
  end

  @primary_key = primary_key
  @columns = column_names.map { |name| columns_by_name[name.to_s] }
  @table_name = connection.quote_table_name(table_name)
  quoted_names = column_names.map { |name| connection.quote_column_name(name) }
  @column_names = quoted_names.join(",")

  # No callbacks registered and nothing buffered yet.
  @before_save_callback = nil
  @after_save_callback = nil
  @result_sets = []
  @set = []
end

Instance Attribute Details

#adapter_name ⇒ Object

Returns the value of attribute adapter_name.



9
10
11
# File 'lib/bulk_insert/worker.rb', line 9

# Reader for the adapter name; the constructor sets it from
# +connection.adapter_name+.
#
# @return [Object] the current value of +@adapter_name+
def adapter_name
  @adapter_name
end

#after_save_callback ⇒ Object

Returns the value of attribute after_save_callback.



8
9
10
# File 'lib/bulk_insert/worker.rb', line 8

# Reader for the callback that #save! invokes (with no arguments) after a
# batch has been executed.
#
# @return [Object, nil] the current value of +@after_save_callback+
#   (+nil+ until one is registered)
def after_save_callback
  @after_save_callback
end

#before_save_callback ⇒ Object

Returns the value of attribute before_save_callback.



7
8
9
# File 'lib/bulk_insert/worker.rb', line 7

# Reader for the callback that #save! invokes with the buffered rows before
# a batch is executed.
#
# @return [Object, nil] the current value of +@before_save_callback+
#   (+nil+ until one is registered)
def before_save_callback
  @before_save_callback
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



5
6
7
# File 'lib/bulk_insert/worker.rb', line 5

# Reader for the database connection handed to the constructor.
#
# @return [Object] the current value of +@connection+
def connection
  @connection
end

#ignore ⇒ Object (readonly)

Returns the value of attribute ignore.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for the +ignore+ option handed to the constructor; it is forwarded
# to the statement adapter when the conflict clause is built.
#
# @return [Object] the current value of +@ignore+
def ignore
  @ignore
end

#result_sets ⇒ Object (readonly)

Returns the value of attribute result_sets.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for the collected query results. The list starts empty and
# #execute_query appends one entry per executed batch when primary keys
# were requested.
#
# @return [Array] the current value of +@result_sets+
def result_sets
  @result_sets
end

#set_size ⇒ Object

Returns the value of attribute set_size.



6
7
8
# File 'lib/bulk_insert/worker.rb', line 6

# Reader for the batch size: #add flushes the buffer once it already holds
# at least this many rows. The constructor defaults it to 500.
#
# @return [Object] the current value of +@set_size+
def set_size
  @set_size
end

#update_duplicates ⇒ Object (readonly)

Returns the value of attribute update_duplicates.



10
11
12
# File 'lib/bulk_insert/worker.rb', line 10

# Reader for the +update_duplicates+ option handed to the constructor; it is
# forwarded to the statement adapter when the conflict clause is built.
#
# @return [Object] the current value of +@update_duplicates+
def update_duplicates
  @update_duplicates
end

Instance Method Details

#add(values) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/bulk_insert/worker.rb', line 47

# Buffers one row for insertion, flushing the buffer first when it is
# already full.
#
# Values may be given as a Hash keyed by column name (string or symbol) or
# as an Array positionally aligned with the worker's columns. For every
# column the caller did not supply, the row receives, in order of
# preference: the column's default, a timestamp placeholder (for
# +created_at+ / +updated_at+, resolved when the query is composed), or
# +nil+.
#
# @param values [Hash, Array] the row's values
# @return [self] to allow chaining
def add(values)
  save! if @set.length >= set_size

  keyed = values.is_a?(Hash)
  values = values.with_indifferent_access if keyed

  row = @columns.map.with_index do |column, index|
    supplied = keyed ? values.key?(column.name) : index < values.length

    if supplied
      keyed ? values[column.name] : values[index]
    elsif !column.default.nil?
      # A nil check rather than `present?`: blank-but-real defaults such as
      # "" or false must be inserted, not replaced with NULL.
      column.default
    elsif column.name == "created_at" || column.name == "updated_at"
      :__timestamp_placeholder
    end
  end

  @set.push(row)
  self
end

#add_all(rows) ⇒ Object



70
71
72
73
# File 'lib/bulk_insert/worker.rb', line 70

# Buffers every row of +rows+ by handing each one to #add in turn.
#
# @param rows [#each] collection of rows, each in a form #add accepts
# @return [self] to allow chaining
def add_all(rows)
  rows.each do |values|
    add(values)
  end

  self
end

#after_save(&block) ⇒ Object



79
80
81
# File 'lib/bulk_insert/worker.rb', line 79

# Registers the block to run after a batch has been written, replacing any
# previously registered one. #save! calls it with no arguments once the
# query has executed and before the buffer is cleared.
#
# @yield invoked by #save! with no arguments
# @return [Proc, nil] the stored block (+nil+ when called without a block)
def after_save(&block)
  @after_save_callback = block
end

#before_save(&block) ⇒ Object



75
76
77
# File 'lib/bulk_insert/worker.rb', line 75

# Registers the block to run before a batch is written, replacing any
# previously registered one. #save! calls it with the buffered rows just
# before the query is executed.
#
# @yield [rows] invoked by #save! with the buffered rows
# @return [Proc, nil] the stored block (+nil+ when called without a block)
def before_save(&block)
  @before_save_callback = block
end

#compose_insert_query ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/bulk_insert/worker.rb', line 112

# Builds the multi-row INSERT statement for the currently buffered rows.
#
# Timestamp placeholders are replaced with a single +Time.now+ captured at
# the start of the call (also stored in +@now+), so every row of the batch
# shares the same timestamp. On ActiveRecord 5 and later each value is
# serialized through the column's cast type before quoting; on older
# versions the column is passed to +quote+ directly.
#
# @return [String, false] the SQL statement, or +false+ when nothing is
#   buffered
def compose_insert_query
  sql = insert_sql_statement
  @now = Time.now

  # Compare the numeric major version. Comparing version *strings* is
  # lexicographic, so "10.0.0" >= "5.0.0" would be false.
  serialize_values = ActiveRecord::VERSION::MAJOR >= 5

  rows = @set.map do |row|
    quoted = @columns.zip(row).map do |column, value|
      value = @now if value == :__timestamp_placeholder

      if serialize_values
        # Columns that could not be resolved are nil; quote their values as-is.
        value = @connection.lookup_cast_type_from_column(column).serialize(value) if column
        @connection.quote(value)
      else
        @connection.quote(value, column)
      end
    end

    "(#{quoted.join(',')})"
  end

  return false if rows.empty?

  sql << rows.join(",")
  sql << @statement_adapter.on_conflict_statement(@columns, ignore, update_duplicates)
  sql << @statement_adapter.primary_key_return_statement(@primary_key) if @return_primary_keys
  sql
end

#execute_query ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/bulk_insert/worker.rb', line 94

# Composes the INSERT for the buffered rows and runs it on the connection.
# Does nothing when there is no query to run.
#
# On ActiveRecord older than 5 with the MySQL statement adapter the query is
# run through +execute+ (returning primary keys broke compatibility with
# that adapter, see issue #41). Everywhere else +exec_query+ is used and its
# result is appended to +@result_sets+ when primary keys were requested.
#
# @raise [ArgumentError] when primary keys were requested on ActiveRecord
#   older than 5 with the MySQL statement adapter
def execute_query
  query = compose_insert_query
  return unless query

  # Compare the numeric major version. Comparing version *strings* is
  # lexicographic, so "10.0.0" < "5.0.0" would be true.
  legacy_mysql = ActiveRecord::VERSION::MAJOR < 5 &&
                 @statement_adapter.is_a?(StatementAdapters::MySQLAdapter)

  if legacy_mysql
    # Returning primary keys is unsupported in this combination.
    raise ArgumentError, "BulkInsert does not support @return_primary_keys for mysql and rails < 5" if @return_primary_keys

    # Restore v1.6 query execution.
    @connection.execute(query)
  else
    result_set = @connection.exec_query(query)
    @result_sets.push(result_set) if @return_primary_keys
  end
end

#insert_sql_statement ⇒ Object



145
146
147
148
# File 'lib/bulk_insert/worker.rb', line 145

# Builds the leading part of the INSERT statement, up to and including the
# +VALUES+ keyword, using the quoted table and column names.
#
# The statement adapter's "insert ignore" modifier is only requested, and
# placed after +INSERT+, when the worker was created with +ignore+ set.
#
# @return [String] a new string ending in "VALUES "
def insert_sql_statement
  modifier = ''
  modifier = @statement_adapter.insert_ignore_statement if @ignore

  "INSERT #{modifier} INTO #{@table_name} (#{@column_names}) VALUES "
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/bulk_insert/worker.rb', line 39

# Tells whether any rows are buffered and waiting to be written.
#
# NOTE: +any?+ without a block tests element truthiness. Rows pushed by
# #add are arrays (always truthy), so for those this is the same as
# checking that the buffer is non-empty.
#
# @return [Boolean]
def pending?
  @set.any?
end

#pending_count ⇒ Object



43
44
45
# File 'lib/bulk_insert/worker.rb', line 43

# Reports how many rows are currently buffered and not yet written.
#
# @return [Integer] number of rows in the buffer
def pending_count
  @set.size
end

#save! ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'lib/bulk_insert/worker.rb', line 83

# Writes out the buffered rows, if there are any, and empties the buffer.
#
# Order of operations: the before-save callback (given the buffered rows),
# the query itself, the after-save callback (no arguments), and finally the
# buffer is cleared. With nothing pending, none of these run.
#
# @return [self] to allow chaining
def save!
  return self unless pending?

  before_hook = @before_save_callback
  before_hook.call(@set) if before_hook

  execute_query

  after_hook = @after_save_callback
  after_hook.call if after_hook

  @set.clear
  self
end