Class: Fluent::ResqueExOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_resque_ex.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ResqueExOutput

Returns a new instance of ResqueExOutput.



15
16
17
18
19
20
21
22
# File 'lib/fluent/plugin/out_resque_ex.rb', line 15

# Builds the output plugin instance.
#
# After delegating to the superclass constructor, the libraries this plugin
# uses are loaded here, at instantiation time, in the order listed.
def initialize
  super
  %w[multi_json redis redis-namespace uuidtools digest/md5].each do |library|
    require library
  end
end

Instance Method Details

#configure(conf) ⇒ Object



24
25
26
27
28
29
# File 'lib/fluent/plugin/out_resque_ex.rb', line 24

# Applies the plugin configuration.
#
# Reads two optional keys from +conf+:
# * 'worker_class_name_tag' - record key that holds the worker class name
#   (falls back to 'class' when absent);
# * 'redis' - server setting handed to #redis= when present.
#
# @param conf [#[]] configuration element, indexed by string keys
def configure(conf)
  super

  tag_key = conf['worker_class_name_tag']
  @worker_class_name_tag = tag_key || 'class'

  redis_server = conf['redis']
  self.redis = redis_server if redis_server
end

#enqueue(queue, klass, args) ⇒ Object



59
60
61
62
63
# File 'lib/fluent/plugin/out_resque_ex.rb', line 59

# Pushes one job onto a queue through the #redis connection.
#
# The queue name is added to the :queues set, then a JSON payload holding the
# worker class, a generated id (MD5 hex digest of a random UUID) and the
# arguments (wrapped in a one-element array) is appended to "queue:<queue>".
#
# @param queue [#to_s] queue name
# @param klass [Object] value stored under :class in the payload
# @param args [Object] value stored as the single job argument
def enqueue(queue, klass, args)
  job_id = Digest::MD5.hexdigest(UUIDTools::UUID.random_create.to_s)

  redis.sadd(:queues, queue.to_s)

  list_key = "queue:#{queue}"
  job = { :class => klass, :id => job_id, :args => [args] }
  redis.rpush(list_key, ::MultiJson.encode(job))
end

#format(tag, time, record) ⇒ Object



73
74
75
# File 'lib/fluent/plugin/out_resque_ex.rb', line 73

# Serializes one event for buffering.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of calling to_msgpack on the [tag, time, record] array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#redis ⇒ Object



53
54
55
56
57
# File 'lib/fluent/plugin/out_resque_ex.rb', line 53

# Returns the Redis connection, creating a default one on first use.
#
# A value already stored in @redis is returned as is unless it is a String.
# Otherwise a default is assigned through #redis= (Redis.connect when the
# Redis class responds to it, else the "localhost:6379" address) and the
# reader is consulted again.
def redis
  needs_setup = !@redis || @redis.kind_of?(String)
  return @redis unless needs_setup

  if Redis.respond_to?(:connect)
    self.redis = Redis.connect
  else
    self.redis = "localhost:6379"
  end
  self.redis
end

#redis=(server) ⇒ Object

code from resque.rb



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_resque_ex.rb', line 32

# Sets the Redis connection used by this plugin (logic adapted from resque.rb).
#
# Accepted values for +server+:
# * a String containing "redis://" - used as a connection URL, namespaced
#   under :resque;
# * any other String - parsed as "host:port:db/namespace", where the
#   namespace part is optional and defaults to :resque;
# * a Redis::Namespace - stored unchanged;
# * anything else - wrapped in a Redis::Namespace named :resque.
#
# @param server [String, Redis::Namespace, Object] connection description or
#   connection object
def redis=(server)
  case server
  when String
    if server =~ /redis\:\/\//
      options = { :url => server, :thread_safe => true }
      # Redis.connect is not available in every redis gem release (the #redis
      # reader probes for it as well), so fall back to Redis.new, passing the
      # same options, instead of raising NoMethodError.
      connection = Redis.respond_to?(:connect) ? Redis.connect(options) : Redis.new(options)
    else
      server, namespace = server.split('/', 2)
      host, port, db = server.split(':')
      connection = Redis.new(:host => host, :port => port,
                             :thread_safe => true, :db => db)
    end
    # The URL branch never assigns a namespace, so it always gets the default.
    namespace ||= :resque

    @redis = Redis::Namespace.new(namespace, :redis => connection)
  when Redis::Namespace
    @redis = server
  else
    @redis = Redis::Namespace.new(:resque, :redis => server)
  end
end

#shutdown ⇒ Object



69
70
71
# File 'lib/fluent/plugin/out_resque_ex.rb', line 69

# Delegates entirely to the superclass implementation; this plugin adds no
# shutdown work of its own.
def shutdown
  super
end

#start ⇒ Object



65
66
67
# File 'lib/fluent/plugin/out_resque_ex.rb', line 65

# Delegates entirely to the superclass implementation; this plugin adds no
# start-up work of its own.
def start
  super
end

#write(chunk) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_resque_ex.rb', line 77

# Enqueues every record of a buffered chunk as a job.
#
# The target queue is the chunk key when @queue_mapped is set, otherwise
# @queue. For each record the worker class name is removed from the record
# (key: @worker_class_name_tag) and the remaining record becomes the job
# argument. Records without a usable class name are logged and skipped.
#
# @param chunk [#key, #msgpack_each] buffered chunk yielding tag, time, record
def write(chunk)
  queue_name = @queue_mapped ? chunk.key : @queue

  chunk.msgpack_each do |_tag, _time, record|
    klass = record.delete(@worker_class_name_tag)
    # Require a non-empty String: a non-string value (e.g. a number) has no
    # #empty? and would otherwise raise, aborting the rest of the chunk.
    if klass.is_a?(String) && !klass.empty?
      enqueue(queue_name, klass, record)
    else
      $log.error("record have not #{@worker_class_name_tag} key.")
    end
  end
end