Class: Pgq::ConsumerBase

Inherits:
Object
  • Object
show all
Extended by:
Utils
Defined in:
lib/pgq/consumer_base.rb

Direct Known Subclasses

Consumer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils

add_queue, delete_failed_events, inspect_londiste_queue, inspect_queue, inspect_self_queue, proxy, queues_list, remove_queue, retry_failed_events

Constructor Details

#initialize(logger = nil, custom_queue_name = nil, custom_consumer_name = nil) ⇒ ConsumerBase

consumer part



78
79
80
81
82
83
# File 'lib/pgq/consumer_base.rb', line 78

# Builds a consumer bound to a queue and a consumer name.
#
# Explicit names win; otherwise the class-level queue_name / consumer_name
# are used. No batch is open after construction.
#
# @param logger [Object, nil] logger stored for the log_* helpers
# @param custom_queue_name [String, nil] overrides the class-level queue name
# @param custom_consumer_name [String, nil] overrides the class-level consumer name
def initialize(logger = nil, custom_queue_name = nil, custom_consumer_name = nil)
  klass = self.class

  self.queue_name    = custom_queue_name || klass.queue_name
  self.consumer_name = custom_consumer_name || klass.consumer_name
  self.logger        = logger

  @batch_id = nil
end

Instance Attribute Details

#consumer_nameObject

Returns the value of attribute consumer_name.



11
12
13
# File 'lib/pgq/consumer_base.rb', line 11

# Reader for this consumer's name.
#
# @return [Object] the current value of @consumer_name
def consumer_name
  @consumer_name
end

#loggerObject

Returns the value of attribute logger.



11
12
13
# File 'lib/pgq/consumer_base.rb', line 11

# Reader for the logger assigned to this consumer.
#
# @return [Object, nil] the current value of @logger
def logger
  @logger
end

#queue_nameObject

Returns the value of attribute queue_name.



11
12
13
# File 'lib/pgq/consumer_base.rb', line 11

# Reader for the name of the queue this consumer works with.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end

Class Method Details

.coderObject

coder



62
63
64
# File 'lib/pgq/consumer_base.rb', line 62

# Returns the object used to encode event arguments (its `dump` is called
# when an event is enqueued). Marshal64 is defined outside this block.
#
# @return [Object] the Marshal64 constant
def self.coder
  Marshal64
end

.connectionObject



23
24
25
# File 'lib/pgq/consumer_base.rb', line 23

# Returns the connection exposed by the class-level database object.
#
# @return [Object] whatever `database.connection` returns
def self.connection
  database.connection
end

.consumer_nameObject



47
48
49
# File 'lib/pgq/consumer_base.rb', line 47

# Class-level reader for the default consumer name.
#
# @return [Object, nil] the class-level @consumer_name
def self.consumer_name
  @consumer_name
end

.databaseObject

connection



15
16
17
# File 'lib/pgq/consumer_base.rb', line 15

# Returns the object that all pgq_* database calls are sent to.
# Defaults to ActiveRecord::Base; redefine this method to use another one.
#
# @return [Object] the database entry point
def self.database
  ActiveRecord::Base # can redefine
end

.enqueue(method_name, *args) ⇒ Object

insert event



72
73
74
# File 'lib/pgq/consumer_base.rb', line 72

# Inserts an event into the queue returned by next_queue_name.
#
# The event type is the stringified method name and the payload is the
# argument list serialized with the class-level coder.
#
# @param method_name [#to_s] event type
# @param args [Array] arguments to serialize into the event payload
# @return [Object] whatever `pgq_insert_event` returns
def self.enqueue(method_name, *args)
  db = database
  target_queue = next_queue_name
  event_type = method_name.to_s
  payload = coder.dump(args)

  db.pgq_insert_event(target_queue, event_type, payload)
end

.extract_queue_nameObject

queue name



33
34
35
# File 'lib/pgq/consumer_base.rb', line 33

# Derives a queue name from the class name: strips a leading "pgq"
# (case-insensitive), underscores the rest, and turns namespace
# separators ("/") into dashes.
#
# @return [String] the derived queue name
def self.extract_queue_name
  class_name = name.to_s
  without_prefix = class_name.gsub(/^pgq/i, '')

  without_prefix.underscore.tr('/', '-')
end

.inherited(subclass) ⇒ Object

Automatically sets the subclass's queue_name from its class name and copies the parent's consumer_name.



42
43
44
45
# File 'lib/pgq/consumer_base.rb', line 42

# Subclass hook: gives every new subclass a queue name derived from its own
# class name and copies this class's consumer name onto it.
#
# @param subclass [Class] the class that has just been created
# @return [Object] the consumer name assigned to the subclass
def self.inherited(subclass)
  # Keep the hook chain intact so other `inherited` implementations in the
  # ancestor chain still run.
  super

  subclass.set_queue_name(subclass.extract_queue_name)
  subclass.instance_variable_set('@consumer_name', consumer_name)
end

.next_queue_nameObject

This method is used when inserting an event; it can be overridden in subclasses.



56
57
58
# File 'lib/pgq/consumer_base.rb', line 56

# Name of the queue that enqueued events are written to.
# By default this is the class's own queue name.
#
# @return [Object] the class-level queue name
def self.next_queue_name
  queue_name
end

.queue_nameObject



51
52
53
# File 'lib/pgq/consumer_base.rb', line 51

# Class-level reader for the default queue name.
#
# @return [String, nil] the class-level @queue_name
def self.queue_name
  @queue_name
end

.set_queue_name(name) ⇒ Object



37
38
39
# File 'lib/pgq/consumer_base.rb', line 37

# Stores the class-level queue name, converted to a String.
#
# @param name [#to_s] the queue name
# @return [String] the stored name
def self.set_queue_name(name)
  @queue_name = name.to_s
end

Instance Method Details

#all_events_failed(events, ex) ⇒ Object



154
155
156
157
158
159
160
# File 'lib/pgq/consumer_base.rb', line 154

# Logs the exception once and marks every given event as failed with it.
#
# @param events [Array] events responding to `failed!`
# @param ex [Exception] the error that aborted processing
# @return [Array] the same events collection
def all_events_failed(events, ex)
  message = Pgq::Event.exception_message(ex)
  log_error(message)

  events.each { |failed_event| failed_event.failed!(ex) }
end

#coderObject



66
67
68
# File 'lib/pgq/consumer_base.rb', line 66

# Instance-level shortcut for the class-level coder.
#
# @return [Object] whatever `self.class.coder` returns
def coder
  self.class.coder
end

#connectionObject



27
28
29
# File 'lib/pgq/consumer_base.rb', line 27

# Instance-level shortcut for the class-level connection.
#
# @return [Object] whatever `self.class.connection` returns
def connection
  self.class.connection
end

#databaseObject



19
20
21
# File 'lib/pgq/consumer_base.rb', line 19

# Instance-level shortcut for the class-level database object.
#
# @return [Object] whatever `self.class.database` returns
def database
  self.class.database
end

#event_failed(event_id, reason) ⇒ Object



146
147
148
# File 'lib/pgq/consumer_base.rb', line 146

# Reports one event of the currently open batch (@batch_id) as failed by
# calling `pgq_event_failed` on the database object.
#
# @param event_id [Object] identifier of the event
# @param reason [Object] failure reason passed through unchanged
# @return [Object] whatever `pgq_event_failed` returns
def event_failed(event_id, reason)
  database.pgq_event_failed(@batch_id, event_id, reason)
end

#event_retry(event_id, seconds = 0) ⇒ Object



150
151
152
# File 'lib/pgq/consumer_base.rb', line 150

# Schedules one event of the currently open batch (@batch_id) for retry by
# calling `pgq_event_retry` on the database object.
#
# @param event_id [Object] identifier of the event
# @param seconds [Integer] passed through to `pgq_event_retry`; presumably
#   the delay before the retry — confirm against the database helper
# @return [Object] whatever `pgq_event_retry` returns
def event_retry(event_id, seconds = 0)
  database.pgq_event_retry(@batch_id, event_id, seconds)
end

#finish_batch(count = nil) ⇒ Object



140
141
142
143
144
# File 'lib/pgq/consumer_base.rb', line 140

# Closes the currently open batch, if there is one, and forgets its id.
# Does nothing when no batch is open.
#
# @param count [Integer, nil] accepted for callers' convenience; not used
# @return [nil]
def finish_batch(count = nil)
  open_batch = @batch_id
  return unless open_batch

  database.pgq_finish_batch(open_batch)
  @batch_id = nil
end

#get_batch_eventsObject



134
135
136
137
138
# File 'lib/pgq/consumer_base.rb', line 134

# Opens the next batch for this queue/consumer pair, remembers its id in
# @batch_id, and loads the batch's events.
#
# @return [Object, nil] the result of `pgq_get_batch_events`, or nil when
#   no batch is available
def get_batch_events
  @batch_id = database.pgq_next_batch(queue_name, consumer_name)

  database.pgq_get_batch_events(@batch_id) if @batch_id
end

#log_error(mes) ⇒ Object



168
169
170
# File 'lib/pgq/consumer_base.rb', line 168

# Writes an error-level message when a logger is configured; otherwise
# does nothing.
#
# @param mes [Object] the message to log
# @return [Object, nil] the logger's return value, or nil without a logger
def log_error(mes)
  return unless @logger

  @logger.error(mes)
end

#log_info(mes) ⇒ Object

log methods



164
165
166
# File 'lib/pgq/consumer_base.rb', line 164

# Writes an info-level message when a logger is configured; otherwise
# does nothing.
#
# @param mes [Object] the message to log
# @return [Object, nil] the logger's return value, or nil without a logger
def log_info(mes)
  return unless @logger

  @logger.info(mes)
end

#perform(type, *data) ⇒ Object



130
131
132
# File 'lib/pgq/consumer_base.rb', line 130

# Handles a single event. This base implementation always raises;
# concrete consumers are expected to provide their own.
#
# @param type [Object] the event type
# @param data [Array] the event's arguments
# @raise [RuntimeError] always
def perform(type, *data)
  raise RuntimeError, "realize me"
end

#perform_batchObject



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/pgq/consumer_base.rb', line 85

# Fetches the next batch, wraps each raw event in a Pgq::Event and hands the
# list to perform_events.
#
# If anything raises while fetching or processing, every event collected so
# far is marked as failed through all_events_failed. finish_batch is called
# on every path, whether or not processing succeeded.
#
# @return [Integer] number of events in the batch; 0 when there was nothing
#   to process or when the failure happened before events were built
def perform_batch
  events = []
  pgq_events = get_batch_events

  return 0 if pgq_events.blank?

  events = pgq_events.map { |ev| Pgq::Event.new(self, ev) }
  log_info "=> batch(#{queue_name}): events #{events.size}"

  perform_events(events)

# One clause is enough: Exception is the root of the hierarchy, so a second
# StandardError clause placed after it could never be reached.
# NOTE(review): rescuing Exception also catches SignalException and
# SystemExit; kept so callers observe the same behaviour as before.
rescue Exception => ex
  all_events_failed(events, ex)

ensure
  finish_batch(events.size)

  # Returning from ensure overrides the value of the body and discards any
  # exception still in flight, so the caller always gets the event count.
  return events.size
end

#perform_event(event) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/pgq/consumer_base.rb', line 115

# Runs perform for a single event, passing the event's type and splatting
# its data as arguments.
#
# Any exception raised by perform is logged and recorded on the event via
# `failed!` instead of propagating, so one bad event does not stop the
# remaining ones from being processed.
#
# @param event [Object] responds to `type`, `data`, `exception_message`
#   and `failed!`
# @return [Object] perform's result, or the result of `failed!` on error
def perform_event(event)
  type = event.type
  data = event.data

  perform(type, *data)

# One clause is enough: Exception is the root of the hierarchy, so a second
# StandardError clause placed after it could never be reached.
# NOTE(review): rescuing Exception also catches SignalException and
# SystemExit; kept so callers observe the same behaviour as before.
rescue Exception => ex
  log_error(event.exception_message(ex))
  event.failed!(ex)
end

#perform_events(events) ⇒ Object



109
110
111
112
113
# File 'lib/pgq/consumer_base.rb', line 109

# Processes the given events one by one, in order, through perform_event.
#
# @param events [Array] events to process
# @return [Array] the same events collection
def perform_events(events)
  events.each { |current| perform_event(current) }
end