Class: Fluent::ResqueExOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::ResqueExOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_resque_ex.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #enqueue(queue, klass, args) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ ResqueExOutput
constructor
A new instance of ResqueExOutput.
- #redis ⇒ Object
-
#redis=(server) ⇒ Object
code from resque.rb.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ ResqueExOutput
Returns a new instance of ResqueExOutput.
15 16 17 18 19 20 21 22 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 15 def initialize super require 'multi_json' require 'redis' require 'redis-namespace' require 'uuidtools' require 'digest/md5' end |
Instance Method Details
#configure(conf) ⇒ Object
24 25 26 27 28 29 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 24 def configure(conf) super @worker_class_name_tag = conf['worker_class_name_tag'] || 'class' self.redis = conf['redis'] if conf['redis'] end |
#enqueue(queue, klass, args) ⇒ Object
59 60 61 62 63 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 59 def enqueue(queue, klass, args) id = Digest::MD5.hexdigest(UUIDTools::UUID.random_create.to_s) redis.sadd(:queues, queue.to_s) redis.rpush("queue:#{queue}", ::MultiJson.encode(:class => klass, :id => id, :args => [args])) end |
#format(tag, time, record) ⇒ Object
73 74 75 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 73 def format(tag, time, record) [tag, time, record].to_msgpack end |
#redis ⇒ Object
53 54 55 56 57 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 53 def redis return @redis if @redis && !@redis.kind_of?(String) self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379" self.redis end |
#redis=(server) ⇒ Object
code from resque.rb
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 32 def redis=(server) case server when String if server =~ /redis\:\/\// redis = Redis.connect(:url => server, :thread_safe => true) else server, namespace = server.split('/', 2) host, port, db = server.split(':') redis = Redis.new(:host => host, :port => port, :thread_safe => true, :db => db) end namespace ||= :resque @redis = Redis::Namespace.new(namespace, :redis => redis) when Redis::Namespace @redis = server else @redis = Redis::Namespace.new(:resque, :redis => server) end end |
#shutdown ⇒ Object
69 70 71 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 69 def shutdown super end |
#start ⇒ Object
65 66 67 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 65 def start super end |
#write(chunk) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/out_resque_ex.rb', line 77 def write(chunk) queue_name = @queue_mapped ? chunk.key : @queue chunk.msgpack_each {|tag, time, record| klass = record.delete(@worker_class_name_tag) if klass && !klass.empty? enqueue(queue_name, klass, record) else $log.error("record have not #{@worker_class_name_tag} key.") end } end |