Class: RocketSMS::Transceiver

Inherits:
Object
  • Object
show all
Defined in:
lib/rocket_sms/transceiver.rb

Instance Method Summary collapse

Constructor Details

#initialize(id, redis_url, log_location) ⇒ Transceiver

Returns a new instance of Transceiver.



5
6
7
8
9
10
11
12
# File 'lib/rocket_sms/transceiver.rb', line 5

# Builds a transceiver in its initial state: active, offline, polling slowly,
# with no settings loaded and no in-flight messages.
#
# @param id identifier of this transceiver; interpolated into Redis keys and log lines
# @param redis_url URL handed to EM::Hiredis.connect when a connection is first requested
# @param log_location destination handed to Logger.new when the logger is first requested
def initialize(id, redis_url, log_location)
  @id = id
  @redis_url = redis_url
  @log_location = log_location

  @active = true   # cleared by #stop
  @online = false  # set by #bound, cleared by #unbound
  @fast = false    # toggled by #dispatch depending on whether work was found
  @settings = {}   # filled in by #configure
  @mts = {}        # messages handed to the connection, keyed by their id as a String
end

Instance Method Details

#bound(transceiver) ⇒ Object



240
241
242
243
244
245
# File 'lib/rocket_sms/transceiver.rb', line 240

# Marks this transceiver as online: logs the event, forgets any reconnect
# timer reference and publishes the new status through #register.
#
# Presumably invoked by the Smpp::Transceiver connection, which is given this
# object in #connect - verify against the SMPP library.
#
# @param transceiver not used by this method
# @return whatever #register returns
def bound(transceiver)
  log("#{@id} - Transceiver Bound")
  @reconnector = nil
  @online = true
  register
end

#cleanup ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/rocket_sms/transceiver.rb', line 83

# Drains this transceiver's dispatch queue back into the shared pending queue
# and then schedules #shutdown three seconds later.
#
# Every payload found in "gateway:transceivers:<id>:dispatch" is removed from
# that sorted set and added to queues[:mt][:pending] with the current time in
# milliseconds as its score, one MULTI/EXEC transaction per payload.
def cleanup
  dispatch_key = "gateway:transceivers:#{@id}:dispatch"

  # Kept as a proc (not a lambda) so it can be called with or without arguments.
  shutdown_later = proc { |_responses| EM::Timer.new(3) { shutdown } }

  redis.zrangebyscore(dispatch_key, '-inf', '+inf') do |payloads|
    if !payloads || payloads.empty?
      shutdown_later.call
    else
      requeue = proc do |payload, iter|
        # NOTE(review): the parsed message is reset here, but the untouched
        # payload is what gets re-queued below, so these two writes have no
        # visible effect - confirm whether message.to_json was intended.
        message = Message.from_json(payload)
        message.send_at = nil
        message.expires_at = nil

        score = (Time.now.to_f * 1000).to_i
        redis.multi
        redis.zrem(dispatch_key, payload)
        redis.zadd(queues[:mt][:pending], score, payload)
        # Move on to the next payload only once this transaction has completed.
        redis.exec { |_resp| iter.next }
      end

      EM::Iterator.new(payloads).each(requeue, shutdown_later)
    end
  end
end

#configure ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rocket_sms/transceiver.rb', line 107

# Reloads this transceiver's settings from Redis and re-arms itself to run
# again one second later for as long as the transceiver is active.
#
# Reads the "throughput" and "connection" fields of the
# "gateway:transceivers:<id>" hash in one MULTI/EXEC. When both are present
# they are stored in @settings (throughput as a Float, connection parsed with
# MultiJson using symbol keys); when the reply is missing or contains a nil,
# #stop is called.
def configure
  return unless @active

  @configurator = EM::Timer.new(1) { configure }

  settings_key = "gateway:transceivers:#{@id}"
  redis.multi
  %w[throughput connection].each { |field| redis.hget(settings_key, field) }
  redis.exec do |response|
    if !response || response.flatten.include?(nil)
      stop
    else
      throughput_payload, connection_payload = response
      @settings[:throughput] = throughput_payload.to_f
      @settings[:connection] = MultiJson.load(connection_payload, symbolize_keys: true)
    end
  end
end

#connect ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/rocket_sms/transceiver.rb', line 125

# Opens the SMPP connection once connection settings are available.
#
# Does nothing when the transceiver is inactive. If @settings holds no
# :connection entry yet, retries itself in one second. Otherwise calls
# EM.connect with the configured host and port, Smpp::Transceiver as the
# handler, and the connection settings plus this object as extra arguments,
# storing the result in @connection.
def connect
  return unless @active

  endpoint = @settings && @settings[:connection]
  # Settings have not been loaded yet: poll again shortly.
  return EM::Timer.new(1) { connect } unless endpoint

  log "Connecting transceiver #{@id}."
  @connection = EM.connect(endpoint[:host], endpoint[:port], Smpp::Transceiver, endpoint, self)
end

#delivery_report_received(transceiver, pdu) ⇒ Object



206
207
208
209
210
# File 'lib/rocket_sms/transceiver.rb', line 206

# Queues a delivery report for downstream processing.
#
# Copies source_addr, short_message and destination_addr from the PDU into a
# ticket and, on the next reactor tick, pushes its JSON form onto queues[:dr].
#
# @param transceiver not used by this method
# @param pdu object responding to source_addr, short_message and destination_addr
def delivery_report_received(transceiver, pdu)
  log "#{@id} - DR Received"
  fields = {
    source_addr: pdu.source_addr,
    short_message: pdu.short_message,
    destination_addr: pdu.destination_addr
  }
  ticket = { pdu: fields }
  EM.next_tick { redis.lpush(queues[:dr], MultiJson.dump(ticket)) }
end

#dispatch ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/rocket_sms/transceiver.rb', line 141

# Pops the lowest-scored payload from this transceiver's dispatch queue and
# acts on it, re-arming itself with a timer for as long as the transceiver is
# active.
#
# The next run is scheduled before Redis is queried: after 0.5s while idle,
# or after 1.1 / throughput seconds once a payload has been seen (@fast).
# The popped message is then handled according to its time window:
# * send_at still in the future - put back on the dispatch queue, scored by
#   send_at in milliseconds;
# * inside [send_at, expires_at) - handed to #send_message;
# * expired - given another pass, pushed to queues[:mt][:pending] scored 15s
#   ahead (milliseconds), and dispatch is re-run on the next tick.
def dispatch
  return unless @active

  delay = @fast ? ((throughput.to_f)**-1) * 1.1 : 0.5
  @dispatcher = EM::Timer.new(delay) { dispatch }

  dispatch_key = "gateway:transceivers:#{@id}:dispatch"
  redis.multi
  redis.zrange(dispatch_key, 0, 0)
  redis.zremrangebyrank(dispatch_key, 0, 0)
  redis.exec do |response|
    next unless response

    # First reply is the ZRANGE result; its first element is the payload, if any.
    payload = response[0][0]
    unless payload
      @fast = false
      next
    end

    @fast = true
    now = Time.now.to_f
    message = Message.from_json(payload)

    if message.send_at > now
      not_before = (message.send_at * 1000).to_i
      redis.zadd(dispatch_key, not_before, payload)
    elsif message.send_at <= now && now < message.expires_at
      log "Message #{message.id} detected on #{@id}. Sending."
      send_message(message)
    elsif message.expires_at <= now
      log "Message #{message.id} detected on #{@id} but has expired. Retrying."
      message.add_pass
      retry_at = ((Time.now.to_f + 15) * 1000).to_i
      redis.zadd(queues[:mt][:pending], retry_at, message.to_json)
      # Replace the already-armed timer with an immediate re-run.
      @dispatcher.cancel if @dispatcher
      EM.next_tick { dispatch }
    end
  end
end

#dredis ⇒ Object



18
19
20
# File 'lib/rocket_sms/transceiver.rb', line 18

# Returns a lazily created EM::Hiredis connection to @redis_url, held in
# @dredis and therefore separate from the one returned by #redis.
# No method visible in this file calls it.
def dredis
  return @dredis if @dredis

  @dredis = EM::Hiredis.connect(@redis_url)
end

#log(msg, level = 'info') ⇒ Object



26
27
28
29
30
31
32
# File 'lib/rocket_sms/transceiver.rb', line 26

# Writes a message through #logger at the given level.
#
# While the EventMachine reactor is running the write is handed to EM.defer;
# otherwise it is performed immediately.
#
# @param msg the text to log
# @param level name of the logger method to invoke (defaults to 'info')
def log(msg, level = 'info')
  emit = proc { logger.send(level, msg) }
  return emit.call unless EM.reactor_running?

  EM.defer(&emit)
end

#logger ⇒ Object



22
23
24
# File 'lib/rocket_sms/transceiver.rb', line 22

# Returns the Logger writing to @log_location, creating it on first use and
# reusing it afterwards.
def logger
  return @logger if @logger

  @logger = Logger.new(@log_location)
end

#message_accepted(transceiver, mt_message_id, pdu) ⇒ Object



212
213
214
215
216
217
218
219
220
221
# File 'lib/rocket_sms/transceiver.rb', line 212

# Handles acceptance of a previously sent message.
#
# The message is looked up (and removed) in @mts by the String form of
# mt_message_id. A tracked message is stamped with accepted_at (epoch
# seconds) and, on the next reactor tick, pushed as JSON onto
# queues[:mt][:success]. An unknown id is only logged.
#
# @param transceiver not used by this method
# @param mt_message_id id under which the message was registered in #send_message
# @param pdu not used by this method
def message_accepted(transceiver, mt_message_id, pdu)
  tracked = @mts.delete(mt_message_id.to_s)
  return log("#{@id} - Untracked MT Accepted #{mt_message_id}") unless tracked

  log "#{@id} - Message #{tracked.id} - Accepted"
  tracked.accepted_at = Time.now.to_i
  EM.next_tick { redis.lpush(queues[:mt][:success], tracked.to_json) }
end

#message_rejected(transceiver, mt_message_id, pdu) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/rocket_sms/transceiver.rb', line 223

# Handles rejection of a previously sent message.
#
# The message is looked up (and removed) in @mts by the String form of
# mt_message_id. A tracked message gets another pass and a rejected_at stamp
# (epoch seconds); while it has used at most 5 passes it is re-queued on
# queues[:mt][:pending] to be retried 10 seconds from now, otherwise it is
# pushed onto queues[:mt][:failure]. An unknown id is only logged.
#
# The retry score is expressed in milliseconds, matching the scores #cleanup
# and #dispatch write to the same pending sorted set. A score in seconds would
# sort ahead of every millisecond-scored entry.
#
# @param transceiver not used by this method
# @param mt_message_id id under which the message was registered in #send_message
# @param pdu not used by this method
def message_rejected(transceiver, mt_message_id, pdu)
  message = @mts.delete(mt_message_id.to_s)
  if message
    log "#{@id} - Message #{message.id} - Rejected"
    message.add_pass
    message.rejected_at = Time.now.to_i
    if message.pass <= 5
      score = ((Time.now.to_f + 10) * 1000).to_i
      EM.next_tick { redis.zadd(queues[:mt][:pending], score, message.to_json) }
    else
      EM.next_tick { redis.lpush(queues[:mt][:failure], message.to_json) }
    end
  else
    log "#{@id} - Untracked MT Rejected #{mt_message_id}"
  end
end

#mo_received(transceiver, pdu) ⇒ Object



200
201
202
203
204
# File 'lib/rocket_sms/transceiver.rb', line 200

# Queues an incoming (mobile-originated) message for downstream processing.
#
# Copies source_addr, short_message and destination_addr from the PDU into a
# ticket and, on the next reactor tick, pushes its JSON form onto queues[:mo].
#
# @param transceiver not used by this method
# @param pdu object responding to source_addr, short_message and destination_addr
def mo_received(transceiver, pdu)
  log "#{@id} - Message Received"
  fields = {
    source_addr: pdu.source_addr,
    short_message: pdu.short_message,
    destination_addr: pdu.destination_addr
  }
  ticket = { pdu: fields }
  EM.next_tick { redis.lpush(queues[:mo], MultiJson.dump(ticket)) }
end

#queues ⇒ Object



34
35
36
# File 'lib/rocket_sms/transceiver.rb', line 34

# Returns the queue-name table provided by RocketSMS.queues.
# Within this file it is indexed as queues[:mt][:pending],
# queues[:mt][:success], queues[:mt][:failure], queues[:dr] and queues[:mo].
def queues
  RocketSMS.queues
end

#redis ⇒ Object



14
15
16
# File 'lib/rocket_sms/transceiver.rb', line 14

# Returns the EM::Hiredis connection to @redis_url used by every Redis call
# in this class, creating it on first use and reusing it afterwards.
def redis
  return @redis if @redis

  @redis = EM::Hiredis.connect(@redis_url)
end

#register ⇒ Object



176
177
178
179
# File 'lib/rocket_sms/transceiver.rb', line 176

# Publishes the current connection state to Redis by writing 'online' or
# 'offline' (according to @online) into the 'status' field of the
# "gateway:transceivers:<id>" hash.
def register
  status =
    if @online
      'online'
    else
      'offline'
    end
  redis.hset("gateway:transceivers:#{@id}", 'status', status)
end

#send_message(message) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/rocket_sms/transceiver.rb', line 181

# Hands a message to the SMPP connection, or puts it back on this
# transceiver's dispatch queue when the connection is not bound.
#
# Online: the message is recorded in @mts under its id (as a String) so that
# #message_accepted / #message_rejected can find it, then sent with send_mt.
# Offline: the serialized message is re-added to
# "gateway:transceivers:<id>:dispatch", scored by send_at in milliseconds.
#
# Any StandardError raised along the way is treated as a failed attempt: the
# message gets another pass and is pushed to queues[:mt][:pending] to be
# retried 15 seconds from now (score in milliseconds, like the other writers
# of that sorted set). Non-StandardError exceptions such as signals and exit
# requests are deliberately not intercepted.
#
# @param message object responding to id, sender, receiver, body, send_at,
#   add_pass and to_json
def send_message(message)
  if @online
    log "Sending Message #{message.id} through DID #{message.sender} via #{@id}."
    @mts[message.id.to_s] = message
    @connection.send_mt(message.id, message.sender, message.receiver, message.body)
  else
    log "#{@id} is not connected. Pushing message #{message.id} to dispatch queue."
    score = (message.send_at * 1000).to_i
    # Serialize the message itself; there is no raw payload in scope here.
    redis.zadd("gateway:transceivers:#{@id}:dispatch", score, message.to_json)
  end
rescue StandardError
  log "### Error Sending MT #{message.id} with DID #{message.sender} through Transceiver #{@id}. Retrying message."
  message.add_pass
  score = ((Time.now.to_f + 15) * 1000).to_i
  redis.zadd(queues[:mt][:pending], score, message.to_json)
end

#shutdown ⇒ Object



78
79
80
81
# File 'lib/rocket_sms/transceiver.rb', line 78

# Logs that this transceiver is going down and calls EM.stop.
# Invoked by #stop on a repeated stop request and by the timers armed in
# #cleanup.
# NOTE(review): #log hands the write to EM.defer while the reactor runs, so
# this line may not be written before the reactor stops - confirm.
def shutdown
  log "Transceiver #{@id} DOWN."
  EM.stop
end

#start ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rocket_sms/transceiver.rb', line 42

# Runs the transceiver inside an EventMachine reactor.
#
# Sizes the thread pool and timer limit first, then within EM.run: logs the
# start, sets the quantum, kicks off the #configure, #connect and #dispatch
# loops, and finally routes INT and TERM signals to #stop.
def start
  EM.threadpool_size = 128
  EM.set_max_timers(100_000)

  EM.run do
    log "Starting Transceiver #{@id}"

    # A 10 millisecond quantum supports throughputs up to 100 MTs/sec.
    EM.set_quantum(10)

    configure  # load settings from Redis
    connect    # open the SMPP connection
    dispatch   # begin detecting and dispatching MTs

    # Exit-related signals trigger a stop.
    %w[INT TERM].each do |name|
      Signal.trap(name) { |signal| stop(signal) }
    end
  end
end

#stop(signal = nil) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rocket_sms/transceiver.rb', line 61

# Stops the transceiver, gracefully on the first call and forcibly on the
# second.
#
# First call: marks the transceiver inactive, closes the connection once its
# pending writes are flushed, cancels the dispatch / configure / reconnect
# timers, issues a Redis DEL and starts #cleanup (which ends in #shutdown).
# Any later call (with @kill already set) logs a warning and calls #shutdown
# immediately.
#
# @param signal accepted for the signal handlers installed in #start; not
#   used by this method
def stop(signal = nil)
  if @kill
    log "#{@id} - Forcing Exit. Check your data for losses."
    return shutdown
  end

  log "#{@id} - Stopping. Waiting for pending operations to finish."
  @kill = true
  @active = false
  @connection.close_connection_after_writing if @connection
  [@dispatcher, @configurator, @reconnector].each { |timer| timer.cancel if timer }
  # NOTE(review): this key starts with "gateways:" while every other key in
  # this class starts with "gateway:", so this DEL likely matches nothing.
  # Confirm the intent before changing it: the "gateway:transceivers:<id>"
  # hash also carries the configuration read by #configure.
  redis.del("gateways:transceivers:#{@id}")
  cleanup
end

#throughput ⇒ Object



38
39
40
# File 'lib/rocket_sms/transceiver.rb', line 38

# Returns the configured throughput from @settings[:throughput], or 1.0 when
# no usable value is available.
#
# The fallback applies when @settings is nil, when no throughput has been
# loaded yet, and when the stored value is not greater than zero. #dispatch
# inverts this number to compute a timer interval, so a nil or zero rate
# would otherwise produce an infinite interval. Unlike a `||=` default, the
# fallback is not written back into @settings.
#
# @return the configured rate when positive, otherwise 1.0
def throughput
  configured = @settings && @settings[:throughput]
  if configured && configured.to_f > 0
    configured
  else
    1.0
  end
end

#unbound(transceiver) ⇒ Object



247
248
249
250
251
252
253
254
255
256
# File 'lib/rocket_sms/transceiver.rb', line 247

# Marks this transceiver as offline and, while it is still active, schedules
# a reconnect.
#
# Logs the event, flags the transceiver offline, replaces any pending
# reconnect timer with a new one that calls #connect in 10 seconds (only when
# @active), and publishes the new status through #register.
#
# @param transceiver not used by this method
# @return whatever #register returns
def unbound(transceiver)
  log("#{@id} - Transceiver Unbound")
  @online = false

  if @active
    log("#{@id} is not connected. Retrying in 10 seconds.")
    @reconnector.cancel if @reconnector
    @reconnector = EM::Timer.new(10) { connect }
  end

  register
end