Class: Fluent::OnlineuserOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::OnlineuserOutput
- Defined in:
- lib/fluent/plugin/out_onlineuser.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #extract_uid(record) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ OnlineuserOutput (constructor)
  Returns a new instance of OnlineuserOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_watch ⇒ Object
- #to_redis_key(segment_str) ⇒ Object
- #tryOnRedis(method, *args) ⇒ Object
- #user_count(segment) ⇒ Object
- #watch ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ OnlineuserOutput
Returns a new instance of OnlineuserOutput.
21 22 23 24 25 26 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 21
# Creates the plugin instance.
#
# Delegates to the parent constructor first, then loads the libraries the
# plugin relies on (in the same order as before: redis, msgpack, rubygems).
def initialize
  super
  %w[redis msgpack rubygems].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 28
# Reads the plugin configuration and prepares the segment calculator.
#
# Sets @host, @port and @db_number from +conf+ (defaults: 'localhost', 6379,
# nil), splits @user_identify on '|' into a list of candidate field names, and,
# when @segment is set, builds @cal_segment: a lambda taking (tag, record) and
# returning a segment name String, or nil when the record has no segment.
#
# Supported @segment forms:
#   'tag'                       -> "tag:<tag>"
#   'capture <field> <regex>'   -> "captured_<field>:<captures joined by '_'>"
#   '<field>'                   -> "<field>:<record[field]>"
#
# @param conf [#has_key?, #[]] configuration element with String keys
# @raise [Fluent::ConfigError] when a 'capture' segment does not have exactly
#   a field name and a regex
def configure(conf)
  super

  @host = conf.has_key?('host') ? conf['host'] : 'localhost'
  @port = conf.has_key?('port') ? conf['port'].to_i : 6379
  # NOTE(review): confirm that the Redis connection setup actually consumes
  # @db_number; nothing else visible in this listing reads it.
  @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil

  @user_identify = @user_identify.split '|'

  if @segment
    s = @segment.split(/\s+/)
    case s.first
    when 'tag'
      @cal_segment = lambda { |tag, record| 'tag:' + tag }
    when 'capture'
      if s.length == 3
        field = s[1]
        pattern = s[2]
        # Accept both a bare pattern and one written as /pattern/.
        if pattern.start_with?('/') && pattern.end_with?('/')
          pattern = pattern[1..-2]
        end
        regexp = Regexp.new(pattern)
        @cal_segment = lambda { |tag, record|
          if record[field] && record[field].is_a?(String)
            if c = record[field].match(regexp)
              'captured_' + field + ':' + (c.captures.join '_')
            end
          end
        }
      else
        raise Fluent::ConfigError, "wrong segment capture, specify it like 'capture __FIELD_NAME__ __CAPTURE_REGEX__'"
      end
    else
      @cal_segment = lambda { |tag, record|
        # Interpolate rather than concatenate: the field value may be a
        # non-String (e.g. a numeric id), which String#+ would reject.
        if record[@segment]
          "#{@segment}:#{record[@segment]}"
        end
      }
    end
  end
end
#extract_uid(record) ⇒ Object
144 145 146 147 148 149 150 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 144
# Picks the user identifier out of a record.
#
# Walks the field names in @user_identify in order and returns the value of
# the first field holding a non-empty String or a non-zero Numeric.
#
# @param record [Hash] event record
# @return [String, Numeric, nil] the identifier, or nil when no field qualifies
def extract_uid(record)
  @user_identify.each do |field|
    value = record[field]
    case value
    when String
      return record[field] unless value.empty?
    when Numeric
      return record[field] unless value == 0
    end
  end
  nil
end
#format(tag, time, record) ⇒ Object
152 153 154 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 152
# Serializes one event for buffering.
#
# @param tag [String] event tag
# @param time [Object] event time as handed over by the caller
# @param record [Hash] event record
# @return [String] the msgpack encoding of the [tag, time, record] triple
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end
#shutdown ⇒ Object
89 90 91 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 89
# Stops the plugin.
#
# Terminates the watcher thread created by #start_watch (its loop never ends
# on its own), runs the parent class shutdown, and only then closes the Redis
# connection so nothing uses the client after it has been quit.
def shutdown
  if @watcher
    @watcher.kill
    @watcher.join
  end

  super

  @redis.quit if @redis
end
#start ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 68
# Starts the plugin: connects to Redis and launches the watcher thread.
#
# The hiredis driver is used when the "hiredis" gem can be activated;
# otherwise (LoadError) the connection falls back to the default driver.
# Resets @segments to an empty Hash before calling #start_watch.
def start
  super

  options = {
    :host => @host,
    :port => @port,
    :thread_safe => true,
    # #configure stores the configured database in @db_number; keep honoring
    # @db_index when it is set so existing behavior is unchanged.
    :db => @db_index || @db_number
  }

  begin
    gem "hiredis"
    @redis = Redis.new(options.merge(:driver => :hiredis))
  rescue LoadError
    @redis = Redis.new(options)
  end

  @segments = {}
  start_watch
end
#start_watch ⇒ Object
110 111 112 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 110
# Spawns the thread that runs #watch and keeps it in @watcher.
#
# @return [Thread] the newly created watcher thread
def start_watch
  @watcher = Thread.new { watch }
end
#to_redis_key(segment_str) ⇒ Object
93 94 95 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 93
# Builds the Redis key for a segment by prepending @redis_key_prefix.
#
# @param segment_str [String] segment name
# @return [String] prefix followed by the segment name
def to_redis_key(segment_str)
  prefix = @redis_key_prefix
  prefix + segment_str
end
#tryOnRedis(method, *args) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 97
# Runs a Redis command, retrying on Redis::CommandError.
#
# The command is only sent when @redis responds to +method+; otherwise nil is
# returned. A failing command is retried up to @redis_retry times; once the
# retries are used up a warning is logged and the last error is re-raised.
#
# @param method [String, Symbol] name of the Redis client method
# @param args [Array] arguments forwarded to the client method
# @return [Object, nil] the command's result, or nil if the client does not
#   support the command
# @raise [Redis::CommandError] when the command still fails after all retries
def tryOnRedis(method, *args)
  tries = 0
  begin
    @redis.send(method, *args) if @redis.respond_to? method
  rescue Redis::CommandError
    tries += 1
    # Retry until the configured limit (@redis_retry) is exceeded.
    retry if tries <= @redis_retry
    $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})]
    # Bare raise re-raises the rescued error with its class and backtrace.
    raise
  end
end
#user_count(segment) ⇒ Object
137 138 139 140 141 142 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 137
# Counts the users currently recorded for a segment.
#
# First removes members of the segment's sorted set whose score is at or below
# (current engine time - @session_timeout), then returns the set's cardinality.
#
# @param segment [String] segment name
# @return [Object] whatever the 'zcard' call yields via #tryOnRedis
def user_count(segment)
  redis_key = to_redis_key(segment)
  cutoff = Fluent::Engine.now - @session_timeout
  tryOnRedis('zremrangebyscore', redis_key, '-inf', cutoff)
  tryOnRedis('zcard', redis_key)
end
#watch ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 114
# Periodic reporting loop; runs forever (started in a thread by #start_watch).
#
# Wakes every 0.5s and, once at least 60 seconds of engine time have passed
# since the last check, walks the known segments: a segment with zero users is
# forgotten, any other segment is emitted as "<@tag>.<segment>" with an
# {"online_user" => count} record unless @silent is set.
def watch
  @last_checked = Fluent::Engine.now
  tick = 60
  loop do
    sleep 0.5
    next if Fluent::Engine.now - @last_checked < tick

    now = Fluent::Engine.now
    # Iterate over a snapshot of the keys: this loop deletes entries, and
    # #write inserts into @segments (presumably from another thread); adding a
    # key to a Hash while it is being iterated raises RuntimeError.
    @segments.keys.each do |segment|
      user_num = user_count segment
      if user_num == 0
        @segments.delete segment
      elsif not @silent
        Fluent::Engine.emit @tag + '.' + segment, Fluent::Engine.now, {"online_user" => user_num}
      end
    end
    @last_checked = now
  end
end
#write(chunk) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/fluent/plugin/out_onlineuser.rb', line 156
# Flushes a buffered chunk into Redis.
#
# Collects, per segment, the last time seen for every user id in the chunk
# (every identified user goes into 'all', plus the segment computed by
# @cal_segment when there is one), then marks each segment in @segments and
# issues one 'zadd' per (segment, user) with the time as score.
#
# @param chunk [#msgpack_each] buffer chunk yielding [tag, time, record]
# @return [Hash] the per-segment { uid => time } map that was written
def write(chunk)
  latest_seen = Hash.new { |segments, name| segments[name] = Hash.new }

  chunk.msgpack_each do |(tag, time, record)|
    uid = extract_uid(record)
    next unless uid

    latest_seen['all'][uid] = time
    next unless @cal_segment and @cal_segment != ''

    segment = @cal_segment.call(tag, record)
    latest_seen[segment][uid] = time if segment
  end

  ## write to redis
  latest_seen.each do |segment, users|
    @segments[segment] = true
    key = to_redis_key(segment)
    users.each do |uid, ts|
      tryOnRedis('zadd', key, ts, uid)
    end
  end
end