Class: Fluent::RedisStoreOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::RedisStoreOutput
- Defined in:
- lib/fluent/plugin/out_redis_store.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #generate_ltrim_script(key, maxlen, order) ⇒ Object
- #generate_zremrangebyrank_script(key, maxlen, order) ⇒ Object
- #get_key_from(record) ⇒ Object
- #get_score_from(record, time) ⇒ Object
- #get_value_from(record) ⇒ Object
-
#initialize ⇒ RedisStoreOutput
constructor
A new instance of RedisStoreOutput.
- #operation_for_list(record) ⇒ Object
- #operation_for_publish(record) ⇒ Object
- #operation_for_set(record) ⇒ Object
- #operation_for_string(record) ⇒ Object
- #operation_for_zset(record, time) ⇒ Object
- #set_key_expire(key) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #traverse(data, key) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ RedisStoreOutput
Returns a new instance of RedisStoreOutput.
27 28 29 30 31 32 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 27 def initialize super require 'oj' require 'redis' require 'msgpack' end |
Instance Method Details
#configure(conf) ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 34 def configure(conf) super if @key_path == nil and @key == nil raise Fluent::ConfigError, "either key_path or key is required" end end |
#format(tag, time, record) ⇒ Object
57 58 59 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 57 def format(tag, time, record) [tag, time, record].to_msgpack end |
#generate_ltrim_script(key, maxlen, order) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 163 def generate_ltrim_script(key, maxlen, order) script = "local key = '" + key.to_s + "'\n" script += "local maxlen = " + maxlen.to_s + "\n" script += "local order ='" + order.to_s + "'\n" script += "local len = tonumber(redis.call('LLEN', key))\n" script += "if len > maxlen then\n" script += " if order == 'asc' then\n" script += " local l = len - maxlen\n" script += " return redis.call('LTRIM', key, l, -1)\n" script += " else\n" script += " return redis.call('LTRIM', key, 0, maxlen - 1)\n" script += " end\n" script += "end\n" return script end |
#generate_zremrangebyrank_script(key, maxlen, order) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 145 def generate_zremrangebyrank_script(key, maxlen, order) script = "local key = '" + key.to_s + "'\n" script += "local maxlen = " + maxlen.to_s + "\n" script += "local order ='" + order.to_s + "'\n" script += "local len = tonumber(redis.call('ZCOUNT', key, '-inf', '+inf'))\n" script += "if len > maxlen then\n" script += " if order == 'asc' then\n" script += " local l = len - maxlen\n" script += " if l >= 0 then\n" script += " return redis.call('ZREMRANGEBYRANK', key, 0, l)\n" script += " end\n" script += " else\n" script += " return redis.call('ZREMRANGEBYRANK', key, maxlen, -1)\n" script += " end\n" script += "end\n" return script end |
#get_key_from(record) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 191 def get_key_from(record) if @key k = @key else k = traverse(record, @key_path).to_s end key = @key_prefix + k + @key_suffix raise Fluent::ConfigError, "key is empty" if key == '' key end |
#get_score_from(record, time) ⇒ Object
216 217 218 219 220 221 222 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 216 def get_score_from(record, time) if @score_path traverse(record, @score_path) else time end end |
#get_value_from(record) ⇒ Object
203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 203 def get_value_from(record) value = traverse(record, @value_path) case @format_type when 'json' #value.to_json value = Oj.dump(value) when 'msgpack' value.to_msgpack else value end end |
#operation_for_list(record) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 115 def operation_for_list(record) key = get_key_from(record) value = get_value_from(record) if @order == 'asc' @redis.rpush key, value else @redis.lpush key, value end set_key_expire key if 0 < @value_length script = generate_ltrim_script(key, @value_length, @order) @redis.eval script end end |
#operation_for_publish(record) ⇒ Object
139 140 141 142 143 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 139 def operation_for_publish(record) key = get_key_from(record) value = get_value_from(record) @redis.publish key, value end |
#operation_for_set(record) ⇒ Object
108 109 110 111 112 113 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 108 def operation_for_set(record) key = get_key_from(record) value = get_value_from(record) @redis.sadd key, value set_key_expire key end |
#operation_for_string(record) ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 131 def operation_for_string(record) key = get_key_from(record) value = get_value_from(record) @redis.set key, value set_key_expire key end |
#operation_for_zset(record, time) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 91 def operation_for_zset(record, time) key = get_key_from(record) value = get_value_from(record) score = get_score_from(record, time) @redis.zadd key, score, value set_key_expire key if 0 < @value_expire now = Time.now.to_i @redis.zremrangebyscore key , '-inf' , (now - @value_expire) end if 0 < @value_length script = generate_zremrangebyrank_script(key, @value_length, @order) @redis.eval script end end |
#set_key_expire(key) ⇒ Object
224 225 226 227 228 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 224 def set_key_expire(key) if 0 < @key_expire @redis.expire key, @key_expire end end |
#shutdown ⇒ Object
53 54 55 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 53 def shutdown @redis.quit end |
#start ⇒ Object
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 42 def start super if @path @redis = Redis.new(:path => @path, :password => @password, :timeout => @timeout, :thread_safe => true, :db => @db) else @redis = Redis.new(:host => @host, :port => @port, :password => @password, :timeout => @timeout, :thread_safe => true, :db => @db) end end |
#traverse(data, key) ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 179 def traverse(data, key) val = data key.split('.').each{ |k| if val.has_key?(k) val = val[k] else return nil end } return val end |
#write(chunk) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/out_redis_store.rb', line 61 def write(chunk) @redis.pipelined { chunk.open { |io| begin MessagePack::Unpacker.new(io).each { || begin (tag, time, record) = case @store_type when 'zset' operation_for_zset(record, time) when 'set' operation_for_set(record) when 'list' operation_for_list(record) when 'string' operation_for_string(record) when 'publish' operation_for_publish(record) end rescue NoMethodError => e puts e end } rescue EOFError # EOFError always occured when reached end of chunk. end } } end |