Class: Rpush::Daemon::AppRunner

Inherits:
Object
  • Object
show all
Extended by:
Loggable, Reflectable, StringHelpers, Term::ANSIColor
Includes:
Loggable, Reflectable, StringHelpers
Defined in:
lib/rpush/daemon/app_runner.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Reflectable

reflect

Methods included from Loggable

log_error, log_info, log_warn

Methods included from StringHelpers

pluralize

Constructor Details

#initialize(app) ⇒ AppRunner

Returns a new instance of AppRunner.



94
95
96
97
98
# File 'lib/rpush/daemon/app_runner.rb', line 94

# Sets up a runner for one app with empty loop and dispatcher-loop lists.
#
# @param app [Object] the app this runner serves; stored unchanged in @app
def initialize(app)
  @loops = []
  @app = app
  @dispatcher_loops = []
end

Instance Attribute Details

#app ⇒ Object (readonly)

Returns the value of attribute app.



92
93
94
# File 'lib/rpush/daemon/app_runner.rb', line 92

# Reader for the app handed to the constructor.
#
# @return [Object] the current value of @app
def app
  @app
end

Class Method Details

.app_ids ⇒ Object



58
59
60
# File 'lib/rpush/daemon/app_runner.rb', line 58

# Lists the ids under which runners are currently registered.
#
# @return [Array] the keys of @runners, in registry order
def self.app_ids
  @runners.each_key.to_a
end

.app_running?(app) ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/rpush/daemon/app_runner.rb', line 54

# Tells whether a runner is registered under the given app's id.
#
# @param app [#id] the app to look up
# @return [Boolean] true when @runners has a key equal to app.id
def self.app_running?(app)
  app_id = app.id
  @runners.key?(app_id)
end

.app_with_id(app_id) ⇒ Object



50
51
52
# File 'lib/rpush/daemon/app_runner.rb', line 50

# Fetches the app held by the runner registered under app_id.
#
# NOTE(review): when no runner is registered the lookup presumably yields nil
# and the #app call raises NoMethodError — confirm callers check first.
#
# @param app_id [Object] registry key of the runner
# @return [Object] that runner's app
def self.app_with_id(app_id)
  runner = @runners[app_id]
  runner.app
end

.debug ⇒ Object



88
89
90
# File 'lib/rpush/daemon/app_runner.rb', line 88

# Invokes #debug on every registered runner.
#
# @return [Array] the value each runner's #debug returned, in registry order
def self.debug
  runners = @runners.values
  runners.map(&:debug)
end

.decrement_dispatchers(app, num) ⇒ Object



80
81
82
# File 'lib/rpush/daemon/app_runner.rb', line 80

# Forwards a dispatcher reduction to the runner registered for the app.
#
# @param app [#id] the app whose runner should shrink
# @param num [Integer] passed through to the runner's #decrement_dispatchers
# @return [Object] whatever the runner's #decrement_dispatchers returns
def self.decrement_dispatchers(app, num)
  runner = @runners[app.id]
  runner.decrement_dispatchers(num)
end

.enqueue(notifications) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/rpush/daemon/app_runner.rb', line 17

# Routes notifications to the runner of the app each one belongs to.
#
# Notifications are grouped by #app_id. For an app with no registered runner,
# start_app_with_id is tried first; if there is still no runner afterwards the
# group is skipped. ProcTitle.update is called once at the end.
#
# @param notifications [Enumerable<#app_id>] notifications to hand out
# @return [Object] whatever ProcTitle.update returns
def self.enqueue(notifications)
  by_app = notifications.group_by(&:app_id)

  by_app.each_pair do |app_id, group|
    start_app_with_id(app_id) unless @runners[app_id]

    runner = @runners[app_id]
    runner.enqueue(group) if runner
  end

  ProcTitle.update
end

.increment_dispatchers(app, num) ⇒ Object



84
85
86
# File 'lib/rpush/daemon/app_runner.rb', line 84

# Forwards a dispatcher increase to the runner registered for the app.
#
# @param app [#id] the app whose runner should grow
# @param num [Integer] passed through to the runner's #increment_dispatchers
# @return [Object] whatever the runner's #increment_dispatchers returns
def self.increment_dispatchers(app, num)
  runner = @runners[app.id]
  runner.increment_dispatchers(num)
end

.num_dispatchers_for_app(app) ⇒ Object



75
76
77
78
# File 'lib/rpush/daemon/app_runner.rb', line 75

# Reports how many dispatcher loops the app's runner has.
#
# @param app [#id] the app to look up
# @return [Object] the runner's #num_dispatcher_loops, or 0 when no runner is
#   registered under app.id
def self.num_dispatchers_for_app(app)
  runner = @runners[app.id]
  return 0 unless runner

  runner.num_dispatcher_loops
end

.start_app(app) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rpush/daemon/app_runner.rb', line 30

# Creates, registers and starts a runner for the app.
#
# Any StandardError raised along the way is contained: the registry entry for
# app.id is removed, two error lines are logged and the exception is passed to
# reflect(:error, ...). The error is not re-raised.
#
# @param app [#id, #name, #connections] the app to start
# @return [Object] nil on success (the trailing conditional puts), otherwise
#   whatever reflect returns
def self.start_app(app)
  begin
    logger = Rpush.logger
    logger.info("[#{app.name}] Starting #{pluralize(app.connections, 'dispatcher')}... ", true)

    @runners[app.id] = new(app)
    @runners[app.id].start

    puts green('') if Rpush.config.foreground
  rescue StandardError => error
    # Drop the registry entry so the app does not look running after a failed start.
    @runners.delete(app.id)
    Rpush.logger.error("[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app.")
    Rpush.logger.error(error)
    reflect(:error, error)
  end
end

.start_app_with_id(app_id) ⇒ Object



26
27
28
# File 'lib/rpush/daemon/app_runner.rb', line 26

# Looks the app up in the daemon store by id and starts it.
#
# @param app_id [Object] id passed to Rpush::Daemon.store.app
# @return [Object] whatever start_app returns
def self.start_app_with_id(app_id)
  app = Rpush::Daemon.store.app(app_id)
  start_app(app)
end

.stop ⇒ Object



62
63
64
65
# File 'lib/rpush/daemon/app_runner.rb', line 62

# Stops every registered runner, then empties the registry.
#
# @return [Object] the result of @runners.clear
def self.stop
  # Iterate over a snapshot of the values; the registry itself is cleared afterwards.
  active_runners = @runners.values
  active_runners.each(&:stop)
  @runners.clear
end

.stop_app(app_id) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/rpush/daemon/app_runner.rb', line 42

# Unregisters the runner for app_id and, if one existed, stops it and logs that.
#
# @param app_id [Object] registry key of the runner to stop
# @return [Object, nil] nil when nothing was registered, otherwise whatever
#   log_info returns
def self.stop_app(app_id)
  runner = @runners.delete(app_id)
  return unless runner

  runner.stop
  log_info("[#{runner.app.name}] Stopped.")
end

.total_dispatchers ⇒ Object



67
68
69
# File 'lib/rpush/daemon/app_runner.rb', line 67

# Adds up #num_dispatcher_loops across every registered runner.
#
# @return [Numeric] the total; 0 when no runners are registered
def self.total_dispatchers
  @runners.sum { |_app_id, runner| runner.num_dispatcher_loops }
end

.total_queued ⇒ Object



71
72
73
# File 'lib/rpush/daemon/app_runner.rb', line 71

# Adds up #queue_size across every registered runner.
#
# @return [Numeric] the total; 0 when no runners are registered
def self.total_queued
  @runners.sum { |_app_id, runner| runner.queue_size }
end

Instance Method Details

#debug ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/rpush/daemon/app_runner.rb', line 139

# Logs a pretty-printed JSON snapshot of this runner.
#
# The snapshot has a :dispatchers entry (one record per dispatcher loop, keyed
# by its position in @dispatcher_loops) and a :queued entry holding queue_size.
#
# @return [Object] whatever log_info returns
def debug
  per_dispatcher = @dispatcher_loops.each_with_index.each_with_object({}) do |(dispatcher_loop, position), details|
    details[position] = {
      started_at: dispatcher_loop.started_at.iso8601,
      dispatched: dispatcher_loop.dispatch_count,
      thread_status: dispatcher_loop.thread_status
    }
  end

  snapshot = { dispatchers: per_dispatcher, queued: queue_size }
  log_info(JSON.pretty_generate(snapshot))
end

#decrement_dispatchers(num) ⇒ Object



131
132
133
# File 'lib/rpush/daemon/app_runner.rb', line 131

# Removes up to num dispatcher loops from the end of @dispatcher_loops and
# stops each one.
#
# Asking for more loops than remain is tolerated: once the list is empty the
# remaining iterations do nothing instead of calling #stop on nil.
#
# @param num [Integer] how many dispatcher loops to remove
# @return [Integer] num
def decrement_dispatchers(num)
  # pop returns nil on an empty list; safe navigation skips the stop call then.
  num.times { @dispatcher_loops.pop&.stop }
end

#enqueue(notifications) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/rpush/daemon/app_runner.rb', line 115

# Pushes notifications onto this runner's queue.
#
# When the service reports batch deliveries, the notifications are split into
# roughly one batch per dispatcher loop and each batch is queued as a single
# payload. Otherwise one shared Batch is created and every notification is
# queued as its own payload, followed by a :notification_enqueued reflection.
#
# The batch size is clamped to at least 1, and the dispatcher count used for
# the split is treated as at least 1, so neither an empty list nor a runner
# with zero dispatcher loops raises.
#
# @param notifications [Array] notifications to enqueue
# @return [Array] the batches (batch mode) or notifications (per-item mode)
def enqueue(notifications)
  if service.batch_deliveries?
    # Dividing by zero dispatchers would give Infinity/NaN and make #ceil raise.
    dispatcher_count = [num_dispatcher_loops, 1].max
    # A slice size of 0 is invalid, so an empty input still uses size 1.
    batch_size = [notifications.size.fdiv(dispatcher_count).ceil, 1].max

    batches = notifications.each_slice(batch_size).to_a
    batches.each do |batch_notifications|
      batch = Batch.new(batch_notifications)
      queue.push(QueuePayload.new(batch))
    end
  else
    batch = Batch.new(notifications)
    notifications.each do |notification|
      queue.push(QueuePayload.new(batch, notification))
      reflect(:notification_enqueued, notification)
    end
  end
end

#increment_dispatchers(num) ⇒ Object



135
136
137
# File 'lib/rpush/daemon/app_runner.rb', line 135

# Appends num freshly built dispatcher loops to @dispatcher_loops.
#
# @param num [Integer] how many dispatcher loops to add
# @return [Integer] num
def increment_dispatchers(num)
  num.times do
    @dispatcher_loops << new_dispatcher_loop
  end
end

#num_dispatcher_loops ⇒ Object



158
159
160
# File 'lib/rpush/daemon/app_runner.rb', line 158

# Counts the dispatcher loops this runner currently holds.
#
# @return [Integer] number of entries in @dispatcher_loops
def num_dispatcher_loops
  @dispatcher_loops.length
end

#queue_size ⇒ Object



154
155
156
# File 'lib/rpush/daemon/app_runner.rb', line 154

# Reports the size of this runner's queue.
#
# NOTE(review): `queue` is defined outside this view; presumably it is the
# queue that #enqueue pushes payloads onto — confirm.
#
# @return [Object] whatever queue.size returns
def queue_size
  queue.size
end

#start ⇒ Object



100
101
102
103
# File 'lib/rpush/daemon/app_runner.rb', line 100

# Builds one dispatcher loop per app connection, then calls start_loops.
#
# @return [Object] whatever start_loops returns
def start
  dispatcher_count = app.connections
  dispatcher_count.times { @dispatcher_loops << new_dispatcher_loop }
  start_loops
end

#stop ⇒ Object



105
106
107
108
109
# File 'lib/rpush/daemon/app_runner.rb', line 105

# Shuts this runner down in three ordered steps.
#
# NOTE(review): stop_dispatcher_loops and stop_loops are defined outside this
# view; presumably they stop the entries of @dispatcher_loops and @loops —
# confirm.
#
# @return [Object] whatever stop_loops returns
def stop
  # Block until the queue reports empty before anything is torn down.
  wait_until_idle
  stop_dispatcher_loops
  stop_loops
end

#wait_until_idle ⇒ Object



111
112
113
# File 'lib/rpush/daemon/app_runner.rb', line 111

# Blocks until the queue reports a size of zero, re-checking every half second.
# There is no timeout.
#
# @return [nil]
def wait_until_idle
  sleep(0.5) while queue.size.positive?
end