Class: Pgq::ConsumerGroup
- Inherits: Consumer
- Inheritance chain (root first): Object → ConsumerBase → Consumer → Pgq::ConsumerGroup
- Defined in: lib/pgq/consumer_group.rb
Instance Attribute Summary
Attributes inherited from ConsumerBase
#consumer_name, #logger, #queue_name
Instance Method Summary
- #perform_events(events) ⇒ Object
- #perform_group(events_hash) ⇒ Object
  Receives a hash of the form type => [events].
- #sum_events(events) ⇒ Object
Methods inherited from Consumer
add_event, method_missing, #perform
Methods inherited from ConsumerBase
#all_events_failed, #coder, coder, #connection, connection, consumer_name, #database, database, enqueue, #event_failed, #event_retry, extract_queue_name, #finish_batch, #get_batch_events, inherited, #initialize, #log_error, #log_info, next_queue_name, #perform, #perform_batch, #perform_event, queue_name, set_queue_name
Methods included from Utils
#add_queue, #delete_failed_events, #inspect_londiste_queue, #inspect_queue, #inspect_self_queue, #proxy, #queues_list, #remove_queue, #retry_failed_events
Constructor Details
This class inherits a constructor from Pgq::ConsumerBase
Instance Method Details
#perform_events(events) ⇒ Object
# File 'lib/pgq/consumer_group.rb', line 13

# Groups the batch by event type and passes the resulting hash to
# perform_group, unless the hash is blank.
def perform_events(events)
  events = sum_events(events)
  # log_info "consume events (#{self.queue_name}): #{events.map{|k,v| [k, v.size]}.inspect}"
  perform_group(events) if events.present?
end
#perform_group(events_hash) ⇒ Object
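Receives a hash of the form type => [events]: each key is an event type and each value is the array of events of that type. The base implementation only raises, so subclasses are presumably expected to override it.
# File 'lib/pgq/consumer_group.rb', line 9

def perform_group(events_hash)
  raise "realize me"
end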
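A minimal sketch of how a subclass might override #perform_group. So that it runs without the pgq gem or a database, it uses a hypothetical stand-in base class (FakeConsumerGroup) that copies the three methods listed on this page, with `present?` swapped for a plain-Ruby emptiness check. SketchEvent, MailConsumer, the `handled` reader and the event type names are all illustrative assumptions, not part of the library.

```ruby
# Hypothetical stand-in for Pgq::ConsumerGroup (not the real class).
# `events.present?` is replaced by `!events.empty?` to avoid ActiveSupport.
class FakeConsumerGroup
  def perform_events(events)
    events = sum_events(events)
    perform_group(events) unless events.empty?
  end

  def perform_group(events_hash)
    raise "realize me"
  end

  def sum_events(events)
    events.inject({}) do |result, event|
      result[event.type] ||= []
      result[event.type] << event
      result
    end
  end
end

# Hypothetical event object; only #type and #data are assumed here.
SketchEvent = Struct.new(:type, :data)

# The subclass overrides perform_group and gets one array per event type.
class MailConsumer < FakeConsumerGroup
  attr_reader :handled

  def perform_group(events_hash)
    # Record the payloads per type; a real consumer would process them here.
    @handled = events_hash.map { |type, events| [type, events.map(&:data)] }.to_h
  end
end

consumer = MailConsumer.new
consumer.perform_events([
  SketchEvent.new("welcome", "a@example.com"),
  SketchEvent.new("welcome", "b@example.com"),
  SketchEvent.new("reset", "c@example.com"),
])
p consumer.handled
```

In this sketch an empty batch never reaches perform_group, and calling the un-overridden base method with a non-empty batch raises "realize me".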
#sum_events(events) ⇒ Object
# File 'lib/pgq/consumer_group.rb', line 19

# Builds a hash that maps each event type to the array of events of that type.
def sum_events(events)
  events.inject({}) do |result, event|
    result[event.type] ||= []
    result[event.type] << event
    result
  end
end
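The grouping done by #sum_events can be tried in isolation. The sketch below copies the method body into a free-standing method and feeds it a hypothetical Event struct (an assumption; the only thing the method needs from an event is #type). The event type names are made up for illustration.

```ruby
# Hypothetical stand-in for a PgQ event; only #type is used by the grouping.
Event = Struct.new(:type, :data)

# Same logic as #sum_events above, as a free-standing method.
def sum_events(events)
  events.inject({}) do |result, event|
    result[event.type] ||= []
    result[event.type] << event
    result
  end
end

events = [
  Event.new("user.created", 1),
  Event.new("user.deleted", 2),
  Event.new("user.created", 3),
]

grouped = sum_events(events)

# Per-type counts, in the same shape as the commented-out log_info line.
p grouped.map { |type, list| [type, list.size] }
```

For inputs like this, the result is the same hash that Ruby's built-in `Enumerable#group_by` produces with `events.group_by(&:type)`: keys appear in first-seen order, and events keep their original order within each group.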