Module: Pgq::Utils
- Included in:
- ConsumerBase
- Defined in:
- lib/pgq/utils.rb
Instance Method Summary collapse
-
#add_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
methods for migrations.
- #delete_failed_events(queue_name, limit = 5_000) ⇒ Object
-
#inspect_londiste_queue(queue_name) ⇒ Object
show hash stats, for londiste type of storage events { type => events_count }.
-
#inspect_queue(queue_name) ⇒ Object
inspect queue { type => events_count }.
- #inspect_self_queue ⇒ Object
-
#proxy(method_name) ⇒ Object
proxing method for tests.
-
#queues_list ⇒ Object
all queues for database.
- #remove_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
-
#retry_failed_events(queue_name, limit = 5_000) ⇒ Object
resend failed events in queue.
Instance Method Details
#add_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
methods for migrations
9 10 11 12 |
# File 'lib/pgq/utils.rb', line 9 def add_queue(queue_name, consumer_name = self.consumer_name) database.pgq_create_queue(queue_name.to_s) database.pgq_register_consumer(queue_name.to_s, consumer_name.to_s) end |
#delete_failed_events(queue_name, limit = 5_000) ⇒ Object
91 92 93 |
# File 'lib/pgq/utils.rb', line 91 def delete_failed_events(queue_name, limit = 5_000) database.pgq_mass_delete_failed_events(queue_name, self.consumer_name, limit) end |
#inspect_londiste_queue(queue_name) ⇒ Object
show hash stats, for londiste type of storage events { type => events_count }
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/pgq/utils.rb', line 52 def inspect_londiste_queue(queue_name) table, last_event = database.pgq_last_event_id(queue_name) stats = if last_event connection.select_all <<-SQL SELECT count(*) as count, ev_type, ev_extra1 FROM #{table} WHERE ev_id > #{last_event.to_i} GROUP BY ev_type, ev_extra1 SQL else connection.select_all <<-SQL SELECT ev_type, ev_extra1 FROM #{table} GROUP BY ev_type, ev_extra1 ORDER BY ev_extra1, ev_type SQL end stats.each do |x| result["#{x['ev_extra1']}:#{x['ev_type']}"] = x['count'].to_i end result end |
#inspect_queue(queue_name) ⇒ Object
inspect queue
{ type => events_count }
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pgq/utils.rb', line 21 def inspect_queue(queue_name) table, last_event = database.pgq_last_event_id(queue_name) stats = if last_event connection.select_all <<-SQL SELECT count(*) as count, ev_type FROM #{table} WHERE ev_id > #{last_event.to_i} GROUP BY ev_type SQL else connection.select_all <<-SQL SELECT ev_type FROM #{table} GROUP BY ev_type SQL end stats.each do |x| result["#{x['ev_type']}"] = x['count'].to_i end result end |
#inspect_self_queue ⇒ Object
46 47 48 |
# File 'lib/pgq/utils.rb', line 46 def inspect_self_queue self.inspect_queue(self.queue_name) end |
#proxy(method_name) ⇒ Object
proxing method for tests
79 80 81 82 83 84 |
# File 'lib/pgq/utils.rb', line 79 def proxy(method_name) self.should_receive(:enqueue) do |method_name, *data| x = self.coder.load(self.coder.dump(data)) self.new.send(:perform, method_name, *x) end.any_number_of_times end |
#queues_list ⇒ Object
all queues for database
4 5 6 |
# File 'lib/pgq/utils.rb', line 4 def queues_list database.pgq_get_consumer_info.map{|x| x['queue_name']}.uniq end |
#remove_queue(queue_name, consumer_name = self.consumer_name) ⇒ Object
14 15 16 17 |
# File 'lib/pgq/utils.rb', line 14 def remove_queue(queue_name, consumer_name = self.consumer_name) database.pgq_unregister_consumer(queue_name.to_s, consumer_name.to_s) database.pgq_drop_queue(queue_name.to_s) end |
#retry_failed_events(queue_name, limit = 5_000) ⇒ Object
resend failed events in queue
87 88 89 |
# File 'lib/pgq/utils.rb', line 87 def retry_failed_events(queue_name, limit = 5_000) database.pgq_mass_retry_failed_events(queue_name, self.consumer_name, limit) end |