Class: River::Driver::Sequel
- Inherits:
-
Object
- Object
- River::Driver::Sequel
- Defined in:
- lib/driver.rb
Overview
Instance Method Summary collapse
- #advisory_lock(key) ⇒ Object
- #advisory_lock_try(key) ⇒ Object
-
#initialize(db) ⇒ Sequel
constructor
A new instance of Sequel.
- #job_get_by_id(id) ⇒ Object
- #job_get_by_kind_and_unique_properties(get_params) ⇒ Object
- #job_insert(insert_params) ⇒ Object
- #job_insert_many(insert_params_many) ⇒ Object
- #job_insert_unique(insert_params, unique_key) ⇒ Object
- #job_list ⇒ Object
- #rollback_exception ⇒ Object
- #transaction ⇒ Object
Constructor Details
#initialize(db) ⇒ Sequel
Returns a new instance of Sequel.
10 11 12 13 14 |
# File 'lib/driver.rb', line 10 def initialize(db) @db = db @db.extension(:pg_array) @db.extension(:pg_json) end |
Instance Method Details
#advisory_lock(key) ⇒ Object
16 17 18 19 |
# File 'lib/driver.rb', line 16 def advisory_lock(key) @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first nil end |
#advisory_lock_try(key) ⇒ Object
21 22 23 |
# File 'lib/driver.rb', line 21 def advisory_lock_try(key) @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock] end |
#job_get_by_id(id) ⇒ Object
25 26 27 28 |
# File 'lib/driver.rb', line 25 def job_get_by_id(id) data_set = @db[:river_job].where(id: id) data_set.first ? to_job_row(data_set.first) : nil end |
#job_get_by_kind_and_unique_properties(get_params) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/driver.rb', line 30 def job_get_by_kind_and_unique_properties(get_params) data_set = @db[:river_job].where(kind: get_params.kind) data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args data_set = data_set.where(queue: get_params.queue) if get_params.queue data_set = data_set.where(state: get_params.state) if get_params.state data_set.first ? to_job_row(data_set.first) : nil end |
#job_insert(insert_params) ⇒ Object
39 40 41 |
# File 'lib/driver.rb', line 39 def job_insert(insert_params) to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params))) end |
#job_insert_many(insert_params_many) ⇒ Object
58 59 60 61 |
# File 'lib/driver.rb', line 58 def job_insert_many(insert_params_many) @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end |
#job_insert_unique(insert_params, unique_key) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/driver.rb', line 43 def job_insert_unique(insert_params, unique_key) values = @db[:river_job] .insert_conflict( target: [:kind, :unique_key], conflict_where: ::Sequel.lit("unique_key IS NOT NULL"), update: {kind: ::Sequel[:excluded][:kind]} ) .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate")) .insert_select( insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key)) ) [to_job_row(values), values[:unique_skipped_as_duplicate]] end |
#job_list ⇒ Object
63 64 65 66 |
# File 'lib/driver.rb', line 63 def job_list data_set = @db[:river_job].order_by(:id) data_set.all.map { |job| to_job_row(job) } end |
#rollback_exception ⇒ Object
68 69 70 |
# File 'lib/driver.rb', line 68 def rollback_exception ::Sequel::Rollback end |
#transaction ⇒ Object
72 73 74 |
# File 'lib/driver.rb', line 72 def transaction(&) @db.transaction(savepoint: true, &) end |