Module: PG::EM::Client::Helper
Included in: Transaction
Defined in: lib/em-pg-client-helper.rb
Overview
Some helper methods to make working with em-pg-client slightly less like trying to build a house with a packet of seeds and a particle accelerator... backwards.
Defined Under Namespace
Classes: BadSequelError
Instance Method Summary
- #db_bulk_insert(db, tbl, columns, rows, &blk) ⇒ EM::Deferrable
  Efficiently perform a "bulk" insert of multiple rows.
- #db_insert(db, tbl, params) ⇒ EM::Deferrable
  Run an insert query, without having to write a great pile of SQL all by yourself.
- #db_sequel(db) {|sqldb| ... } ⇒ EM::Deferrable
  Generate Sequel, and run it against the database connection provided.
- #db_transaction(db, opts = {}, &blk) ⇒ EM::Deferrable
  Execute code in a transaction.
- #db_upsert(db, tbl, key, data) {|PG::Result| ... } ⇒ EM::Deferrable
  Run an upsert query.
- #insert_sql(tbl, params) ⇒ Object
  Generate SQL for an insert statement into tbl, with the fields and data given by the keys and values, respectively, of params.
- #quote_identifier(id) ⇒ String
  Take a PgSQL identifier (anything that isn't data, basically) and quote it so that it will always be valid, no matter what insanity someone's decided to put in their names.
- #sequel_sql {|sqldb| ... } ⇒ String
  Sequel-based SQL generation.
- #upsert_sql(tbl, key, data) ⇒ Array<String, Array<Object>>
  An "upsert" is a kind of crazy hybrid "update if the record exists, insert it if it doesn't" query.
Instance Method Details
#db_bulk_insert(db, tbl, columns, rows, &blk) ⇒ EM::Deferrable
For details on the tbl, columns, and rows parameters, see PG::EM::Client::Helper::Transaction#bulk_insert.
Efficiently perform a "bulk" insert of multiple rows.
When you have a large quantity of data to insert into a table, you don't want to do it one row at a time -- that's really inefficient. On the other hand, if you do one giant multi-row insert statement, the insert will fail if any of the rows causes a constraint failure. What to do?
Well, here's our answer: try to insert all the records at once. If that fails with a constraint violation, then split the set of records in half and try to bulk insert each of those halves. Recurse in this fashion until you only have one record to insert.
# File 'lib/em-pg-client-helper.rb', line 234
def db_bulk_insert(db, tbl, columns, rows, &blk)
  EM::Completion.new.tap do |df|
    df.callback(&blk) if blk

    db_transaction(db) do |txn|
      txn.bulk_insert(tbl, columns, rows) do |count|
        txn.commit do
          df.succeed(count)
        end
      end
    end.errback do |ex|
      df.fail(ex)
    end
  end
end
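The halving strategy described for db_bulk_insert can be illustrated with a small synchronous sketch. This is not the library's implementation: FakeConstraintViolation and bulk_insert_by_halving are hypothetical names, the "database" is a plain Ruby array, and the sketch assumes that a single row which still fails is simply skipped (the text above does not say what happens to it).

```ruby
# Hypothetical stand-in for a constraint violation raised by the database.
class FakeConstraintViolation < StandardError; end

# Try to insert every row in one batch. If the batch is rejected, split it in
# half and try each half separately, recursing down to single rows.
# Returns the number of rows inserted. Assumption: a lone failing row is skipped.
def bulk_insert_by_halving(rows, &insert_batch)
  return 0 if rows.empty?

  insert_batch.call(rows)
  rows.length
rescue FakeConstraintViolation
  return 0 if rows.length == 1

  mid = rows.length / 2
  bulk_insert_by_halving(rows[0...mid], &insert_batch) +
    bulk_insert_by_halving(rows[mid..-1], &insert_batch)
end

# Emulate an all-or-nothing multi-row INSERT into a table with a unique column.
table = [3]
insert = lambda do |batch|
  raise FakeConstraintViolation if batch.any? { |row| table.include?(row) }
  table.concat(batch)
end

count = bulk_insert_by_halving([1, 2, 3, 4, 5], &insert)
puts count          # 4 rows inserted; the duplicate 3 was skipped
puts table.inspect  # [3, 1, 2, 4, 5]
```

With one conflicting row out of five, the sketch issues five batch attempts instead of five single-row inserts plus a failure; the saving grows with the batch size when conflicts are rare.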
#db_insert(db, tbl, params) ⇒ EM::Deferrable
Run an insert query, without having to write a great pile of SQL all by yourself.
# File 'lib/em-pg-client-helper.rb', line 194
def db_insert(db, tbl, params)
  db.exec_defer(*insert_sql(tbl, params))
end
#db_sequel(db) {|sqldb| ... } ⇒ EM::Deferrable
Generate Sequel, and run it against the database connection provided.
This is the all-singing variant of #sequel_sql -- in addition to generating the SQL, we also run the result against the database connection you pass.
# File 'lib/em-pg-client-helper.rb', line 153
def db_sequel(db, &blk)
  db.exec_defer(sequel_sql(&blk))
end
#db_transaction(db, opts = {}, &blk) ⇒ EM::Deferrable
Due to the way that transactions detect when they are completed, every deferrable in the scope of the transaction must be generated by the transaction. That is, you cannot use objects other than the transaction asynchronously. This is a known limitation, and will be addressed in a future version of this library.
Execute code in a transaction.
Calling this method opens up a transaction (by executing BEGIN), and then runs the supplied block, passing in a transaction object which you can use to execute SQL commands. Once the transaction is finished, COMMIT or ROLLBACK will be sent to the DB server to complete the transaction, depending on whether or not any errors (query failures or Ruby exceptions) appeared during the transaction. You can also manually call txn.rollback(reason) if you want to signal that the transaction should be rolled back.
You should use #callback and #errback against the deferrable that db_transaction returns to specify what to run after the transaction completes successfully or fails, respectively.
# File 'lib/em-pg-client-helper.rb', line 417
def db_transaction(db, opts = {}, &blk)
  if db.is_a? PG::EM::ConnectionPool
    db.__send__(:hold_deferred) do |conn|
      ::PG::EM::Client::Helper::Transaction.new(conn, opts, &blk)
    end
  else
    ::PG::EM::Client::Helper::Transaction.new(db, opts, &blk)
  end
end
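The BEGIN / COMMIT / ROLLBACK behaviour described for db_transaction can be pictured with a deliberately simplified, synchronous sketch. It is only an illustration of the control flow, not how the Transaction class works: the real method is asynchronous and returns a deferrable, whereas RecordingConnection and sketch_transaction below are hypothetical names that merely record the SQL they are given.

```ruby
# Hypothetical stand-in for a database connection: it only records SQL strings.
class RecordingConnection
  attr_reader :log

  def initialize
    @log = []
  end

  def exec(sql)
    @log << sql
  end
end

# Open a transaction, run the block, then COMMIT if the block finished
# normally or ROLLBACK if it raised. Returns true on commit, false on rollback.
def sketch_transaction(conn)
  conn.exec("BEGIN")
  yield conn
  conn.exec("COMMIT")
  true
rescue StandardError
  conn.exec("ROLLBACK")
  false
end

ok = RecordingConnection.new
sketch_transaction(ok) { |c| c.exec("INSERT INTO t VALUES (1)") }
puts ok.log.inspect   # ["BEGIN", "INSERT INTO t VALUES (1)", "COMMIT"]

bad = RecordingConnection.new
sketch_transaction(bad) { |_c| raise "query failed" }
puts bad.log.inspect  # ["BEGIN", "ROLLBACK"]
```

In the real helper, the true/false outcome corresponds to the deferrable's #callback and #errback firing, respectively.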
#db_upsert(db, tbl, key, data) {|PG::Result| ... } ⇒ EM::Deferrable
Run an upsert query.
Apply an upsert (update-or-insert) against a given database connection or connection pool, handling the (rarely needed) unique violation that can result.
# File 'lib/em-pg-client-helper.rb', line 339
def db_upsert(db, tbl, key, data)
  q = upsert_sql(tbl, key, data)

  ::EM::DefaultDeferrable.new.tap do |df|
    db.exec_defer(*q).callback do |r|
      df.succeed(r)
    end.errback do |ex|
      if ex.is_a?(PG::UniqueViolation)
        db.exec_defer(*q).callback do |r|
          df.succeed(r)
        end.errback do |ex|
          df.fail(ex)
        end
      else
        df.fail(ex)
      end
    end
  end
end
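The listing for db_upsert re-runs the same query exactly once when the first attempt fails with PG::UniqueViolation, and gives up on any second failure. A minimal synchronous sketch of that retry-once shape follows; FakeUniqueViolation and upsert_with_one_retry are hypothetical names used so the example runs without a database, and the block stands in for executing the upsert query.

```ruby
# Hypothetical error class standing in for PG::UniqueViolation.
class FakeUniqueViolation < StandardError; end

# Run the block. If it raises the unique-violation stand-in, run it one more
# time; a second failure (or any other error) propagates to the caller.
def upsert_with_one_retry
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue FakeUniqueViolation
    retry if attempts < 2
    raise
  end
end

# The first attempt loses a race with a concurrent insert; the retry succeeds.
result = upsert_with_one_retry do |attempt|
  raise FakeUniqueViolation if attempt == 1
  :row
end
puts result.inspect  # :row
```

The retry is reasonable here because a unique violation means another writer inserted the row in between, so a second attempt should normally take the update path.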
#insert_sql(tbl, params) ⇒ Object
Generate SQL for an insert statement into tbl, with the fields and data given by the keys and values, respectively, of params. Returns a two-element array consisting of the parameterised SQL as the first element, and the array of parameters as the second element.
# File 'lib/em-pg-client-helper.rb', line 166
def insert_sql(tbl, params)
  keys = params.keys.map { |k| quote_identifier(k.to_s) }.join(',')
  vals = params.values
  val_places = (1..vals.length).to_a.map { |i| "$#{i}" }.join(',')

  ["INSERT INTO #{quote_identifier(tbl.to_s)} (#{keys}) VALUES (#{val_places})", vals]
end
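To see the shape of the two-element array that insert_sql returns, the two methods from the listings can be pasted into a standalone script, so that nothing here needs a database or the gem itself. The table name and column values are made up for illustration.

```ruby
# Standalone copies of quote_identifier and insert_sql, taken from the
# listings on this page, so the example runs without a database connection.
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end

def insert_sql(tbl, params)
  keys = params.keys.map { |k| quote_identifier(k.to_s) }.join(',')
  vals = params.values
  val_places = (1..vals.length).to_a.map { |i| "$#{i}" }.join(',')

  ["INSERT INTO #{quote_identifier(tbl.to_s)} (#{keys}) VALUES (#{val_places})", vals]
end

# Illustrative table and data (not from the library's documentation).
sql, values = insert_sql(:users, { name: "Ann", age: 30 })
puts sql             # INSERT INTO "users" ("name","age") VALUES ($1,$2)
puts values.inspect  # ["Ann", 30]
```

Because the values travel separately from the SQL text, this pair is what db_insert splats into exec_defer as the query and its bind parameters.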
#quote_identifier(id) ⇒ String
Take a PgSQL identifier (anything that isn't data, basically) and quote it so that it will always be valid, no matter what insanity someone's decided to put in their names.
# File 'lib/em-pg-client-helper.rb', line 435
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end
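A few sample inputs show what the quoting in quote_identifier does: the identifier is wrapped in double quotes and any embedded double quote is doubled. The method body is copied from the listing so the snippet runs on its own; the identifiers are made up. Since the method calls gsub directly, it expects a String (the other helpers call to_s on symbols before passing them in).

```ruby
# Standalone copy of quote_identifier from the listing on this page.
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end

puts quote_identifier("users")        # "users"
puts quote_identifier("order items")  # "order items"
puts quote_identifier('say "hi"')     # "say ""hi"""
```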
#sequel_sql {|sqldb| ... } ⇒ String
Sequel-based SQL generation.
While we could spend a lot of time writing code to generate various kinds of SQL "by hand", it would be wasted effort, since the Sequel database toolkit gives us a complete, and extremely powerful, SQL generation system, which is already familiar to a great many programmers (or, at least, many great programmers).
Hence, rather than reinvent the wheel, we simply drop Sequel in.
Anything you can do with an instance of Sequel::Database that produces a single SQL query, you can almost certainly do with this method.
Usage is quite simple: calling this method will yield a pseudo-database object to the block you pass. You can then call whatever methods you like against the database object, and when you're done and the block you passed completes, we'll return the SQL that Sequel generated.
# File 'lib/em-pg-client-helper.rb', line 73
def sequel_sql
  sqldb = Thread.current[:em_pg_client_sequel_db] ||= Sequel.connect("mock://postgres", :keep_reference => false)
  ret = yield sqldb if block_given?
  sqls = sqldb.sqls

  if sqls.empty?
    sqls = [ret.sql] rescue ret
  end

  if sqls.nil? or sqls.empty?
    raise PG::EM::Client::Helper::BadSequelError,
          "Your block did not generate an SQL statement"
  end

  if sqls.length > 1
    raise PG::EM::Client::Helper::BadSequelError,
          "Your block generated multiple SQL statements"
  end

  sqls.first
end
#upsert_sql(tbl, key, data) ⇒ Array<String, Array<Object>>
An "upsert" is a kind of crazy hybrid "update if the record exists, insert it if it doesn't" query. It isn't part of the SQL standard, but it is such a common idiom that we're keen to support it.
The trick is that it's actually two queries in one. We try to do an UPDATE first, and if that doesn't actually update anything, then we try an INSERT. Since it is two separate queries, though, there is still a small chance that the query will fail with a PG::UniqueViolation, so your code must handle that.
As an added bonus, the SQL that this method generates will, when executed, return the complete row that has been inserted or updated.
# File 'lib/em-pg-client-helper.rb', line 286
def upsert_sql(tbl, key, data)
  tbl = quote_identifier(tbl)
  insert_keys = data.keys.map { |k| quote_identifier(k.to_s) }
  unique_keys = (key.is_a?(Array) ? key : [key])
  unique_keys.map! { |k| quote_identifier(k.to_s) }
  update_keys = insert_keys - unique_keys

  unless (bad_keys = unique_keys - insert_keys).empty?
    raise ArgumentError,
          "These field(s) were mentioned in the key list, but were not in the data set: #{bad_keys.inspect}"
  end

  values = data.values

  # field-to-placeholder mapping
  i = 0
  fp_map = Hash[insert_keys.map { |k| i += 1; [k, "$#{i}"] }]

  update_values = update_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(',')
  select_values = unique_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(' AND ')

  update_query = "UPDATE #{tbl} SET #{update_values} WHERE #{select_values} RETURNING *"
  insert_query = "INSERT INTO #{tbl} (#{fp_map.keys.join(',')}) " +
                 "SELECT #{fp_map.values.join(',')}"

  ["WITH update_query AS (#{update_query}), " +
     "insert_query AS (#{insert_query} " +
       "WHERE NOT EXISTS (SELECT * FROM update_query) " +
     "RETURNING *) " +
     "SELECT * FROM update_query UNION SELECT * FROM insert_query",
   data.values
  ]
end
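To make the "two queries in one" structure of upsert_sql concrete, here is the SQL it produces for a small made-up record. The method bodies are copied from the listings (minus the unused values local) so the snippet runs without a database. Note that the listing passes tbl straight to quote_identifier without calling to_s, so the example uses a String table name.

```ruby
# Standalone copies of the logic shown in the listings on this page.
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end

def upsert_sql(tbl, key, data)
  tbl = quote_identifier(tbl)
  insert_keys = data.keys.map { |k| quote_identifier(k.to_s) }
  unique_keys = (key.is_a?(Array) ? key : [key])
  unique_keys.map! { |k| quote_identifier(k.to_s) }
  update_keys = insert_keys - unique_keys

  unless (bad_keys = unique_keys - insert_keys).empty?
    raise ArgumentError,
          "These field(s) were mentioned in the key list, but were not in the data set: #{bad_keys.inspect}"
  end

  # Map each quoted column name to its positional placeholder ($1, $2, ...).
  i = 0
  fp_map = Hash[insert_keys.map { |k| i += 1; [k, "$#{i}"] }]

  update_values = update_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(',')
  select_values = unique_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(' AND ')

  update_query = "UPDATE #{tbl} SET #{update_values} WHERE #{select_values} RETURNING *"
  insert_query = "INSERT INTO #{tbl} (#{fp_map.keys.join(',')}) " +
                 "SELECT #{fp_map.values.join(',')}"

  ["WITH update_query AS (#{update_query}), " +
     "insert_query AS (#{insert_query} " +
       "WHERE NOT EXISTS (SELECT * FROM update_query) " +
     "RETURNING *) " +
     "SELECT * FROM update_query UNION SELECT * FROM insert_query",
   data.values]
end

# Illustrative table, key, and data (not from the library's documentation).
sql, params = upsert_sql("users", :id, { id: 1, name: "Ann" })
puts sql
puts params.inspect  # [1, "Ann"]
```

The printed statement is a single line: a WITH clause whose update_query runs UPDATE "users" SET "name"=$2 WHERE "id"=$1 RETURNING *, an insert_query that only inserts when update_query returned no rows, and a final UNION that returns whichever row was written.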