Class: WebsocketRails::Synchronization

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/websocket_rails/synchronization.rb

Constant Summary

Constants included from Logging

Logging::ANSI, Logging::LOGGABLE_DATA

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#color_for_level, #colorize, configure, #log, #log_data?, #log_event, #log_event?, #log_event_end, #log_event_start, #log_exception, #log_header, log_level, #wrap

Class Method Details

.all_users ⇒ Object



8
9
10
# File 'lib/websocket_rails/synchronization.rb', line 8

# Class-level convenience wrapper: forwards to #all_users on the shared
# instance returned by .singleton and returns whatever that call returns.
def self.all_users
  singleton.all_users
end

.destroy_user(connection) ⇒ Object



20
21
22
# File 'lib/websocket_rails/synchronization.rb', line 20

# Class-level convenience wrapper around the instance-level #destroy_user.
#
# @param connection value handed unchanged to the shared instance (the
#   instance method names this argument +identifier+)
# @return whatever the instance-level #destroy_user returns
def self.destroy_user(connection)
  singleton.destroy_user(connection)
end

.find_user(connection) ⇒ Object



12
13
14
# File 'lib/websocket_rails/synchronization.rb', line 12

# Class-level convenience wrapper around the instance-level #find_user.
#
# @param connection value handed unchanged to the shared instance (the
#   instance method names this argument +identifier+)
# @return whatever the instance-level #find_user returns
def self.find_user(connection)
  singleton.find_user(connection)
end

.publish(event) ⇒ Object



24
25
26
# File 'lib/websocket_rails/synchronization.rb', line 24

# Class-level convenience wrapper around the instance-level #publish.
#
# @param event the event to hand to the shared instance
# @return whatever the instance-level #publish returns
def self.publish(event)
  singleton.publish(event)
end

.redis ⇒ Object



36
37
38
# File 'lib/websocket_rails/synchronization.rb', line 36

# Class-level convenience wrapper: returns the Redis client exposed by the
# shared instance's #redis method.
def self.redis
  singleton.redis
end

.register_user(connection) ⇒ Object



16
17
18
# File 'lib/websocket_rails/synchronization.rb', line 16

# Class-level convenience wrapper around the instance-level #register_user.
#
# @param connection the connection handed unchanged to the shared instance
# @return whatever the instance-level #register_user returns
def self.register_user(connection)
  singleton.register_user(connection)
end

.shutdown! ⇒ Object



32
33
34
# File 'lib/websocket_rails/synchronization.rb', line 32

# Class-level convenience wrapper: invokes #shutdown! on the shared instance
# returned by .singleton.
def self.shutdown!
  singleton.shutdown!
end

.singleton ⇒ Object



40
41
42
# File 'lib/websocket_rails/synchronization.rb', line 40

# Returns the shared instance used by all class-level wrappers.
# The instance is built with +new+ on first use and memoized in the
# class-level @singleton variable; no locking is performed here.
def self.singleton
  @singleton ||= new
end

.synchronize! ⇒ Object



28
29
30
# File 'lib/websocket_rails/synchronization.rb', line 28

# Class-level convenience wrapper: invokes #synchronize! on the shared
# instance returned by .singleton.
def self.synchronize!
  singleton.synchronize!
end

Instance Method Details

#all_users ⇒ Object



175
176
177
178
179
# File 'lib/websocket_rails/synchronization.rb', line 175

# Fetches every field of the 'websocket_rails.users' Redis hash.
#
# The HGETALL call runs inside a freshly created Fiber that is resumed
# immediately (presumably so a fiber-aware Redis driver can be used —
# confirm against the configured driver).
#
# @return the value produced by resuming that fiber; when the fiber runs to
#   completion this is the result of +hgetall+
def all_users
  lookup = Fiber.new { redis.hgetall('websocket_rails.users') }
  lookup.resume
end

#destroy_user(identifier) ⇒ Object



162
163
164
165
166
# File 'lib/websocket_rails/synchronization.rb', line 162

# Deletes one field from the 'websocket_rails.users' Redis hash.
#
# The HDEL call runs inside a freshly created Fiber that is resumed
# immediately.
#
# @param identifier the hash field to delete
# @return the value produced by resuming that fiber; when the fiber runs to
#   completion this is the result of +hdel+
def destroy_user(identifier)
  removal = Fiber.new { redis.hdel('websocket_rails.users', identifier) }
  removal.resume
end

#find_user(identifier) ⇒ Object



168
169
170
171
172
173
# File 'lib/websocket_rails/synchronization.rb', line 168

# Looks up one entry of the 'websocket_rails.users' Redis hash and decodes it.
#
# The lookup runs inside a freshly created Fiber that is resumed immediately.
#
# @param identifier the hash field to read
# @return the JSON-decoded stored value, or nil when the field holds nothing
def find_user(identifier)
  lookup = Fiber.new do
    stored = redis.hget('websocket_rails.users', identifier)
    # Only attempt to decode when something was actually stored.
    JSON.parse(stored) if stored
  end
  lookup.resume
end

#generate_server_token ⇒ Object



133
134
135
136
137
138
139
# File 'lib/websocket_rails/synchronization.rb', line 133

# Produces a random token that is not currently a member of the
# "websocket_rails.active_servers" Redis set.
#
# A candidate is drawn with SecureRandom.urlsafe_base64 and checked with
# SISMEMBER; drawing repeats until a candidate is not in the set.
#
# @return [String] the first candidate not found in the set
def generate_server_token
  loop do
    candidate = SecureRandom.urlsafe_base64
    return candidate unless redis.sismember("websocket_rails.active_servers", candidate)
  end
end

#publish(event) ⇒ Object



63
64
65
66
67
68
# File 'lib/websocket_rails/synchronization.rb', line 63

# Publishes an event on the "websocket_rails.events" Redis channel.
#
# The event is first stamped with this server's token (subscribers in
# #synchronize! skip events carrying their own token), then serialized and
# published. The work runs inside a freshly created Fiber that is resumed
# immediately.
#
# @param event an object responding to +server_token=+ and +serialize+
# @return the value produced by resuming that fiber; when the fiber runs to
#   completion this is the result of the Redis +publish+ call
def publish(event)
  broadcast = Fiber.new do
    event.server_token = server_token
    redis.publish("websocket_rails.events", event.serialize)
  end
  broadcast.resume
end

#redis ⇒ Object



46
47
48
49
50
51
52
53
# File 'lib/websocket_rails/synchronization.rb', line 46

# Returns the memoized Redis client for this instance.
#
# On first use the EventMachine reactor state is sampled exactly once, so the
# debug message always matches the client that is actually built:
# * reactor running     - a client built from the configured redis_options
# * reactor not running - the client returned by #ruby_redis
#
# @return the memoized client (stored in @redis)
def redis
  @redis ||= if EM.reactor_running?
    debug "Reactor is running - engaging standard redis new"
    Redis.new(WebsocketRails.config.redis_options)
  else
    debug "Reactor is not running - engaging ruby redis"
    ruby_redis
  end
end

#register_server(token) ⇒ Object



141
142
143
144
145
146
# File 'lib/websocket_rails/synchronization.rb', line 141

# Adds a server token to the "websocket_rails.active_servers" Redis set and
# logs the registration at info level.
#
# The work runs inside a freshly created Fiber that is resumed immediately.
#
# @param token the server token to add to the set
# @return the value produced by resuming that fiber; when the fiber runs to
#   completion this is the result of the +info+ call
def register_server(token)
  registration = Fiber.new do
    redis.sadd("websocket_rails.active_servers", token)
    info("Server Registered: #{token}")
  end
  registration.resume
end

#register_user(connection) ⇒ Object



154
155
156
157
158
159
160
# File 'lib/websocket_rails/synchronization.rb', line 154

# Stores a connection's user in the 'websocket_rails.users' Redis hash.
#
# The hash field is the connection's +user_identifier+; the value is the
# user's +as_json(root: false)+ representation encoded with +to_json+.
# The work runs inside a freshly created Fiber that is resumed immediately.
#
# @param connection an object responding to +user_identifier+ and +user+
# @return the value produced by resuming that fiber; when the fiber runs to
#   completion this is the result of +hset+
def register_user(connection)
  writer = Fiber.new do
    field = connection.user_identifier
    member = connection.user
    redis.hset('websocket_rails.users', field, member.as_json(root: false).to_json)
  end
  writer.resume
end

#remove_server(token) ⇒ Object



148
149
150
151
152
# File 'lib/websocket_rails/synchronization.rb', line 148

# Removes a server token from the "websocket_rails.active_servers" Redis set,
# logs the removal at info level, and then stops the EventMachine reactor.
#
# Unlike the other registry methods this one uses #ruby_redis directly and
# does not wrap the call in a Fiber.
#
# @param token the server token to remove from the set
# @return whatever EM.stop returns
def remove_server(token)
  ruby_redis.srem("websocket_rails.active_servers", token)
  info("Server Removed: #{token}")
  EM.stop
end

#ruby_redis ⇒ Object



55
56
57
58
59
60
61
# File 'lib/websocket_rails/synchronization.rb', line 55

# Returns a memoized Redis client that defaults to the pure-Ruby driver.
#
# The configured redis_options are used as-is when they already name a
# :driver; otherwise a copy with :driver => :ruby is used. Hash#merge returns
# a new hash, so its result must be kept (it was previously discarded, which
# meant the :ruby driver was never applied). The shared configuration hash
# itself is left untouched.
#
# @return the memoized client (stored in @ruby_redis)
def ruby_redis
  @ruby_redis ||= begin
    redis_options = WebsocketRails.config.redis_options
    redis_options = redis_options.merge(driver: :ruby) unless redis_options.key?(:driver)
    Redis.new(redis_options)
  end
end

#server_token ⇒ Object



70
71
72
# File 'lib/websocket_rails/synchronization.rb', line 70

# Returns the value of @server_token, which #synchronize! assigns from
# #generate_server_token. Evaluates to nil if it has not been assigned.
def server_token
  @server_token
end

#shutdown! ⇒ Object



129
130
131
# File 'lib/websocket_rails/synchronization.rb', line 129

# Shuts this server down by passing the current #server_token to
# #remove_server, which deregisters the token and stops the reactor.
def shutdown!
  remove_server(server_token)
end

#synchronize! ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/websocket_rails/synchronization.rb', line 74

# Starts event synchronization for this server; does nothing when
# @synchronizing is already set.
#
# Steps, in order:
# 1. Generate a server token and register it.
# 2. Build a Fiber whose body hands a subscription to EM.defer: a dedicated
#    Redis client subscribes to "websocket_rails.events" and forwards each
#    incoming event to #trigger_incoming.
# 3. Mark synchronization as started and schedule the fiber on the next
#    reactor tick.
# 4. Install TERM, INT and QUIT handlers that call #shutdown! from a new
#    Thread.
#
# @return nil when already synchronizing, otherwise the value returned by the
#   final +trap+ call
def synchronize!
  return if @synchronizing

  @server_token = generate_server_token
  register_server(@server_token)

  listener = Fiber.new do
    EM.defer do
      # The subscription gets its own client, separate from #redis.
      subscriber = Redis.new(WebsocketRails.config.redis_options)
      subscriber.subscribe("websocket_rails.events") do |on|
        debug "Subscribed to websocket_rails events"

        on.message do |_channel, encoded_event|
          incoming = Event.new_from_json(encoded_event, nil)

          # Events this server published itself are ignored.
          next if incoming.server_token == server_token

          # Events queued by other processes may arrive without a token;
          # stamping them here keeps an event from being triggered twice.
          incoming.server_token = server_token if incoming.server_token.nil?

          trigger_incoming(incoming)
        end
      end
    end
    info "Beginning Synchronization"
  end
  @synchronizing = true

  EM.next_tick { listener.resume }

  # One shared handler; shutdown work is pushed onto a separate thread.
  on_signal = proc { Thread.new { shutdown! } }
  trap('TERM', &on_signal)
  trap('INT', &on_signal)
  trap('QUIT', &on_signal)
end

#trigger_incoming(event) ⇒ Object



118
119
120
121
122
123
124
125
126
127
# File 'lib/websocket_rails/synchronization.rb', line 118

# Dispatches an event received from another server to its local target.
#
# * Channel events are triggered on the channel looked up via
#   WebsocketRails[event.channel].
# * User events are triggered on the connection stored in
#   WebsocketRails.users under the stringified user id; when no such
#   connection exists nothing happens.
# * Any other event is ignored.
#
# @param event an object responding to +is_channel?+ and +is_user?+
# @return the result of the trigger call, or nil when nothing was triggered
def trigger_incoming(event)
  if event.is_channel?
    WebsocketRails[event.channel].trigger_event(event)
  elsif event.is_user?
    target = WebsocketRails.users[event.user_id.to_s]
    target.trigger(event) unless target.nil?
  end
end