Class: Rpush::Daemon::AppRunner
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
reflect
Methods included from Loggable
log_error, log_info, log_warn
pluralize
Constructor Details
#initialize(app) ⇒ AppRunner
Returns a new instance of AppRunner.
94
95
96
97
98
|
# File 'lib/rpush/daemon/app_runner.rb', line 94
def initialize(app)
@app = app
@loops = []
@dispatcher_loops = []
end
|
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
92
93
94
|
# File 'lib/rpush/daemon/app_runner.rb', line 92
def app
@app
end
|
Class Method Details
.app_ids ⇒ Object
58
59
60
|
# File 'lib/rpush/daemon/app_runner.rb', line 58
def self.app_ids
@runners.keys
end
|
.app_running?(app) ⇒ Boolean
54
55
56
|
# File 'lib/rpush/daemon/app_runner.rb', line 54
def self.app_running?(app)
@runners.key?(app.id)
end
|
.app_with_id(app_id) ⇒ Object
50
51
52
|
# File 'lib/rpush/daemon/app_runner.rb', line 50
def self.app_with_id(app_id)
@runners[app_id].app
end
|
.debug ⇒ Object
88
89
90
|
# File 'lib/rpush/daemon/app_runner.rb', line 88
def self.debug
@runners.values.map(&:debug)
end
|
.decrement_dispatchers(app, num) ⇒ Object
80
81
82
|
# File 'lib/rpush/daemon/app_runner.rb', line 80
def self.decrement_dispatchers(app, num)
@runners[app.id].decrement_dispatchers(num)
end
|
.enqueue(notifications) ⇒ Object
17
18
19
20
21
22
23
24
|
# File 'lib/rpush/daemon/app_runner.rb', line 17
def self.enqueue(notifications)
notifications.group_by(&:app_id).each do |app_id, group|
start_app_with_id(app_id) unless @runners[app_id]
@runners[app_id].enqueue(group) if @runners[app_id]
end
ProcTitle.update
end
|
.increment_dispatchers(app, num) ⇒ Object
84
85
86
|
# File 'lib/rpush/daemon/app_runner.rb', line 84
def self.increment_dispatchers(app, num)
@runners[app.id].increment_dispatchers(num)
end
|
.num_dispatchers_for_app(app) ⇒ Object
75
76
77
78
|
# File 'lib/rpush/daemon/app_runner.rb', line 75
def self.num_dispatchers_for_app(app)
runner = @runners[app.id]
runner ? runner.num_dispatcher_loops : 0
end
|
.start_app(app) ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/rpush/daemon/app_runner.rb', line 30
def self.start_app(app)
Rpush.logger.info("[#{app.name}] Starting #{pluralize(app.connections, 'dispatcher')}... ", true)
@runners[app.id] = new(app)
@runners[app.id].start
puts green('✔') if Rpush.config.foreground
rescue StandardError => e
@runners.delete(app.id)
Rpush.logger.error("[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app.")
Rpush.logger.error(e)
reflect(:error, e)
end
|
.start_app_with_id(app_id) ⇒ Object
26
27
28
|
# File 'lib/rpush/daemon/app_runner.rb', line 26
def self.start_app_with_id(app_id)
start_app(Rpush::Daemon.store.app(app_id))
end
|
.stop ⇒ Object
62
63
64
65
|
# File 'lib/rpush/daemon/app_runner.rb', line 62
def self.stop
@runners.values.map(&:stop)
@runners.clear
end
|
.stop_app(app_id) ⇒ Object
42
43
44
45
46
47
48
|
# File 'lib/rpush/daemon/app_runner.rb', line 42
def self.stop_app(app_id)
runner = @runners.delete(app_id)
if runner
runner.stop
log_info("[#{runner.app.name}] Stopped.")
end
end
|
.total_dispatchers ⇒ Object
67
68
69
|
# File 'lib/rpush/daemon/app_runner.rb', line 67
def self.total_dispatchers
@runners.values.sum(&:num_dispatcher_loops)
end
|
.total_queued ⇒ Object
71
72
73
|
# File 'lib/rpush/daemon/app_runner.rb', line 71
def self.total_queued
@runners.values.sum(&:queue_size)
end
|
Instance Method Details
#debug ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# File 'lib/rpush/daemon/app_runner.rb', line 139
def debug
dispatcher_details = {}
@dispatcher_loops.each_with_index do |dispatcher_loop, i|
dispatcher_details[i] = {
started_at: dispatcher_loop.started_at.iso8601,
dispatched: dispatcher_loop.dispatch_count,
thread_status: dispatcher_loop.thread_status
}
end
runner_details = { dispatchers: dispatcher_details, queued: queue_size }
log_info(JSON.pretty_generate(runner_details))
end
|
#decrement_dispatchers(num) ⇒ Object
131
132
133
|
# File 'lib/rpush/daemon/app_runner.rb', line 131
def decrement_dispatchers(num)
num.times { @dispatcher_loops.pop.stop }
end
|
#enqueue(notifications) ⇒ Object
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/rpush/daemon/app_runner.rb', line 115
def enqueue(notifications)
if service.batch_deliveries?
batch_size = (notifications.size / num_dispatcher_loops.to_f).ceil
notifications.in_groups_of(batch_size, false).each do |batch_notifications|
batch = Batch.new(batch_notifications)
queue.push(QueuePayload.new(batch))
end
else
batch = Batch.new(notifications)
notifications.each do |notification|
queue.push(QueuePayload.new(batch, notification))
reflect(:notification_enqueued, notification)
end
end
end
|
#increment_dispatchers(num) ⇒ Object
135
136
137
|
# File 'lib/rpush/daemon/app_runner.rb', line 135
def increment_dispatchers(num)
num.times { @dispatcher_loops.push(new_dispatcher_loop) }
end
|
#num_dispatcher_loops ⇒ Object
158
159
160
|
# File 'lib/rpush/daemon/app_runner.rb', line 158
def num_dispatcher_loops
@dispatcher_loops.size
end
|
#queue_size ⇒ Object
154
155
156
|
# File 'lib/rpush/daemon/app_runner.rb', line 154
def queue_size
queue.size
end
|
#start ⇒ Object
100
101
102
103
|
# File 'lib/rpush/daemon/app_runner.rb', line 100
def start
app.connections.times { @dispatcher_loops.push(new_dispatcher_loop) }
start_loops
end
|
#stop ⇒ Object
105
106
107
108
109
|
# File 'lib/rpush/daemon/app_runner.rb', line 105
def stop
wait_until_idle
stop_dispatcher_loops
stop_loops
end
|
#wait_until_idle ⇒ Object
111
112
113
|
# File 'lib/rpush/daemon/app_runner.rb', line 111
def wait_until_idle
sleep 0.5 while queue.size > 0
end
|