Class: PG::Pglogical::Client
- Inherits:
-
Object
- Object
- PG::Pglogical::Client
- Defined in:
- lib/pg/pglogical/client.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
Instance Method Summary collapse
-
#disable ⇒ Object
Disables pglogical postgres extensions.
-
#enable ⇒ Object
Enables pglogical postgres extensions.
-
#enabled? ⇒ Boolean
Returns whether pglogical is currently enabled or not.
-
#initialize(connection) ⇒ Client
constructor
A new instance of Client.
-
#installed? ⇒ Boolean
Returns whether the pglogical postgres extension is installed or not.
-
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from provider to subscriber nodes This method must be run on the provider node.
-
#node_create(name, dsn) ⇒ Object
Creates a node.
-
#node_drop(name, ifexists = false) ⇒ Object
Drops the node.
-
#node_dsn_update(name, dsn) ⇒ Boolean
Updates a node connection string.
- #nodes ⇒ Object
-
#replication_set_add_all_tables(set_name, schema_names, sync = false) ⇒ Object
Adds all tables in the given schemas to the replication set.
-
#replication_set_add_table(set_name, table_name, sync = false) ⇒ Object
Adds a table to a replication set.
-
#replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true) ⇒ Object
Alters an existing replication set.
-
#replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true) ⇒ Object
Creates a new replication set.
-
#replication_set_drop(set_name) ⇒ Object
Removes a replication set.
-
#replication_set_remove_table(set_name, table_name) ⇒ Object
Removes a table from a replication set.
-
#replication_sets ⇒ Array<String>
Lists the current replication sets.
-
#subscription_add_replication_set(name, set_name) ⇒ Object
Adds a replication set to a subscription Does not sync, only activates event consumption.
-
#subscription_create(name, dsn, replication_sets = %w(default default_insert_only), sync_structure = true, sync_data = true, forward_origins = ["all"]) ⇒ Object
Creates a subscription to a provider node.
-
#subscription_disable(name, immediate = false) ⇒ Object
Disables a subscription and disconnects it from the provider.
-
#subscription_drop(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it.
-
#subscription_enable(name, immediate = false) ⇒ Object
Enables a previously disabled subscription.
-
#subscription_remove_replication_set(name, set_name) ⇒ Object
Removes a replication set from a subscription.
-
#subscription_resync_table(name, table) ⇒ Object
Resyncs one existing table Table will be truncated before the sync.
-
#subscription_show_status(name) ⇒ Hash
Shows status and basic information about a subscription.
-
#subscription_sync(name, truncate = false) ⇒ Object
Syncs all unsynchronized tables in all sets in a single operation.
-
#subscriptions ⇒ Object
Shows the status of all configured subscriptions.
-
#tables_in_replication_set(set_name) ⇒ Array<String>
Lists the tables currently in the replication set.
-
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the provider node.
- #with_replication_set_lock(set_name) ⇒ Object
Constructor Details
#initialize(connection) ⇒ Client
Returns a new instance of Client.
9 10 11 |
# File 'lib/pg/pglogical/client.rb', line 9 def initialize(connection) @connection = connection end |
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
6 7 8 |
# File 'lib/pg/pglogical/client.rb', line 6 def connection @connection end |
Instance Method Details
#disable ⇒ Object
Disables pglogical postgres extensions
34 35 36 37 |
# File 'lib/pg/pglogical/client.rb', line 34 def disable connection.disable_extension("pglogical") connection.disable_extension("pglogical_origin") if connection.postgresql_version < 90_500 end |
#enable ⇒ Object
Enables pglogical postgres extensions
28 29 30 31 |
# File 'lib/pg/pglogical/client.rb', line 28 def enable connection.enable_extension("pglogical_origin") if connection.postgresql_version < 90_500 connection.enable_extension("pglogical") end |
#enabled? ⇒ Boolean
Returns whether pglogical is currently enabled or not
21 22 23 24 25 |
# File 'lib/pg/pglogical/client.rb', line 21 def enabled? return false unless installed? && connection.extension_enabled?("pglogical") return true if connection.postgresql_version >= 90_500 connection.extension_enabled?("pglogical_origin") end |
#installed? ⇒ Boolean
Returns whether the pglogical postgres extension is installed or not
14 15 16 |
# File 'lib/pg/pglogical/client.rb', line 14 def installed? connection.select_value("SELECT EXISTS(SELECT * FROM pg_available_extensions WHERE name = 'pglogical')") end |
#lag_bytes ⇒ Array<Hash<String,String>>
Reports on replication lag from provider to subscriber nodes This method must be run on the provider node
47 48 49 50 51 52 53 54 |
# File 'lib/pg/pglogical/client.rb', line 47 def lag_bytes typed_exec(<<-SQL).to_a SELECT pg_xlog_location_diff(pg_current_xlog_insert_location(), flush_location) AS lag_bytes, application_name FROM pg_stat_replication SQL end |
#node_create(name, dsn) ⇒ Object
Creates a node
78 79 80 |
# File 'lib/pg/pglogical/client.rb', line 78 def node_create(name, dsn) typed_exec("SELECT pglogical.create_node($1, $2)", name, dsn) end |
#node_drop(name, ifexists = false) ⇒ Object
Drops the node
110 111 112 |
# File 'lib/pg/pglogical/client.rb', line 110 def node_drop(name, ifexists = false) typed_exec("SELECT pglogical.drop_node($1, $2)", name, ifexists) end |
#node_dsn_update(name, dsn) ⇒ Boolean
Updates a node connection string
NOTE: This method relies on the internals of the pglogical tables
rather than a published API.
NOTE: Disable subscriptions involving the node before calling this
method for a provider node in a subscriber database.
92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/pg/pglogical/client.rb', line 92 def node_dsn_update(name, dsn) res = typed_exec(<<-SQL, name, dsn) UPDATE pglogical.node_interface SET if_dsn = $2 WHERE if_nodeid = ( SELECT node_id FROM pglogical.node WHERE node_name = $1 ) SQL res.cmd_tuples == 1 end |
#nodes ⇒ Object
114 115 116 117 118 119 120 |
# File 'lib/pg/pglogical/client.rb', line 114 def nodes typed_exec(<<-SQL) SELECT node_name AS name, if_dsn AS conn_string FROM pglogical.node join pglogical.node_interface ON if_nodeid = node_id SQL end |
#replication_set_add_all_tables(set_name, schema_names, sync = false) ⇒ Object
Adds all tables in the given schemas to the replication set
299 300 301 302 |
# File 'lib/pg/pglogical/client.rb', line 299 def replication_set_add_all_tables(set_name, schema_names, sync = false) typed_exec("SELECT pglogical.replication_set_add_all_tables($1, $2, $3)", set_name, schema_names, sync) end |
#replication_set_add_table(set_name, table_name, sync = false) ⇒ Object
Adds a table to a replication set
289 290 291 292 |
# File 'lib/pg/pglogical/client.rb', line 289 def replication_set_add_table(set_name, table_name, sync = false) typed_exec("SELECT pglogical.replication_set_add_table($1, $2, $3)", set_name, table_name, sync) end |
#replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true) ⇒ Object
Alters an existing replication set
272 273 274 275 |
# File 'lib/pg/pglogical/client.rb', line 272 def replication_set_alter(set_name, insert = true, update = true, delete = true, truncate = true) typed_exec("SELECT pglogical.alter_replication_set($1, $2, $3, $4, $5)", set_name, insert, update, delete, truncate) end |
#replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true) ⇒ Object
Creates a new replication set
260 261 262 263 |
# File 'lib/pg/pglogical/client.rb', line 260 def replication_set_create(set_name, insert = true, update = true, delete = true, truncate = true) typed_exec("SELECT pglogical.create_replication_set($1, $2, $3, $4, $5)", set_name, insert, update, delete, truncate) end |
#replication_set_drop(set_name) ⇒ Object
Removes a replication set
280 281 282 |
# File 'lib/pg/pglogical/client.rb', line 280 def replication_set_drop(set_name) typed_exec("SELECT pglogical.drop_replication_set($1)", set_name) end |
#replication_set_remove_table(set_name, table_name) ⇒ Object
Removes a table from a replication set
308 309 310 311 |
# File 'lib/pg/pglogical/client.rb', line 308 def replication_set_remove_table(set_name, table_name) typed_exec("SELECT pglogical.replication_set_remove_table($1, $2)", set_name, table_name) end |
#replication_sets ⇒ Array<String>
Lists the current replication sets
249 250 251 |
# File 'lib/pg/pglogical/client.rb', line 249 def replication_sets typed_exec("SELECT set_name FROM pglogical.replication_set").values.flatten end |
#subscription_add_replication_set(name, set_name) ⇒ Object
Adds a replication set to a subscription Does not sync, only activates event consumption
192 193 194 195 |
# File 'lib/pg/pglogical/client.rb', line 192 def subscription_add_replication_set(name, set_name) typed_exec("SELECT pglogical.alter_subscription_add_replication_set($1, $2)", name, set_name) end |
#subscription_create(name, dsn, replication_sets = %w(default default_insert_only), sync_structure = true, sync_data = true, forward_origins = ["all"]) ⇒ Object
Creates a subscription to a provider node
134 135 136 137 138 |
# File 'lib/pg/pglogical/client.rb', line 134 def subscription_create(name, dsn, replication_sets = %w(default default_insert_only), # rubocop:disable Metrics/ParameterLists sync_structure = true, sync_data = true, forward_origins = ["all"]) typed_exec("SELECT pglogical.create_subscription($1, $2, $3, $4, $5, $6)", name, dsn, replication_sets, sync_structure, sync_data, forward_origins) end |
#subscription_disable(name, immediate = false) ⇒ Object
Disables a subscription and disconnects it from the provider
153 154 155 156 |
# File 'lib/pg/pglogical/client.rb', line 153 def subscription_disable(name, immediate = false) typed_exec("SELECT pglogical.alter_subscription_disable($1, $2)", name, immediate) end |
#subscription_drop(name, ifexists = false) ⇒ Object
Disconnects the subscription and removes it
144 145 146 147 |
# File 'lib/pg/pglogical/client.rb', line 144 def subscription_drop(name, ifexists = false) typed_exec("SELECT pglogical.drop_subscription($1, $2)", name, ifexists) end |
#subscription_enable(name, immediate = false) ⇒ Object
Enables a previously disabled subscription
162 163 164 165 |
# File 'lib/pg/pglogical/client.rb', line 162 def subscription_enable(name, immediate = false) typed_exec("SELECT pglogical.alter_subscription_enable($1, $2)", name, immediate) end |
#subscription_remove_replication_set(name, set_name) ⇒ Object
Removes a replication set from a subscription
201 202 203 204 |
# File 'lib/pg/pglogical/client.rb', line 201 def subscription_remove_replication_set(name, set_name) typed_exec("SELECT pglogical.alter_subscription_remove_replication_set($1, $2)", name, set_name) end |
#subscription_resync_table(name, table) ⇒ Object
Resyncs one existing table Table will be truncated before the sync
182 183 184 185 |
# File 'lib/pg/pglogical/client.rb', line 182 def subscription_resync_table(name, table) typed_exec("SELECT pglogical.alter_subscription_resynchronize_table($1, $2)", name, table) end |
#subscription_show_status(name) ⇒ Hash
Shows status and basic information about a subscription
220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/pg/pglogical/client.rb', line 220 def subscription_show_status(name) sql = <<-SQL SELECT sub.*, stat.remote_lsn AS remote_replication_lsn, stat.local_lsn AS local_replication_lsn FROM pglogical.show_subscription_status($1) sub LEFT JOIN pg_replication_origin_status stat ON sub.slot_name = stat.external_id SQL typed_exec(sql, name).first.tap do |s| s["replication_sets"] = s["replication_sets"][1..-2].split(",") s["forward_origins"] = s["forward_origins"][1..-2].split(",") end end |
#subscription_sync(name, truncate = false) ⇒ Object
Syncs all unsynchronized tables in all sets in a single operation.
Command does not block
172 173 174 175 |
# File 'lib/pg/pglogical/client.rb', line 172 def subscription_sync(name, truncate = false) typed_exec("SELECT pglogical.alter_subscription_synchronize($1, $2)", name, truncate) end |
#subscriptions ⇒ Object
Shows the status of all configured subscriptions
237 238 239 240 241 |
# File 'lib/pg/pglogical/client.rb', line 237 def subscriptions connection.select_values("SELECT sub_name FROM pglogical.subscription").collect do |s| subscription_show_status(s) end end |
#tables_in_replication_set(set_name) ⇒ Array<String>
Lists the tables currently in the replication set
317 318 319 320 321 322 323 324 325 |
# File 'lib/pg/pglogical/client.rb', line 317 def tables_in_replication_set(set_name) typed_exec(<<-SQL, set_name).values.flatten SELECT set_reloid FROM pglogical.replication_set_relation JOIN pglogical.replication_set USING (set_id) WHERE set_name = $1 SQL end |
#wal_retained_bytes ⇒ Array<Hash<String,String>>
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the provider node
61 62 63 64 65 66 67 68 69 |
# File 'lib/pg/pglogical/client.rb', line 61 def wal_retained_bytes typed_exec(<<-SQL).to_a SELECT pg_xlog_location_diff(pg_current_xlog_insert_location(), restart_lsn) AS retained_bytes, slot_name FROM pg_replication_slots WHERE plugin = 'pglogical_output' SQL end |
#with_replication_set_lock(set_name) ⇒ Object
327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/pg/pglogical/client.rb', line 327 def with_replication_set_lock(set_name) connection.transaction(:requires_new => true) do typed_exec(<<-SQL, set_name) SELECT * FROM pglogical.replication_set WHERE set_name = $1 FOR UPDATE SQL yield end end |