Method: PG::Connection#copy_data

Defined in:
lib/pg/connection.rb

#copy_data(sql, coder = nil) ⇒ Object

call-seq:

conn.copy_data( sql [, coder] ) {|sql_result| ... } -> PG::Result

Execute a copy process for transferring data to or from the server.

This issues the SQL COPY command via #exec. If the command succeeds, the response is a PG::Result object that is passed to the block, bearing a status code of PGRES_COPY_IN or PGRES_COPY_OUT (depending on the specified copy direction). Inside the block, the application should use #put_copy_data to transmit data rows or #get_copy_data to receive them, and should return from the block when finished. If sql is not a COPY statement, ArgumentError is raised.

When the data transfer is complete, #copy_data returns another PG::Result object holding the final result of the COPY command. An exception is raised if a problem is encountered, so it isn't necessary to check the status of either result object. Once #copy_data has returned, further SQL commands can be issued via #exec. (No other SQL command can be executed on the same connection while the COPY operation is in progress.)

This method ensures that the COPY process is properly terminated in case of client-side or server-side failures. Therefore, when the connection is in blocking mode, #copy_data is preferred over raw calls to #put_copy_data, #get_copy_data and #put_copy_end. (In nonblocking mode #copy_data raises PG::NotInBlockingMode.)
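
The termination guarantee can be pictured with a simplified sketch of the COPY IN branch shown in the source below. FakeCopyConn and copy_in_sketch are hypothetical names used only for illustration: the fake object records calls instead of talking to a server, and only the error message format is taken from the real method. Rescuing Exception (not just StandardError) mirrors #copy_data itself, so even an interrupt raised inside the block aborts the COPY before the exception is re-raised.

```ruby
# Hypothetical stand-in for PG::Connection: it only records the calls that
# the COPY IN branch of #copy_data would make. No server is involved.
class FakeCopyConn
  attr_reader :log

  def initialize
    @log = []
  end

  def put_copy_data(data)
    @log << [:put_copy_data, data]
  end

  # A non-nil errmsg asks the server to abort the COPY with that message.
  def put_copy_end(errmsg = nil)
    @log << [:put_copy_end, errmsg]
  end

  # Simplified sketch of the COPY IN branch: the COPY is always ended,
  # normally after a successful block, or with an error message otherwise.
  def copy_in_sketch
    yield
  rescue Exception => err
    put_copy_end("#{err.class.name} while copy data: #{err.message}")
    raise
  else
    put_copy_end
  end
end

conn = FakeCopyConn.new
conn.copy_in_sketch { conn.put_copy_data("some,data,to,copy\n") }
begin
  conn.copy_in_sketch { raise ArgumentError, "bad row" }
rescue ArgumentError
  # The original exception reaches the caller after the COPY was aborted.
end
p conn.log
```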

The optional coder argument can be an instance of a PG::Coder subclass (typically PG::TextEncoder::CopyRow or PG::TextDecoder::CopyRow). It enables encoding of the data fields passed to #put_copy_data, or decoding of the fields received by #get_copy_data.

Example with CSV input format:

conn.exec "create table my_table (a text,b text,c text,d text)"
conn.copy_data "COPY my_table FROM STDIN CSV" do
  conn.put_copy_data "some,data,to,copy\n"
  conn.put_copy_data "more,data,to,copy\n"
end

This creates my_table and inserts two CSV rows.
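
When the rows come from Ruby data rather than literal strings, Ruby's standard CSV library can build each line, including the quoting PostgreSQL's CSV format expects. This is a plain-Ruby sketch that does not touch the pg gem; each resulting line would be passed to conn.put_copy_data inside the block above. In PostgreSQL's CSV format an unquoted empty field is read as NULL by default, which is how CSV.generate_line writes nil.

```ruby
require "csv"

# Each CSV.generate_line result is one complete line that could be passed to
# conn.put_copy_data inside the "COPY my_table FROM STDIN CSV" block.
rows = [
  %w[some data to copy],
  ["with, comma", "with \"quotes\"", "plain", nil]
]
lines = rows.map { |row| CSV.generate_line(row) }
lines.each { |line| p line }
# prints "some,data,to,copy\n"
# prints "\"with, comma\",\"with \"\"quotes\"\"\",plain,\n"
```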

The same with text format encoder PG::TextEncoder::CopyRow and Array input:

enco = PG::TextEncoder::CopyRow.new
conn.copy_data "COPY my_table FROM STDIN", enco do
  conn.put_copy_data ['some', 'data', 'to', 'copy']
  conn.put_copy_data ['more', 'data', 'to', 'copy']
end
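
To show roughly what such an encoder contributes, here is a sketch of PostgreSQL's text COPY row format: fields separated by tabs, NULL written as \N, backslash, tab, newline and carriage return escaped with a backslash, and each row terminated by a newline. encode_copy_row is a hypothetical helper, not the PG::TextEncoder::CopyRow implementation, and it ignores type maps by calling to_s on every non-nil field.

```ruby
# Hypothetical helper: a minimal sketch of the text COPY row format.
# Characters that must be escaped inside a field, mapped to their escapes.
COPY_TEXT_ESCAPES = { "\\" => "\\\\", "\t" => "\\t", "\n" => "\\n", "\r" => "\\r" }.freeze

def encode_copy_row(fields)
  encoded = fields.map do |field|
    # nil becomes the NULL marker \N; everything else is escaped text.
    field.nil? ? "\\N" : field.to_s.gsub(/[\\\t\n\r]/, COPY_TEXT_ESCAPES)
  end
  encoded.join("\t") + "\n"
end

p encode_copy_row(%w[some data to copy]) # prints "some\tdata\tto\tcopy\n"
p encode_copy_row(["tab\there", nil, "back\\slash", "line\nbreak"])
```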

All 4 CopyRow classes can take a type map to specify how the columns are mapped to and from the database format. For details see the particular CopyRow class description.

PG::BinaryEncoder::CopyRow can be used to send data in binary format to the server. In this case copy_data generates the header and trailer data automatically:

enco = PG::BinaryEncoder::CopyRow.new
conn.copy_data "COPY my_table FROM STDIN (FORMAT binary)", enco do
  conn.put_copy_data ['some', 'data', 'to', 'copy']
  conn.put_copy_data ['more', 'data', 'to', 'copy']
end
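
For text columns like those of my_table, the binary representation of a value is simply its bytes, which makes the stream layout easy to sketch. The helpers and constants below are hypothetical and follow the binary COPY file format from the PostgreSQL COPY documentation: an 11-byte signature, a 32-bit flags field and a 32-bit header extension length, then per tuple a 16-bit field count followed by a 32-bit length (-1 for NULL) and the bytes of each field, and finally a 16-bit -1 trailer, all in network byte order. The header and trailer correspond to what #copy_data writes itself (see the source below); the tuple part is roughly the per-row data the encoder is expected to supply.

```ruby
# Hypothetical helpers sketching the binary COPY layout for text columns.
# All integers are big-endian ("network byte order").
BINARY_COPY_SIGNATURE = "PGCOPY\n\xFF\r\n\x00".b # 11-byte file signature
BINARY_COPY_TRAILER = [-1].pack("s>")           # 16-bit -1, i.e. "\xFF\xFF"

# Header: signature + 32-bit flags (0) + 32-bit header extension length (0).
def binary_copy_header
  BINARY_COPY_SIGNATURE + [0, 0].pack("l>l>")
end

# One tuple (fields are Strings or nil): 16-bit field count, then for each
# field a 32-bit byte length (-1 for NULL) followed by the field bytes.
def binary_copy_tuple(fields)
  buf = [fields.size].pack("s>")
  fields.each do |field|
    if field.nil?
      buf << [-1].pack("l>")
    else
      bytes = field.b
      buf << [bytes.bytesize].pack("l>") << bytes
    end
  end
  buf
end

stream = binary_copy_header +
         binary_copy_tuple(%w[some data to copy]) +
         binary_copy_tuple(%w[more data to copy]) +
         BINARY_COPY_TRAILER
p stream.bytesize # prints 85
```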

Example with CSV output format:

conn.copy_data "COPY my_table TO STDOUT CSV" do
  while row=conn.get_copy_data
    p row
  end
end

This prints all rows of my_table to stdout:

"some,data,to,copy\n"
"more,data,to,copy\n"

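Since libpq's PQgetCopyData returns one complete data row per call, each string received in CSV mode can be split into fields with Ruby's CSV library. This plain-Ruby sketch works on the two strings shown above and does not involve the pg gem. By default Ruby's CSV parser returns nil for an unquoted empty field and "" for a quoted one, which lines up with how PostgreSQL's CSV format distinguishes NULL from an empty string.

```ruby
require "csv"

# The strings printed above, as get_copy_data returns them in CSV mode.
received = ["some,data,to,copy\n", "more,data,to,copy\n"]
rows = received.map { |line| CSV.parse_line(line) }
p rows # prints [["some", "data", "to", "copy"], ["more", "data", "to", "copy"]]
```
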
The same with text format decoder PG::TextDecoder::CopyRow and Array output:

deco = PG::TextDecoder::CopyRow.new
conn.copy_data "COPY my_table TO STDOUT", deco do
  while row=conn.get_copy_data
    p row
  end
end

This receives all rows of my_table as Ruby arrays:

["some", "data", "to", "copy"]
["more", "data", "to", "copy"]
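
The reverse direction can be sketched the same way. decode_copy_row is a hypothetical helper, not the PG::TextDecoder::CopyRow implementation: it splits a text-format row on tabs, maps \N to nil and undoes the \\, \t, \n, \r, \b, \f and \v escapes. Octal and hex escapes are not handled; per the PostgreSQL documentation COPY TO never emits them, so this covers server output. Every field is returned as a String instead of being converted through a type map.

```ruby
# Hypothetical helper: decodes one text-format COPY row into an Array.
# Handles \N (NULL) and the \\ \t \n \r \b \f \v escapes only.
COPY_TEXT_UNESCAPES = {
  "\\\\" => "\\", "\\t" => "\t", "\\n" => "\n", "\\r" => "\r",
  "\\b" => "\b", "\\f" => "\f", "\\v" => "\v"
}.freeze

def decode_copy_row(line)
  parts = line.chomp("\n").split("\t", -1) # -1 keeps trailing empty fields
  parts = [""] if parts.empty?             # a single empty-string column
  parts.map do |field|
    # The regexp consumes an escaped backslash before looking further, so an
    # escaped backslash followed by "t" is not mistaken for a tab escape.
    field == "\\N" ? nil : field.gsub(/\\[\\tnrbfv]/, COPY_TEXT_UNESCAPES)
  end
end

p decode_copy_row("some\tdata\tto\tcopy\n") # prints ["some", "data", "to", "copy"]
p decode_copy_row("a\\tb\t\\N\tc\\\\d\n")
```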

PG::BinaryDecoder::CopyRow can also be used to retrieve data in binary format from the server. In this case the header and trailer data are processed by the decoder, and the final nil returned by #get_copy_data is consumed by #copy_data, so binary data can be processed the same way as text data:

deco = PG::BinaryDecoder::CopyRow.new
conn.copy_data "COPY my_table TO STDOUT (FORMAT binary)", deco do
  while row=conn.get_copy_data
    p row
  end
end

This receives all rows of my_table as Ruby arrays:

["some", "data", "to", "copy"]
["more", "data", "to", "copy"]
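
Along the same lines, a complete binary COPY stream for text columns can be parsed back into rows. parse_binary_copy and BIN_SIGNATURE are hypothetical; unlike PG::BinaryDecoder::CopyRow, which works on the chunks returned by get_copy_data, this sketch takes the whole stream at once and assumes every field holds UTF-8 text.

```ruby
# Hypothetical sketch: parse a whole binary COPY stream of text columns.
BIN_SIGNATURE = "PGCOPY\n\xFF\r\n\x00".b

def parse_binary_copy(data)
  data = data.b
  raise ArgumentError, "not a binary COPY stream" unless data.start_with?(BIN_SIGNATURE)

  pos = BIN_SIGNATURE.bytesize
  # Read +size+ bytes at the current position; unpack them with +fmt+ if given.
  read = lambda do |size, fmt|
    chunk = data.byteslice(pos, size)
    raise ArgumentError, "truncated COPY stream" if chunk.nil? || chunk.bytesize < size
    pos += size
    fmt ? chunk.unpack1(fmt) : chunk
  end

  read.(4, "l>")           # flags field (ignored here)
  ext_len = read.(4, "l>") # header extension length
  read.(ext_len, nil)      # skip the header extension area
  rows = []
  loop do
    count = read.(2, "s>")
    break if count == -1   # file trailer
    rows << Array.new(count) do
      len = read.(4, "l>")
      len == -1 ? nil : read.(len, nil).force_encoding(Encoding::UTF_8)
    end
  end
  rows
end

stream = BIN_SIGNATURE + [0, 0].pack("l>l>") +
         [2, 4].pack("s>l>") + "some" + [-1].pack("l>") + # tuple: "some", NULL
         [-1].pack("s>")                                  # trailer
p parse_binary_copy(stream) # prints [["some", nil]]
```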


# File 'lib/pg/connection.rb', line 214

def copy_data( sql, coder=nil )
  raise PG::NotInBlockingMode.new("copy_data can not be used in nonblocking mode", connection: self) if nonblocking?
  res = exec( sql )

  case res.result_status
  when PGRES_COPY_IN
    begin
      if coder && res.binary_tuples == 1
        # Binary file header (11 byte signature, 32 bit flags and 32 bit extension length)
        put_copy_data(BinarySignature + ("\x00" * 8))
      end

      if coder
        old_coder = self.encoder_for_put_copy_data
        self.encoder_for_put_copy_data = coder
      end

      yield res
    rescue Exception => err
      errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
      begin
        put_copy_end( errmsg )
      rescue PG::Error
        # Ignore error in cleanup to avoid losing original exception
      end
      discard_results
      raise err
    else
      begin
        self.encoder_for_put_copy_data = old_coder if coder

        if coder && res.binary_tuples == 1
          put_copy_data("\xFF\xFF") # Binary file trailer 16 bit "-1"
        end

        put_copy_end
      rescue PG::Error => err
        raise PG::LostCopyState.new("#{err} (probably by executing another SQL query while running a COPY command)", connection: self)
      end
      get_last_result
    ensure
      self.encoder_for_put_copy_data = old_coder if coder
    end

  when PGRES_COPY_OUT
    begin
      if coder
        old_coder = self.decoder_for_get_copy_data
        self.decoder_for_get_copy_data = coder
      end
      yield res
    rescue Exception
      cancel
      discard_results
      raise
    else
      if coder && res.binary_tuples == 1
        # There are two end markers in binary mode: file trailer and the final nil.
        # The file trailer is expected to be processed by BinaryDecoder::CopyRow and already returns nil, so that the remaining NULL from PQgetCopyData is retrieved here:
        if get_copy_data
          discard_results
          raise PG::NotAllCopyDataRetrieved.new("Not all binary COPY data retrieved", connection: self)
        end
      end
      res = get_last_result
      if !res
        discard_results
        raise PG::LostCopyState.new("Lost COPY state (probably by executing another SQL query while running a COPY command)", connection: self)
      elsif res.result_status != PGRES_COMMAND_OK
        discard_results
        raise PG::NotAllCopyDataRetrieved.new("Not all COPY data retrieved", connection: self)
      end
      res
    ensure
      self.decoder_for_get_copy_data = old_coder if coder
    end

  else
    raise ArgumentError, "SQL command is no COPY statement: #{sql}"
  end
end