Class: FayeOnline
- Inherits:
-
Object
- Object
- FayeOnline
- Defined in:
- lib/faye-online/message.rb,
lib/faye-online.rb
Overview
管理用户在线文档 一个用户在单个房间单个多个页面,就有了多个clientId。
-
用户在线状态规则
在线: 超过一个clientId 离线: 零个clientId
-
清理过期clientId,以防止存储在redis Hashes里的数组无法删除个别元素。因为怀疑为过期clientId之后就没有连接,所以也不会进行任何操作。
采用策略是放到一个clientId和开始时间对应的redis Hashes里,在用户‘/meta/disconnect’时检测各个clientId有效性
Defined Under Namespace
Classes: Message
Constant Summary collapse
- ValidChannel =
只支持数字字母和斜杠
proc {|channel| !!channel.to_s.match(/\A[0-9a-z\/]+\Z/i) }
- MONITORED_CHANNELS =
‘/meta/subscribe’, ‘/connect’, ‘/close’ are ignored
['/meta/connect', '/meta/disconnect']
- LOCAL_IP =
Socket.ip_address_list.detect{|intf| intf.ipv4_private?}.ip_address
- LOCAL_FAYE_URI =
URI.parse("http://#{LOCAL_IP}:#{ENV['FAYE_PORT'] || 9292}/faye")
Class Method Summary collapse
- .channel_clientIds_array ⇒ Object
- .disconnect(clientId) ⇒ Object
- .get_server(redis_opts, valid_message_proc = nil) ⇒ Object
- .log(params, user_name_proc = proc {|uid| uid }) ⇒ Object
- .status ⇒ Object
- .uniq_clientIds ⇒ Object
Instance Method Summary collapse
- #incoming(message, callback) ⇒ Object
-
#initialize(redis_opts, valid_message_proc = nil) ⇒ FayeOnline
constructor
A new instance of FayeOnline.
Constructor Details
#initialize(redis_opts, valid_message_proc = nil) ⇒ FayeOnline
Returns a new instance of FayeOnline.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/faye-online.rb', line 37 def initialize redis_opts, = nil raise "Please run `$faye_server = FayeOnline.get_server` first, cause we have to bind disconnect event." if not $faye_server.is_a?(Faye::RackAdapter) FayeOnline.redis_opts = redis_opts FayeOnline. = || (proc {|| true }) FayeOnline.redis = Redis.new(FayeOnline.redis_opts) FayeOnline.redis.select FayeOnline.redis_opts[:database] FayeOnline.faye_client ||= Faye::Client.new(LOCAL_FAYE_URI.to_s) # 配置ActiveRecord if Rails.root.nil? Dir[File.('../../app/models/*.rb', __FILE__)].each {|f| require f } end return self end |
Class Method Details
.channel_clientIds_array ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/faye-online.rb', line 60 def self.channel_clientIds_array array = [] FayeOnline.redis.keys("/#{FayeOnline.redis_opts[:namespace]}/uid_to_clientIds*").sort.each do |k| _data = FayeOnline.redis.hgetall(k).values.map {|i| JSON.parse(i) rescue i }.flatten array << [k, _data] end array end |
.disconnect(clientId) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/faye-online.rb', line 72 def self.disconnect clientId = {'channel' => '/meta/disconnect', 'clientId' => clientId} # fake a client to disconnect, 仅仅接受message.auth为空,即网络失去连接的情况 FayeOnline::Message.new(.merge('fake' => 'true')).process if not ['auth'] end |
.get_server(redis_opts, valid_message_proc = nil) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/faye-online/server.rb', line 3 def FayeOnline.get_server redis_opts, = nil $faye_server = Faye::RackAdapter.new( :mount => '/faye', # the maximum time to hold a connection open before returning the response. This is given in seconds and must be smaller than the timeout on your frontend webserver(thin). Faye uses Thin as its webserver, whose default timeout is 30 seconds. # https://groups.google.com/forum/?fromgroups#!topic/faye-users/DvFrPGOinKw :timeout => 10, # [https://groups.google.com/forum/#!searchin/faye-users/disconnect/faye-users/2bn8xUHF5-E/A4a3Sk7RgW4J] It's expected. The browser will not always be able to deliver an explicit disconnect message, which is why there is server-side idle client detection. # garbag collect disconnected clientIds in EventMachine.add_periodic_timer :engine => {:gc => 60}.merge(redis_opts.merge(:type => Faye::Redis)), :ping => 30 # (optional) how often, in seconds, to send keep-alive ping messages over WebSocket and EventSource connections. Use this if your Faye server will be accessed through a proxy that kills idle connections. ) $faye_server.bind(:handshake) do |clientId| end $faye_server.bind(:subscribe) do |clientId, channel| end $faye_server.bind(:unsubscribe) do |clientId, channel| end $faye_server.bind(:publish) do |clientId, channel, data| end $faye_server.bind(:disconnect) do |clientId| FayeOnline.disconnect clientId # dynamic compute interval seconds tmp = FayeOnline.channel_clientIds_array.reject {|i| i[1].blank? } # delete below, cause map data is valid if tmp.any? puts "开始有 #{FayeOnline.uniq_clientIds.count}个" tmp.map(&:last).flatten.uniq.shuffle[0..19].each do |_clientId| if not FayeOnline.engine_proxy.has_connection? _clientId puts "开始处理无效 #{_clientId}" # 1. 先伪装去disconnect clientId # 2. 如果失败,就直接操作redis修改 if not FayeOnline.disconnect(_clientId) # 没删除成功,因为之前没有设置auth k = (tmp.detect {|a, b| b.index(_clientId) } || [])[0] # 直接从redis清除无效_clientId FayeOnline.redis.hgetall(k).each do |k2, v2| v3 = JSON.parse(v2) rescue [] v3.delete _clientId FayeOnline.redis.hset(k, k2, v3.to_json) end if k end end end puts "结束有 #{FayeOnline.uniq_clientIds.count}个" end end $faye_server.add_extension FayeOnline.new(redis_opts, ) FayeOnline.engine_proxy = $faye_server.instance_variable_get("@server").engine return $faye_server end |
.log(params, user_name_proc = proc {|uid| uid }) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/faye-online/status.rb', line 43 def FayeOnline.log params, user_name_proc = proc {|uid| uid } scope = FayeUserLoginLog.order("t ASC") # 个人整理后列出 if params[:user] scope = scope.where(:uid => user_name_proc.call(params[:user])) channel_to_logs = scope.inject({}) {|h, log| i = FayeChannel[log.channel_id]; h[i] ||= []; h[i] << log; h } array = ["用户 #{params[:user]}[#{user_name_proc.call(params[:user])}] 的登陆日志详情"] channel_to_logs.each do |channel, logs| array << '' array << channel logs2 = logs.inject({}) {|h, log| h[log.clientId] ||= []; h[log.clientId] << log; h } # 合并交叉的时间 ctc = CrossTimeCalculation.new logs2.each do |clientId, _logs| # logs = logs.sort {|a, b| (a && a.t) <=> (b && b.t) } if _logs.size > 0 # binding.pry if _logs[1].nil? te = _logs[1] ? _logs[1].t : nil ctc.add(_logs[0].t, te) _time_passed = CrossTimeCalculation.new.add(_logs[0].t, te).total_seconds.to_i end array << [clientId, _logs.map {|_log| "#{_log.status}: #{_log.t.strftime("%Y-%m-%d %H:%M:%S")}" }, "#{_time_passed || '未知'}秒"].flatten.compact.inspect end array << "共用时 #{ctc.total_seconds.to_i}秒" end array # 群体直接列出日志 else scope.limit(500).map do |log| [%w[离开 连上][log.status], log.uid, user_name_proc.call(log.uid), log.t.strftime("%Y-%m-%d %H:%M:%S"), FayeChannel[log.channel_id], log.clientId].inspect end end end |
.status ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/faye-online/status.rb', line 3 def FayeOnline.status array = [] clientIds = Set.new channel_clientIds_array = FayeOnline.channel_clientIds_array clientId_to_users = channel_clientIds_array.map(&:last).flatten.uniq.inject({}) do |h, _clientId| _data = JSON.parse(Redis.current.get("/#{FayeOnline.redis_opts[:namespace]}/clientId_auth/#{_clientId}")) rescue {} h[_clientId] = (_data['current_user'] || {}).except('uhash').values h end channel_clientIds_array.each do |_channel, _clientIds| _a = _clientIds.map {|i| [i, clientId_to_users[i]] } _c = _a.map {|i| i[1][0] }.uniq.count array << "#{_channel}: #{_c}个用户: #{_a}" _clientIds.each {|i| clientIds.add i } end array.unshift "" users = clientIds.map {|i| clientId_to_users[i] }.uniq {|i| i[0] } array.unshift "/classes/[0-9]+ 用于班级讨论的消息通讯, /courses/[0-9]+/lessons/[0-9]+ 用于课时的计时" array.unshift "" array.unshift "#{users.count}个用户分别是: #{users}" array.unshift "" array.unshift "实时在线clientIds总共有#{clientIds.count}个: #{clientIds.to_a}" array.unshift "" array.unshift "一个clientId表示用户打开了一个页面。一个用户在同一课时可能打开多个页面,那就是一个user,多个clientId" # 删除意外没有退出的在线用户列表 uids = users.map(&:first) FayeChannelOnlineList.all.reject {|i| i.data.blank? }.each do |online_list| online_list.user_list.each do |uid| online_list.delete uid if not uids.include? uid end end array end |
.uniq_clientIds ⇒ Object
68 69 70 |
# File 'lib/faye-online.rb', line 68 def self.uniq_clientIds self.channel_clientIds_array.map(&:last).flatten.uniq end |