Class: Google::Cloud::Bigquery::Table::AsyncInserter

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/google/cloud/bigquery/table/async_inserter.rb

Overview

AsyncInserter

Used to insert multiple rows in batches into a table. See Google::Cloud::Bigquery::Table#insert_async.

Examples:

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
dataset = bigquery.dataset "my_dataset"
table = dataset.table "my_table"
inserter = table.insert_async do |result|
  if result.error?
    log_error result.error
  else
    log_insert "inserted #{result.insert_count} rows " \
      "with #{result.error_count} errors"
  end
end

rows = [
  { "first_name" => "Alice", "age" => 21 },
  { "first_name" => "Bob", "age" => 22 }
]
inserter.insert rows

inserter.stop.wait!

Defined Under Namespace

Classes: Result

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#interval ⇒ Numeric (readonly)

The number of seconds to collect rows before the batch is inserted. Default is 10.

Returns:

  • (Numeric)

    the current value of interval



# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def interval
  @interval
end

#max_bytes ⇒ Integer (readonly)

The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).

Returns:

  • (Integer)

    the current value of max_bytes



# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def max_bytes
  @max_bytes
end

#max_rows ⇒ Integer (readonly)

The maximum number of rows to be collected before the batch is inserted. Default is 500.

Returns:

  • (Integer)

    the current value of max_rows



# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def max_rows
  @max_rows
end

#threads ⇒ Integer (readonly)

The number of threads used to insert rows. Default is 4.

Returns:

  • (Integer)

    the current value of threads



# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 64

def threads
  @threads
end
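Taken together, these thresholds decide when a buffered batch is ready to be sent. The following is a minimal, hypothetical sketch of that decision; `SimpleBatch` is illustrative only and is not the library's internal batch class.

```ruby
require "json"

# Illustrative batch: "ready" once it holds max_rows rows or
# max_bytes bytes of JSON-encoded row data.
class SimpleBatch
  attr_reader :rows

  def initialize max_rows: 500, max_bytes: 10_000_000
    @max_rows = max_rows
    @max_bytes = max_bytes
    @rows = []
    @bytes = 0
  end

  # Returns true if the row fit, false if the batch is already full.
  def try_insert row
    row_bytes = row.to_json.bytesize
    return false if @rows.size >= @max_rows || @bytes + row_bytes > @max_bytes
    @rows << row
    @bytes += row_bytes
    true
  end

  def ready?
    @rows.size >= @max_rows || @bytes >= @max_bytes
  end
end

batch = SimpleBatch.new max_rows: 2
batch.try_insert "first_name" => "Alice", "age" => 21 # => true
batch.try_insert "first_name" => "Bob", "age" => 22   # => true
batch.ready?                                          # => true
batch.try_insert "first_name" => "Carol", "age" => 23 # => false (batch full)
```

In the real inserter, a full batch triggers an insert request and a new batch is started, so callers never see a rejected row.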

Instance Method Details

#flush ⇒ AsyncInserter

Forces all rows in the current batch to be inserted immediately.

Returns:

  • (AsyncInserter)

# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 220

def flush
  synchronize do
    push_batch_request!
    @cond.signal
  end

  self
end
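The signal-or-timeout loop behind #flush and the interval attribute relies on MonitorMixin (which this class includes) and a condition variable: a background thread flushes when signaled or when the interval elapses. A simplified sketch, assuming a hypothetical `TimedBuffer` stand-in rather than the real inserter:

```ruby
require "monitor"

# Illustrative only; not the inserter's real implementation.
class TimedBuffer
  include MonitorMixin

  attr_reader :flushed

  def initialize interval: 0.1
    super()                # set up the monitor lock
    @interval = interval
    @items = []
    @flushed = []
    @cond = new_cond       # MonitorMixin condition variable
    Thread.new { run }
  end

  def add item
    synchronize do
      @items << item
      @cond.signal         # wake the background loop
    end
  end

  # Forces everything buffered so far to be "flushed" immediately.
  def flush
    synchronize do
      @flushed.concat @items
      @items.clear
    end
    self
  end

  private

  # Background loop: flush when signaled or when the interval elapses.
  def run
    loop do
      synchronize { @cond.wait @interval }
      flush
    end
  end
end

buf = TimedBuffer.new
buf.add "row-1"
buf.flush.flushed # => ["row-1"]
```

Because the monitor is reentrant, the background loop can call `flush` while holding the lock, just as the real inserter's `flush` and `insert` both run inside `synchronize`.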

#insert(rows, insert_ids: nil) ⇒ Object

Adds rows to the async inserter. Rows are collected in batches and inserted together. See Google::Cloud::Bigquery::Table#insert_async.

Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's more complex types:

| BigQuery | Ruby | Notes |
|---|---|---|
| NUMERIC | BigDecimal | BigDecimal values will be rounded to scale 9. |
| BIGNUMERIC | String | Pass as String to avoid rounding to scale 9. |
| DATETIME | DateTime | DATETIME does not support time zone. |
| DATE | Date | |
| GEOGRAPHY | String | |
| JSON | String (stringified JSON) | String, as JSON does not have a schema to verify. |
| TIMESTAMP | Time | |
| TIME | Google::Cloud::BigQuery::Time | |
| BYTES | File, IO, StringIO, or similar | |
| ARRAY | Array | Nested arrays, nil values are not supported. |
| STRUCT | Hash | Hash keys may be strings or symbols. |

Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.

The value :skip can be provided as insert_ids to skip the generation of IDs for all rows, or within the insert_ids array to skip the generation of an ID for that specific row.

Parameters:

  • rows (Hash, Array<Hash>)

    A hash object or array of hash objects containing the data. Required. BigDecimal values will be rounded to scale 9 to conform with the BigQuery NUMERIC data type. To avoid rounding BIGNUMERIC type values with scale greater than 9, use String instead of BigDecimal.

  • insert_ids (Array<String|Symbol>, Symbol) (defaults to: nil)

    A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency. Optional. If not provided, the client library will assign a UUID to each row before the request is sent.




# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 147

def insert rows, insert_ids: nil
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?
  rows, insert_ids = validate_insert_args rows, insert_ids

  synchronize do
    rows.zip(Array(insert_ids)).each do |row, insert_id|
      if @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row, insert_id
      else
        unless @batch.try_insert row, insert_id
          push_batch_request!

          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        end
      end

      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }

      push_batch_request! if @batch.ready?
    end

    @cond.signal
  end

  true
end
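The per-row ID behavior described above can be sketched as follows. `resolve_insert_ids` is a hypothetical helper written for illustration, not part of the library:

```ruby
require "securerandom"

# Illustrates insert_ids semantics: :skip as the whole argument
# disables ID generation entirely; within the array, :skip leaves that
# row without an ID; a missing entry gets a generated UUID.
def resolve_insert_ids rows, insert_ids
  return Array.new(rows.size) if insert_ids == :skip
  rows.each_index.map do |i|
    id = insert_ids && insert_ids[i]
    next nil if id == :skip   # skip ID generation for this row only
    id || SecureRandom.uuid   # use the given ID, else generate a UUID
  end
end

rows = [{ "name" => "Alice" }, { "name" => "Bob" }]
resolve_insert_ids rows, :skip             # => [nil, nil]
resolve_insert_ids rows, ["id-1", :skip]   # => ["id-1", nil]
```

Skipping IDs trades best-effort deduplication for slightly higher throughput, since BigQuery no longer tracks insertion requests by insertId.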

#started? ⇒ Boolean

Whether the inserter has been started.

Returns:

  • (Boolean)

    true when started, false otherwise.



# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 234

def started?
  !stopped?
end

#stop ⇒ AsyncInserter

Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use #wait! to block until the inserter is fully stopped and all pending rows have been inserted.

Returns:

  • (AsyncInserter)

# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 186

def stop
  synchronize do
    break if @stopped

    @stopped = true
    push_batch_request!
    @cond.signal
  end

  self
end

#stopped? ⇒ Boolean

Whether the inserter has been stopped.

Returns:

  • (Boolean)

    true when stopped, false otherwise.



# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 243

def stopped?
  synchronize { @stopped }
end

#wait!(timeout = nil) ⇒ AsyncInserter

Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. Does not stop the inserter. To stop the inserter, first call #stop and then call #wait! to block until the inserter is stopped.

Returns:

  • (AsyncInserter)

# File 'lib/google/cloud/bigquery/table/async_inserter.rb', line 206

def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    @thread_pool.wait_for_termination timeout
  end

  self
end
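The stop-then-wait pattern behind #stop and #wait! can be sketched with plain Ruby threads. `MiniWorker` is a generic illustration, not the inserter's implementation: stop lets queued work drain but refuses new work, and wait! blocks until the worker thread has finished.

```ruby
# Generic stop/wait! shutdown pattern: queued jobs still run after
# stop, but no new jobs are accepted; wait! joins the worker thread.
class MiniWorker
  def initialize
    @queue = Queue.new
    @stopped = false
    @thread = Thread.new do
      while (job = @queue.pop) # nil sentinel ends the loop
        job.call
      end
    end
  end

  def submit(&job)
    raise "stopped" if @stopped
    @queue << job
    self
  end

  def stop
    return self if @stopped
    @stopped = true
    @queue << nil # sentinel: worker drains remaining jobs, then exits
    self
  end

  def wait! timeout = nil
    @thread.join timeout
    self
  end
end

results = []
worker = MiniWorker.new
3.times { |i| worker.submit { results << i } }
worker.stop.wait!
results # => [0, 1, 2]
```

As with the real inserter, both methods return self, which is why the idiomatic call is the chained `inserter.stop.wait!` shown in the overview example.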