Module: Pgq::Api
- Defined in:
- lib/pgq/api.rb
Instance Method Summary collapse
-
#pgq_create_queue(queue_name) ⇒ Object
manage queues.
- #pgq_drop_queue(queue_name) ⇒ Object
-
#pgq_event_failed(batch_id, event_id, reason) ⇒ Object
failed/retry.
- #pgq_event_retry(batch_id, event_id, retry_seconds) ⇒ Object
- #pgq_failed_event_count(queue_name, consumer_name) ⇒ Object
- #pgq_failed_event_delete(queue_name, consumer_name, event_id) ⇒ Object
- #pgq_failed_event_list(queue_name, consumer_name, limit = nil, offset = nil, order = 'desc') ⇒ Object
-
#pgq_failed_event_retry(queue_name, consumer_name, event_id) ⇒ Object
failed events.
- #pgq_finish_batch(batch_id) ⇒ Object
- #pgq_force_tick(queue_name) ⇒ Object
- #pgq_get_batch_events(batch_id) ⇒ Object
- #pgq_get_consumer_info ⇒ Object
- #pgq_get_consumer_queue_info(queue_name) ⇒ Object
-
#pgq_get_queue_info(queue_name) ⇒ Object
info methods.
-
#pgq_get_queues_info ⇒ Object
Get list of queues.
-
#pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) ⇒ Object
insert events.
-
#pgq_last_event_id(queue_name) ⇒ Object
utils.
- #pgq_mass_delete_failed_events(queue_name, consumer_name, limit = 5_000) ⇒ Object
- #pgq_mass_retry_failed_events(queue_name, consumer_name, limit = 5_000) ⇒ Object
-
#pgq_next_batch(queue_name, consumer_name) ⇒ Object
consuming.
- #pgq_register_consumer(queue_name, consumer_name) ⇒ Object
- #pgq_unregister_consumer(queue_name, consumer_name) ⇒ Object
Instance Method Details
#pgq_create_queue(queue_name) ⇒ Object
manage queues
6 7 8 |
# File 'lib/pgq/api.rb', line 6 def pgq_create_queue(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.create_queue(?)", queue_name]).to_i end |
#pgq_drop_queue(queue_name) ⇒ Object
10 11 12 |
# File 'lib/pgq/api.rb', line 10 def pgq_drop_queue(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.drop_queue(?)", queue_name]).to_i end |
#pgq_event_failed(batch_id, event_id, reason) ⇒ Object
failed/retry
47 48 49 |
# File 'lib/pgq/api.rb', line 47 def pgq_event_failed(batch_id, event_id, reason) connection.select_value(sanitize_sql_array ["SELECT pgq.event_failed(?, ?, ?)", batch_id, event_id, reason]).to_i end |
#pgq_event_retry(batch_id, event_id, retry_seconds) ⇒ Object
51 52 53 |
# File 'lib/pgq/api.rb', line 51 def pgq_event_retry(batch_id, event_id, retry_seconds) connection.select_value(sanitize_sql_array ["SELECT pgq.event_retry(?, ?, ?)", batch_id, event_id, retry_seconds]).to_i end |
#pgq_failed_event_count(queue_name, consumer_name) ⇒ Object
65 66 67 68 |
# File 'lib/pgq/api.rb', line 65 def pgq_failed_event_count(queue_name, consumer_name) res = connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_count(?, ?)", queue_name, consumer_name]) res ? res.to_i : nil end |
#pgq_failed_event_delete(queue_name, consumer_name, event_id) ⇒ Object
61 62 63 |
# File 'lib/pgq/api.rb', line 61 def pgq_failed_event_delete(queue_name, consumer_name, event_id) connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_delete(?, ?, ?)", queue_name, consumer_name, event_id]) end |
#pgq_failed_event_list(queue_name, consumer_name, limit = nil, offset = nil, order = 'desc') ⇒ Object
70 71 72 73 |
# File 'lib/pgq/api.rb', line 70 def pgq_failed_event_list queue_name, consumer_name, limit = nil, offset = nil, order = 'desc' order = (order.to_s == 'desc') ? 'desc' : 'asc' connection.select_all(sanitize_sql_array ["SELECT * FROM pgq.failed_event_list(?, ?, ?, ?) ORDER BY ev_id #{order.upcase}", queue_name, consumer_name, limit.to_i, offset.to_i]) end |
#pgq_failed_event_retry(queue_name, consumer_name, event_id) ⇒ Object
failed events
57 58 59 |
# File 'lib/pgq/api.rb', line 57 def pgq_failed_event_retry(queue_name, consumer_name, event_id) connection.select_value(sanitize_sql_array ["SELECT * FROM pgq.failed_event_retry(?, ?, ?)", queue_name, consumer_name, event_id]) end |
#pgq_finish_batch(batch_id) ⇒ Object
41 42 43 |
# File 'lib/pgq/api.rb', line 41 def pgq_finish_batch(batch_id) connection.select_value(sanitize_sql_array ["SELECT pgq.finish_batch(?)", batch_id]) end |
#pgq_force_tick(queue_name) ⇒ Object
132 133 134 |
# File 'lib/pgq/api.rb', line 132 def pgq_force_tick(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.force_tick(?)", queue_name]).to_i end |
#pgq_get_batch_events(batch_id) ⇒ Object
37 38 39 |
# File 'lib/pgq/api.rb', line 37 def pgq_get_batch_events(batch_id) connection.select_all(sanitize_sql_array ["SELECT * FROM pgq.get_batch_events(?)", batch_id]) end |
#pgq_get_consumer_info ⇒ Object
87 88 89 |
# File 'lib/pgq/api.rb', line 87 def pgq_get_consumer_info connection.select_all("SELECT *, EXTRACT(epoch FROM last_seen) AS last_seen_sec, EXTRACT(epoch FROM lag) AS lag_sec FROM pgq.get_consumer_info()") end |
#pgq_get_consumer_queue_info(queue_name) ⇒ Object
91 92 93 |
# File 'lib/pgq/api.rb', line 91 def pgq_get_consumer_queue_info(queue_name) connection.select_one(sanitize_sql_array ["SELECT *, EXTRACT(epoch FROM last_seen) AS last_seen_sec, EXTRACT(epoch FROM lag) AS lag_sec FROM pgq.get_consumer_info(?)", queue_name]) || {} end |
#pgq_get_queue_info(queue_name) ⇒ Object
info methods
77 78 79 |
# File 'lib/pgq/api.rb', line 77 def pgq_get_queue_info(queue_name) connection.select_value(sanitize_sql_array ["SELECT pgq.get_queue_info(?)", queue_name]) end |
#pgq_get_queues_info ⇒ Object
Get list of queues. Result: (queue_name, queue_ntables, queue_cur_table, queue_rotation_period, queue_switch_time, queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag)
83 84 85 |
# File 'lib/pgq/api.rb', line 83 def pgq_get_queues_info connection.select_values("SELECT pgq.get_queue_info()") end |
#pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) ⇒ Object
insert events
24 25 26 27 28 |
# File 'lib/pgq/api.rb', line 24 def pgq_insert_event(queue_name, ev_type, ev_data, ev_extra1 = nil, ev_extra2 = nil, ev_extra3 = nil, ev_extra4 = nil) result = connection.select_value(sanitize_sql_array ["SELECT pgq.insert_event(?, ?, ?, ?, ?, ?, ?)", queue_name, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4]) result ? result.to_i : nil end |
#pgq_last_event_id(queue_name) ⇒ Object
utils
97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/pgq/api.rb', line 97 def pgq_last_event_id(queue_name) ticks = pgq_get_consumer_queue_info(queue_name) table = connection.select_value("SELECT queue_data_pfx AS table FROM pgq.queue WHERE queue_name = #{sanitize(queue_name)}") result = nil if ticks['current_batch'] sql = connection.select_value("SELECT * FROM pgq.batch_event_sql(#{sanitize(ticks['current_batch'].to_i)})") last_event = connection.select_value("SELECT MAX(ev_id) AS count FROM (#{sql}) AS x") result = last_event.to_i end [table, result] end |
#pgq_mass_delete_failed_events(queue_name, consumer_name, limit = 5_000) ⇒ Object
122 123 124 125 126 127 128 129 130 |
# File 'lib/pgq/api.rb', line 122 def pgq_mass_delete_failed_events(queue_name, consumer_name, limit = 5_000) events = pgq_failed_event_list(queue_name, consumer_name, limit, nil, 'asc') || [] events.each do |event| pgq_failed_event_delete(queue_name, consumer_name, event['ev_id']) end events.length end |
#pgq_mass_retry_failed_events(queue_name, consumer_name, limit = 5_000) ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/pgq/api.rb', line 112 def pgq_mass_retry_failed_events(queue_name, consumer_name, limit = 5_000) events = pgq_failed_event_list(queue_name, consumer_name, limit, nil, 'asc') || [] events.each do |event| pgq_failed_event_retry(queue_name, consumer_name, event['ev_id']) end events.length end |
#pgq_next_batch(queue_name, consumer_name) ⇒ Object
consuming
32 33 34 35 |
# File 'lib/pgq/api.rb', line 32 def pgq_next_batch(queue_name, consumer_name) result = connection.select_value(sanitize_sql_array ["SELECT pgq.next_batch(?, ?)", queue_name, consumer_name]) result ? result.to_i : nil end |
#pgq_register_consumer(queue_name, consumer_name) ⇒ Object
14 15 16 |
# File 'lib/pgq/api.rb', line 14 def pgq_register_consumer(queue_name, consumer_name) connection.select_value(sanitize_sql_array ["SELECT pgq.register_consumer(?, ?)", queue_name, consumer_name]).to_i end |
#pgq_unregister_consumer(queue_name, consumer_name) ⇒ Object
18 19 20 |
# File 'lib/pgq/api.rb', line 18 def pgq_unregister_consumer(queue_name, consumer_name) connection.select_value(sanitize_sql_array ["SELECT pgq.unregister_consumer(?, ?)", queue_name, consumer_name]).to_i end |