Class: Fluent::OnlineuserOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_onlineuser.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ OnlineuserOutput

Returns a new instance of OnlineuserOutput.



21
22
23
24
25
26
# File 'lib/fluent/plugin/out_onlineuser.rb', line 21

# Builds the plugin instance: runs the parent initializer and then loads
# the libraries this plugin relies on (redis, msgpack, rubygems), in that order.
def initialize
  super
  %w[redis msgpack rubygems].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/out_onlineuser.rb', line 28

# Reads the plugin configuration and prepares the segment calculator.
#
# Sets @host, @port and @db_number from +conf+ (defaults: 'localhost', 6379,
# nil), splits @user_identify on '|' into a list of candidate uid field names,
# and, when @segment is set, builds @cal_segment: a lambda taking
# (tag, record) and returning a segment name or nil.
#
#   'tag'                 => "tag:<tag>"
#   'capture FIELD REGEX' => "captured_FIELD:<captures joined with '_'>" when
#                            record[FIELD] is a String matching REGEX
#                            (surrounding slashes of REGEX are stripped)
#   anything else         => "<segment>:<record[segment]>" when the field is truthy
#
# @user_identify and @segment are not assigned here; presumably they are
# populated by the parent's configure (TODO confirm).
#
# @param conf [#has_key?, #[]] configuration key/value element
# @raise [Fluent::ConfigError] if a 'capture' segment is not exactly three tokens
def configure(conf)
  super
  @host = conf.has_key?('host') ? conf['host'] : 'localhost'
  @port = conf.has_key?('port') ? conf['port'].to_i : 6379
  # NOTE(review): #start hands @db_index (not @db_number) to Redis.new —
  # confirm which of the two is the intended setting.
  @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil

  @user_identify = @user_identify.split('|')

  if @segment
    tokens = @segment.split(/\s+/)
    case tokens.first
      when 'tag'
        @cal_segment = lambda { |tag, record| 'tag:' + tag }
      when 'capture'
        if tokens.length == 3
          field = tokens[1]
          format = tokens[2]
          # allow the pattern to be written as /.../
          if format[0] == ?/ and format[-1] == ?/
            format = format[1..-2]
          end
          regexp = Regexp.new(format)
          @cal_segment = lambda { |tag, record|
            if record[field] and record[field].is_a? String
              if c = record[field].match(regexp)
                'captured_' + field + ':' + (c.captures.join '_')
              end
            end
          }
        else
          raise Fluent::ConfigError, "wrong segment capture, specify it like 'capture __FIELD_NAME__ __CAPTURE_REGEX__'"
        end
      else
        @cal_segment = lambda { |tag, record|
          value = record[@segment]
          # to_s: the field may hold a number; String#+ would raise TypeError
          @segment + ':' + value.to_s if value
        }
    end
  end
end

#extract_uid(record) ⇒ Object



144
145
146
147
148
149
150
# File 'lib/fluent/plugin/out_onlineuser.rb', line 144

# Picks the user id out of a record.
#
# Walks the field names in @user_identify in order and returns the value of
# the first field that holds either a non-empty String or a non-zero Numeric.
#
# @param record [#[]] event record keyed by field name
# @return [String, Numeric, nil] the uid, or nil when no field qualifies
def extract_uid(record)
  @user_identify.each do |field|
    candidate = record[field]
    case candidate
    when String
      return candidate unless candidate.empty?
    when Numeric
      return candidate unless candidate == 0
    end
  end
  nil
end

#format(tag, time, record) ⇒ Object



152
153
154
# File 'lib/fluent/plugin/out_onlineuser.rb', line 152

# Serializes one event for the buffer as a msgpack-encoded
# [tag, time, record] triple (the shape #write unpacks again).
#
# @return the result of Array#to_msgpack on the triple
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end

#shutdown ⇒ Object



89
90
91
# File 'lib/fluent/plugin/out_onlineuser.rb', line 89

# Stops the plugin.
#
# Runs the parent's shutdown first (mirroring #start, which calls super)
# while the Redis connection is still open, then stops the background
# watcher thread created by #start_watch, and finally closes the Redis
# connection.
def shutdown
  super

  if @watcher
    @watcher.terminate
    @watcher.join
  end

  @redis.quit if @redis
end

#start ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_onlineuser.rb', line 68

# Starts the plugin: opens the Redis connection, resets the set of known
# segments and launches the watcher thread.
#
# The hiredis driver is used when the "hiredis" gem can be activated; on
# LoadError the connection is created without an explicit driver.
def start
  super

  # options shared by both connection attempts
  connection = {
      :host => @host, :port => @port,
      :thread_safe => true, :db => @db_index
  }

  @redis =
    begin
      gem "hiredis"
      Redis.new(connection.merge(:driver => :hiredis))
    rescue LoadError
      Redis.new(connection)
    end

  @segments = {}

  start_watch
end

#start_watch ⇒ Object



110
111
112
# File 'lib/fluent/plugin/out_onlineuser.rb', line 110

# Spawns the background thread that runs #watch and keeps it in @watcher.
#
# @return [Thread] the watcher thread
def start_watch
  @watcher = Thread.new { watch }
end

#to_redis_key(segment_str) ⇒ Object



93
94
95
# File 'lib/fluent/plugin/out_onlineuser.rb', line 93

# Builds the Redis key for a segment by prepending @redis_key_prefix.
#
# @param segment_str [String] segment name
# @return [String] prefixed key
def to_redis_key(segment_str)
  prefix = @redis_key_prefix
  prefix + segment_str
end

#tryOnRedis(method, *args) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/out_onlineuser.rb', line 97

# Sends a command to Redis, retrying on Redis::CommandError.
#
# The command is only sent when @redis responds to +method+; otherwise nil
# is returned. On Redis::CommandError the call is repeated up to
# @redis_retry more times. Once the retries are used up a warning is logged
# and the original exception is re-raised, keeping its class and backtrace.
#
# @param method [String, Symbol] Redis command name
# @param args command arguments
# @return the command's result, or nil when the command is unsupported
# @raise [Redis::CommandError] when the command still fails after all retries
def tryOnRedis(method, *args)
  tries = 0
  begin
    @redis.send(method, *args) if @redis.respond_to? method
  rescue Redis::CommandError
    tries += 1
    # retry up to @redis_retry times
    retry if tries <= @redis_retry
    $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})]
    # bare raise re-raises the rescued error instead of a generic RuntimeError
    raise
  end
end

#user_count(segment) ⇒ Object



137
138
139
140
141
142
# File 'lib/fluent/plugin/out_onlineuser.rb', line 137

# Counts the users currently online in a segment.
#
# First removes every member of the segment's sorted set whose score is at
# or below (now - @session_timeout), then returns the set's cardinality.
#
# @param segment [String] segment name
# @return the result of the 'zcard' command for the segment's key
def user_count(segment)
  redis_key = to_redis_key(segment)
  cutoff = Fluent::Engine.now - @session_timeout
  tryOnRedis('zremrangebyscore', redis_key, '-inf', cutoff)
  tryOnRedis('zcard', redis_key)
end

#watch ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/out_onlineuser.rb', line 114

# Body of the watcher thread: loops forever, and once every 60 seconds
# (checked every 0.5 s) reports the online-user count of every known segment.
#
# For each segment the count comes from #user_count. Segments whose count is
# 0 are dropped from @segments; for the others, unless @silent is set, an
# event {"online_user" => count} is emitted under the tag "<@tag>.<segment>".
#
# The loop walks a snapshot of the segment names: #write adds entries to
# @segments, and adding a key to a Hash while it is being iterated raises
# RuntimeError.
def watch
  @last_checked = Fluent::Engine.now
  tick = 60
  loop do
    sleep 0.5
    next unless Fluent::Engine.now - @last_checked >= tick

    now = Fluent::Engine.now

    # snapshot of the keys, so @segments may change while we do Redis I/O
    @segments.keys.each do |segment|
      user_num = user_count segment

      if user_num == 0
        @segments.delete segment
      elsif not @silent
        Fluent::Engine.emit @tag + '.' + segment, Fluent::Engine.now, {"online_user" => user_num}
      end
    end

    @last_checked = now
  end
end

#write(chunk) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/fluent/plugin/out_onlineuser.rb', line 156

# Flushes a buffered chunk into Redis.
#
# Every event with a usable uid (see #extract_uid) is recorded under the
# 'all' segment and, when @cal_segment yields a name for it, under that
# segment too; a later event for the same uid overwrites the earlier time.
# Each touched segment is then registered in @segments and every
# (uid, time) pair is stored with 'zadd' under the segment's Redis key.
#
# @param chunk [#msgpack_each] buffer chunk yielding [tag, time, record]
# @return [Hash] the collected { segment => { uid => time } } map
def write(chunk)
  # segment name => { uid => time of the last event seen in this chunk }
  seen = Hash.new { |store, name| store[name] = {} }

  chunk.msgpack_each do |(tag, time, record)|
    uid = extract_uid(record)
    next unless uid

    seen['all'][uid] = time

    calculator = @cal_segment
    next unless calculator && calculator != ''

    name = calculator.call(tag, record)
    seen[name][uid] = time if name
  end

  ## write to redis
  seen.each_pair do |name, users|
    @segments[name] = true
    redis_key = to_redis_key(name)
    users.each_pair { |uid, ts| tryOnRedis('zadd', redis_key, ts, uid) }
  end
end