Class: Pgq::ConsumerBase
- Inherits:
-
Object
show all
- Extended by:
- Utils
- Defined in:
- lib/pgq/consumer_base.rb
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Utils
add_queue, delete_failed_events, inspect_londiste_queue, inspect_queue, inspect_self_queue, proxy, queues_list, remove_queue, retry_failed_events
Constructor Details
#initialize(logger = nil, custom_queue_name = nil, custom_consumer_name = nil) ⇒ ConsumerBase
78
79
80
81
82
83
|
# File 'lib/pgq/consumer_base.rb', line 78
def initialize(logger = nil, custom_queue_name = nil, custom_consumer_name = nil)
self.queue_name = custom_queue_name || self.class.queue_name
self.consumer_name = custom_consumer_name || self.class.consumer_name
self.logger = logger
@batch_id = nil
end
|
Instance Attribute Details
#consumer_name ⇒ Object
Returns the value of attribute consumer_name.
11
12
13
|
# File 'lib/pgq/consumer_base.rb', line 11
def consumer_name
@consumer_name
end
|
#logger ⇒ Object
Returns the value of attribute logger.
11
12
13
|
# File 'lib/pgq/consumer_base.rb', line 11
def logger
@logger
end
|
#queue_name ⇒ Object
Returns the value of attribute queue_name.
11
12
13
|
# File 'lib/pgq/consumer_base.rb', line 11
def queue_name
@queue_name
end
|
Class Method Details
.coder ⇒ Object
62
63
64
|
# File 'lib/pgq/consumer_base.rb', line 62
def self.coder
Marshal64
end
|
.connection ⇒ Object
23
24
25
|
# File 'lib/pgq/consumer_base.rb', line 23
def self.connection
database.connection
end
|
.consumer_name ⇒ Object
47
48
49
|
# File 'lib/pgq/consumer_base.rb', line 47
def self.consumer_name
@consumer_name
end
|
.database ⇒ Object
15
16
17
|
# File 'lib/pgq/consumer_base.rb', line 15
def self.database
ActiveRecord::Base end
|
.enqueue(method_name, *args) ⇒ Object
72
73
74
|
# File 'lib/pgq/consumer_base.rb', line 72
def self.enqueue(method_name, *args)
self.database.pgq_insert_event( self.next_queue_name, method_name.to_s, coder.dump(args) )
end
|
33
34
35
|
# File 'lib/pgq/consumer_base.rb', line 33
def self.
self.name.to_s.gsub(/^pgq/i, '').underscore.gsub('/', '-')
end
|
.inherited(subclass) ⇒ Object
magic set queue_name from class name
42
43
44
45
|
# File 'lib/pgq/consumer_base.rb', line 42
def self.inherited(subclass)
subclass.set_queue_name(subclass.)
subclass.instance_variable_set('@consumer_name', self.consumer_name)
end
|
.next_queue_name ⇒ Object
this method used when insert event, possible to reuse
56
57
58
|
# File 'lib/pgq/consumer_base.rb', line 56
def self.next_queue_name
self.queue_name
end
|
.queue_name ⇒ Object
51
52
53
|
# File 'lib/pgq/consumer_base.rb', line 51
def self.queue_name
@queue_name
end
|
.set_queue_name(name) ⇒ Object
37
38
39
|
# File 'lib/pgq/consumer_base.rb', line 37
def self.set_queue_name(name)
@queue_name = name.to_s
end
|
Instance Method Details
#all_events_failed(events, ex) ⇒ Object
154
155
156
157
158
159
160
|
# File 'lib/pgq/consumer_base.rb', line 154
def all_events_failed(events, ex)
log_error(Pgq::Event.exception_message(ex))
events.each do |event|
event.failed!(ex)
end
end
|
#coder ⇒ Object
66
67
68
|
# File 'lib/pgq/consumer_base.rb', line 66
def coder
self.class.coder
end
|
#connection ⇒ Object
27
28
29
|
# File 'lib/pgq/consumer_base.rb', line 27
def connection
self.class.connection
end
|
#database ⇒ Object
19
20
21
|
# File 'lib/pgq/consumer_base.rb', line 19
def database
self.class.database
end
|
#event_failed(event_id, reason) ⇒ Object
146
147
148
|
# File 'lib/pgq/consumer_base.rb', line 146
def event_failed(event_id, reason)
database.pgq_event_failed(@batch_id, event_id, reason)
end
|
#event_retry(event_id, seconds = 0) ⇒ Object
150
151
152
|
# File 'lib/pgq/consumer_base.rb', line 150
def event_retry(event_id, seconds = 0)
database.pgq_event_retry(@batch_id, event_id, seconds)
end
|
#finish_batch(count = nil) ⇒ Object
140
141
142
143
144
|
# File 'lib/pgq/consumer_base.rb', line 140
def finish_batch(count = nil)
return unless @batch_id
database.pgq_finish_batch(@batch_id)
@batch_id = nil
end
|
#get_batch_events ⇒ Object
134
135
136
137
138
|
# File 'lib/pgq/consumer_base.rb', line 134
def get_batch_events
@batch_id = database.pgq_next_batch(queue_name, consumer_name)
return nil if !@batch_id
database.pgq_get_batch_events(@batch_id)
end
|
#log_error(mes) ⇒ Object
168
169
170
|
# File 'lib/pgq/consumer_base.rb', line 168
def log_error(mes)
@logger.error(mes) if @logger
end
|
#log_info(mes) ⇒ Object
164
165
166
|
# File 'lib/pgq/consumer_base.rb', line 164
def log_info(mes)
@logger.info(mes) if @logger
end
|
130
131
132
|
# File 'lib/pgq/consumer_base.rb', line 130
def perform(type, *data)
raise "realize me"
end
|
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/pgq/consumer_base.rb', line 85
def perform_batch
events = []
pgq_events = get_batch_events
return 0 if pgq_events.blank?
events = pgq_events.map{|ev| Pgq::Event.new(self, ev) }
size = events.size
log_info "=> batch(#{queue_name}): events #{size}"
perform_events(events)
rescue Exception => ex
all_events_failed(events, ex)
rescue => ex
all_events_failed(events, ex)
ensure
finish_batch(events.size)
return events.size
end
|
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# File 'lib/pgq/consumer_base.rb', line 115
def perform_event(event)
type = event.type
data = event.data
perform(type, *data)
rescue Exception => ex
self.log_error(event.exception_message(ex))
event.failed!(ex)
rescue => ex
self.log_error(event.exception_message(ex))
event.failed!(ex)
end
|
109
110
111
112
113
|
# File 'lib/pgq/consumer_base.rb', line 109
def perform_events(events)
events.each do |event|
perform_event(event)
end
end
|