Class: Rpush::Daemon::AppRunner
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
reflect
Methods included from Loggable
log_debug, log_error, log_info, log_warn
pluralize
Constructor Details
#initialize(app) ⇒ AppRunner
Returns a new instance of AppRunner.
94
95
96
97
98
|
# File 'lib/rpush/daemon/app_runner.rb', line 94
# Builds a runner for +app+. Both loop collections start out empty; they are
# filled in later by the start_* methods.
#
# @param app [Object] the app this runner serves, exposed through #app
def initialize(app)
  @dispatcher_loops = []
  @loops = []
  @app = app
end
|
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
91
92
93
|
# File 'lib/rpush/daemon/app_runner.rb', line 91
# Reader for the app this runner was constructed with.
#
# @return [Object] the value held in @app
def app
@app
end
|
Class Method Details
.app_ids ⇒ Object
57
58
59
|
# File 'lib/rpush/daemon/app_runner.rb', line 57
# Lists the ids of all apps that currently have a registered runner.
#
# @return [Array] the keys of the runner registry, in insertion order
def self.app_ids
  @runners.each_key.to_a
end
|
.app_running?(app) ⇒ Boolean
53
54
55
|
# File 'lib/rpush/daemon/app_runner.rb', line 53
# Tells whether a runner is registered under the id of +app+.
#
# @param app [#id] the app to look up
# @return [Boolean]
def self.app_running?(app)
  @runners.include?(app.id)
end
|
.app_with_id(app_id) ⇒ Object
49
50
51
|
# File 'lib/rpush/daemon/app_runner.rb', line 49
# Fetches the app owned by the runner registered under +app_id+.
#
# An unknown id is not handled here: the lookup yields nil and calling #app
# on it raises NoMethodError.
#
# @param app_id [Object] key into the runner registry
# @return [Object] that runner's app
def self.app_with_id(app_id)
  runner = @runners[app_id]
  runner.app
end
|
.decrement_dispatchers(app, num) ⇒ Object
79
80
81
|
# File 'lib/rpush/daemon/app_runner.rb', line 79
# Asks the runner registered for +app+ to drop +num+ dispatcher loops.
#
# @param app [#id] app whose runner should shrink
# @param num [Integer] number of dispatcher loops to remove
# @return [Object] whatever the runner's #decrement_dispatchers returns
def self.decrement_dispatchers(app, num)
  runner = @runners[app.id]
  runner.decrement_dispatchers(num)
end
|
.enqueue(notifications) ⇒ Object
15
16
17
18
19
20
21
22
|
# File 'lib/rpush/daemon/app_runner.rb', line 15
# Routes notifications to the runner of the app each one belongs to.
#
# Notifications are grouped by app id. A missing runner is started on demand
# first; if there is still no runner afterwards, that group is skipped. The
# process title is refreshed once all groups have been handled.
#
# @param notifications [Enumerable] notifications responding to #app_id
# @return [Object] the result of ProcTitle.update
def self.enqueue(notifications)
  notifications.group_by(&:app_id).each do |app_id, app_notifications|
    @runners[app_id] || start_app_with_id(app_id)
    runner = @runners[app_id]
    runner.enqueue(app_notifications) if runner
  end
  ProcTitle.update
end
|
.increment_dispatchers(app, num) ⇒ Object
83
84
85
|
# File 'lib/rpush/daemon/app_runner.rb', line 83
# Asks the runner registered for +app+ to add +num+ dispatcher loops.
#
# @param app [#id] app whose runner should grow
# @param num [Integer] number of dispatcher loops to add
# @return [Object] whatever the runner's #increment_dispatchers returns
def self.increment_dispatchers(app, num)
  runner = @runners[app.id]
  runner.increment_dispatchers(num)
end
|
.num_dispatchers_for_app(app) ⇒ Object
74
75
76
77
|
# File 'lib/rpush/daemon/app_runner.rb', line 74
# Counts the dispatcher loops of the runner registered for +app+.
#
# @param app [#id] app to inspect
# @return [Integer] the runner's dispatcher loop count, or 0 when the app
#   has no runner
def self.num_dispatchers_for_app(app)
  runner = @runners[app.id]
  return 0 unless runner

  runner.num_dispatcher_loops
end
|
.start_app(app) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/rpush/daemon/app_runner.rb', line 28
# Creates, registers and starts a runner for +app+.
#
# The runner is registered before it is started. If anything raises a
# StandardError along the way, the registration is removed again, the failure
# is logged and reported through the :error reflection, and the error is not
# re-raised.
#
# @param app [Object] app responding to #id, #name and #connections
# @return [Object] the result of the runner's #start_loops on success, or of
#   reflect(:error, ...) on failure
def self.start_app(app)
  begin
    Rpush.logger.info("[#{app.name}] Starting #{pluralize(app.connections, 'dispatcher')}... ", true)
    runner = @runners[app.id] = new(app)
    runner.start_dispatchers
    config = Rpush.config
    # The tick is only printed when logging to a foreground console.
    puts Rainbow('✔').green if config.foreground && config.foreground_logging
    runner.start_loops
  rescue StandardError => error
    @runners.delete(app.id)
    logger = Rpush.logger
    logger.error("[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app.")
    logger.error(error)
    reflect(:error, error)
  end
end
|
.start_app_with_id(app_id) ⇒ Object
24
25
26
|
# File 'lib/rpush/daemon/app_runner.rb', line 24
# Loads the app with +app_id+ from the daemon store and starts it.
#
# @param app_id [Object] id handed to the store lookup
# @return [Object] whatever start_app returns
def self.start_app_with_id(app_id)
  app = Rpush::Daemon.store.app(app_id)
  start_app(app)
end
|
.status ⇒ Object
87
88
89
|
# File 'lib/rpush/daemon/app_runner.rb', line 87
# Collects the status of every registered runner.
#
# @return [Hash] a hash with one key, :app_runners, holding each runner's
#   #status in registry order
def self.status
  runner_statuses = @runners.values.map { |runner| runner.status }
  { app_runners: runner_statuses }
end
|
.stop ⇒ Object
61
62
63
64
|
# File 'lib/rpush/daemon/app_runner.rb', line 61
# Stops every registered runner, then empties the registry.
#
# @return [Hash] the (now empty) runner registry
def self.stop
  # Iterate over a snapshot of the runners; the results are not needed.
  @runners.values.each(&:stop)
  @runners.clear
end
|
.stop_app(app_id) ⇒ Object
41
42
43
44
45
46
47
|
# File 'lib/rpush/daemon/app_runner.rb', line 41
# Unregisters and stops the runner for +app_id+, logging once it has stopped.
# Does nothing when no runner is registered under that id.
#
# @param app_id [Object] key into the runner registry
# @return [Object, nil] the result of log_info, or nil when there was no runner
def self.stop_app(app_id)
  runner = @runners.delete(app_id)
  return unless runner

  runner.stop
  log_info("[#{runner.app.name}] Stopped.")
end
|
.total_dispatchers ⇒ Object
66
67
68
|
# File 'lib/rpush/daemon/app_runner.rb', line 66
# Adds up the dispatcher loop counts of all registered runners.
#
# @return [Integer] the total, 0 when no runner is registered
def self.total_dispatchers
  @runners.values.sum { |runner| runner.num_dispatcher_loops }
end
|
.total_queued ⇒ Object
70
71
72
|
# File 'lib/rpush/daemon/app_runner.rb', line 70
# Adds up the queue sizes of all registered runners.
#
# @return [Integer] the total, 0 when no runner is registered
def self.total_queued
  queue_sizes = @runners.values.map(&:queue_size)
  queue_sizes.sum
end
|
Instance Method Details
#decrement_dispatchers(num) ⇒ Object
135
136
137
|
# File 'lib/rpush/daemon/app_runner.rb', line 135
# Stops and discards dispatcher loops, most recently added first.
#
# The count is capped at the number of loops currently held, so asking for
# more than exist stops them all instead of calling #stop on the nil that
# Array#pop returns once the list is empty.
#
# @param num [Integer] how many dispatcher loops to remove
# @return [Integer] the number of loops actually removed (equal to +num+
#   whenever enough loops were available)
def decrement_dispatchers(num)
  removable = [num, @dispatcher_loops.size].min
  removable.times { @dispatcher_loops.pop.stop }
end
|
#enqueue(notifications) ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/rpush/daemon/app_runner.rb', line 119
# Queues +notifications+ for this app's dispatcher loops.
#
# When the service batches deliveries, the notifications are split into
# groups sized so that there is at most one group per dispatcher loop, and
# each group is queued as a single Batch payload. Otherwise every
# notification is queued as its own payload sharing one Batch, and a
# :notification_enqueued reflection is emitted for each.
#
# An empty collection is returned untouched. Without that guard the batching
# branch would compute a batch size of zero and pass it on as a group size.
#
# @param notifications [Array] notifications to enqueue; must respond to
#   #empty?, #size, #each and #in_groups_of
# @return [Object] the collection that was iterated
def enqueue(notifications)
  return notifications if notifications.empty?

  if service.batch_deliveries?
    # Ceiling division spreads the notifications over the dispatcher loops.
    batch_size = (notifications.size / num_dispatcher_loops.to_f).ceil
    notifications.in_groups_of(batch_size, false).each do |batch_notifications|
      queue.push(QueuePayload.new(Batch.new(batch_notifications)))
    end
  else
    batch = Batch.new(notifications)
    notifications.each do |notification|
      queue.push(QueuePayload.new(batch, notification))
      reflect(:notification_enqueued, notification)
    end
  end
end
|
#increment_dispatchers(num) ⇒ Object
139
140
141
|
# File 'lib/rpush/daemon/app_runner.rb', line 139
# Creates +num+ new dispatcher loops and appends them to this runner's list.
#
# @param num [Integer] how many dispatcher loops to add
# @return [Integer] +num+
def increment_dispatchers(num)
  num.times do
    dispatcher_loop = new_dispatcher_loop
    @dispatcher_loops.push(dispatcher_loop)
  end
end
|
#num_dispatcher_loops ⇒ Object
157
158
159
|
# File 'lib/rpush/daemon/app_runner.rb', line 157
# Reports how many dispatcher loops this runner currently holds.
#
# @return [Integer]
def num_dispatcher_loops
  @dispatcher_loops.length
end
|
#start_dispatchers ⇒ Object
100
101
102
|
# File 'lib/rpush/daemon/app_runner.rb', line 100
# Creates one dispatcher loop per connection configured on the app and adds
# each to this runner's list.
#
# @return [Integer] the app's connection count
def start_dispatchers
  app.connections.times do
    @dispatcher_loops << new_dispatcher_loop
  end
end
|
#start_loops ⇒ Object
104
105
106
107
|
# File 'lib/rpush/daemon/app_runner.rb', line 104
# Asks the service for its loop instances for this app, keeps them in @loops
# and starts each one.
#
# @return [Array] the results of calling #start on every loop
def start_loops
  @loops = service.loop_instances(@app)
  @loops.map { |service_loop| service_loop.start }
end
|
#status ⇒ Object
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/rpush/daemon/app_runner.rb', line 143
# Summarises this runner: the app name, per-dispatcher details keyed by the
# dispatcher's position in the list, and the current queue size.
#
# @return [Hash] with keys :app_name, :dispatchers and :queued
def status
  detail_pairs = @dispatcher_loops.map.with_index do |dispatcher_loop, index|
    details = {
      started_at: dispatcher_loop.started_at.iso8601,
      dispatched: dispatcher_loop.dispatch_count,
      thread_status: dispatcher_loop.thread_status
    }
    [index, details]
  end
  { app_name: @app.name, dispatchers: detail_pairs.to_h, queued: queue_size }
end
|
#stop ⇒ Object
109
110
111
112
113
|
# File 'lib/rpush/daemon/app_runner.rb', line 109
# Shuts this runner down in a fixed order: first wait for the queue to empty,
# then call stop_dispatcher_loops and stop_loops.
#
# NOTE(review): stop_dispatcher_loops and stop_loops are defined elsewhere;
# presumably they stop the dispatcher loops and the service loops
# respectively — confirm against their definitions.
def stop
# Let queued work drain before anything is torn down.
wait_until_idle
stop_dispatcher_loops
stop_loops
end
|
#wait_until_idle ⇒ Object
115
116
117
|
# File 'lib/rpush/daemon/app_runner.rb', line 115
# Blocks until the queue reports a size of zero, polling every half second.
#
# @return [nil]
def wait_until_idle
  while queue.size.positive?
    sleep(0.5)
  end
end
|