Class: RocketSMS::Gateway

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Singleton
Defined in:
lib/rocket_sms/gateway.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Gateway

Returns a new instance of Gateway.



9
10
11
12
13
# File 'lib/rocket_sms/gateway.rb', line 9

# Sets up the gateway's bookkeeping state.
#
# @scheduler and @transceivers start out as empty hashes (they later hold
# the pids of spawned processes), and @path is the installation directory
# of the 'rocket_sms' gem as reported by RubyGems.
def initialize
  @scheduler = {}
  @transceivers = {}
  gem_spec = Gem::Specification.find_by_name('rocket_sms')
  @path = gem_spec.gem_dir
end

Instance Method Details

#clean_up_stale_transceivers ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rocket_sms/gateway.rb', line 76

# Asks Redis for every key matching "gateway:transceivers:*", issues a DEL
# for each one in turn, and calls #setup_transceivers once the iterator has
# run out of keys.
def clean_up_stale_transceivers
  redis.keys("gateway:transceivers:*") do |stale_keys|
    delete_key = proc do |stale_key, iterator|
      # Advance to the next key only after Redis has answered the DEL.
      redis.del(stale_key) { |_reply| iterator.next }
    end
    # Kept as a plain proc so it tolerates being called without arguments.
    when_done = proc { |_responses| setup_transceivers }
    EM::Iterator.new(stale_keys).each(delete_key, when_done)
  end
end

#log(msg, level = 'info') ⇒ Object



15
16
17
18
19
20
21
# File 'lib/rocket_sms/gateway.rb', line 15

# Writes +msg+ to the gateway logger at the given severity.
#
# While the EventMachine reactor is running the write is handed to EM.defer
# instead of being performed inline; otherwise the logger is called directly.
#
# @param msg [Object] the message passed to the logger
# @param level [String, Symbol] name of the logger method to invoke
#   ('info' when omitted)
# @return [Object] the result of EM.defer or of the logger call
# @raise [NoMethodError] if +level+ does not name a public logger method
def log(msg, level = 'info')
  # public_send (rather than send) keeps +level+ from dispatching to private
  # methods such as Kernel#system or Kernel#exit on the logger object.
  write = proc { logger.public_send(level, msg) }
  EM.reactor_running? ? EM.defer(&write) : write.call
end

#setup_transceivers ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/rocket_sms/gateway.rb', line 90

# Publishes the configured transceivers to Redis, then launches the workers.
#
# For every id in settings[:transceivers] the "throughput" value and the
# MultiJson-encoded "connection" value are written to the hash
# "gateway:transceivers:<id>" between redis.multi and redis.exec. Once the
# iterator has gone through all ids, #start_scheduler and #start_transceivers
# are called, in that order.
def setup_transceivers
  store_settings = proc do |transceiver_id, iterator|
    config = settings[:transceivers][transceiver_id]
    hash_key = "gateway:transceivers:#{transceiver_id}"
    redis.multi
    redis.hset(hash_key, "throughput", config[:throughput])
    redis.hset(hash_key, "connection", MultiJson.dump(config[:connection]))
    # Move on to the next id only after EXEC has replied.
    redis.exec { |_reply| iterator.next }
  end
  # Kept as a plain proc so it tolerates being called without arguments.
  launch_workers = proc do |_responses|
    start_scheduler
    start_transceivers
  end
  transceiver_ids = settings[:transceivers].keys
  EM::Iterator.new(transceiver_ids).each(store_settings, launch_workers)
end

#shutdown ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/rocket_sms/gateway.rb', line 57

# Sends TERM to the scheduler process and to every transceiver process that
# has a recorded pid, logs "Gateway DOWN." and stops the EventMachine reactor.
#
# Failures while signalling a process are ignored, so one failed signal does
# not prevent the remaining processes from being signalled.
def shutdown
  terminate = lambda do |pid|
    next unless pid

    begin
      Process.kill('TERM', pid)
    rescue StandardError
      nil
    end
  end

  terminate.call(@scheduler[:pid])
  @transceivers.each_value { |worker| terminate.call(worker[:pid]) } if @transceivers

  log "Gateway DOWN."
  EM.stop
end

#start ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'lib/rocket_sms/gateway.rb', line 23

# Runs the gateway inside an EventMachine reactor.
#
# Within the reactor block it logs a start-up message, calls #startup, and
# installs handlers for INT and TERM that forward the signal to #stop.
def start
  EM.run do
    log "Starting Gateway"
    startup
    # Both exit-related signals are routed to the same handler.
    %w[INT TERM].each do |signal_name|
      Signal.trap(signal_name) { |signal| stop(signal) }
    end
  end
end

#start_scheduler ⇒ Object



108
109
110
111
# File 'lib/rocket_sms/gateway.rb', line 108

# Spawns the scheduler runner script as a child process and records its pid
# in @scheduler[:pid].
#
# The child receives REDIS_URL and LOG_LOCATION in its environment;
# LOG_LOCATION is passed as nil when log_location is STDOUT.
#
# @return [Integer] the pid returned by Process.spawn
def start_scheduler
  env = {
    "REDIS_URL" => redis_url,
    "LOG_LOCATION" => (log_location == STDOUT ? nil : log_location)
  }
  # Argument-vector form: each element reaches the child verbatim, so an
  # install path containing whitespace is not split into several arguments.
  @scheduler[:pid] = Process.spawn(env, "bundle", "exec", "ruby", "#{@path}/bin/scheduler_runner.rb")
end

#start_transceivers ⇒ Object



113
114
115
116
117
118
119
# File 'lib/rocket_sms/gateway.rb', line 113

# Spawns one transceiver runner process per id in settings[:transceivers]
# and records each pid under @transceivers[id][:pid].
#
# Every child receives TRANSCEIVER_ID, REDIS_URL and LOG_LOCATION in its
# environment; LOG_LOCATION is passed as nil when log_location is STDOUT.
#
# @return [Hash] the settings[:transceivers] hash that was iterated
def start_transceivers
  # The script path is the same for every transceiver, so build it once.
  runner = "#{@path}/bin/transceiver_runner.rb"
  # Only the ids are needed; iterating keys avoids a block parameter that
  # would shadow the +settings+ method.
  settings[:transceivers].each_key do |tid|
    env = {
      "TRANSCEIVER_ID" => tid.to_s,
      "REDIS_URL" => redis_url,
      "LOG_LOCATION" => (log_location == STDOUT ? nil : log_location)
    }
    @transceivers[tid] = {}
    # Argument-vector form: each element reaches the child verbatim, so an
    # install path containing whitespace is not split into several arguments.
    @transceivers[tid][:pid] = Process.spawn(env, "bundle", "exec", "ruby", runner)
  end
end

#startup ⇒ Object



72
73
74
# File 'lib/rocket_sms/gateway.rb', line 72

# Begins the boot sequence by delegating to #clean_up_stale_transceivers.
def startup
  clean_up_stale_transceivers
end

#stop(signal = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rocket_sms/gateway.rb', line 33

# Stops the gateway: gracefully on the first call, forcibly on the second.
#
# First call: sets @kill and clears @active, then sends TERM to the scheduler
# and to each transceiver with a recorded pid, waiting for each process to
# exit in turn, and finally schedules #shutdown on a 5-second EM timer.
# Any later call (once @kill is set) logs a warning and calls #shutdown
# immediately.
#
# @param signal [Object, nil] the signal handed over by the trap handler;
#   accepted for the caller's convenience but not used here
# @return [Object] the EM::Timer on the first call, #shutdown's result afterwards
def stop(signal = nil)
  if @kill
    log "Forcing Exit. Check your data for losses."
    return shutdown
  end

  log "Stopping. Waiting 5 seconds for pending operations to finish."
  @kill = true
  @active = false

  terminate_and_reap = lambda do |pid|
    next unless pid

    begin
      Process.kill('TERM', pid)
    rescue StandardError
      nil # best effort: the process may already be gone
    end
    begin
      Process.wait(pid)
    rescue Errno::ECHILD
      # Already reaped (or not our child). Without this rescue the remaining
      # workers would never be signalled and the shutdown timer never set.
      nil
    end
  end

  terminate_and_reap.call(@scheduler[:pid])
  @transceivers.each_value { |t| terminate_and_reap.call(t[:pid]) } if @transceivers

  EM::Timer.new(5) { shutdown }
end