Class: Fluent::ScribeInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_scribe.rb

Defined Under Namespace

Classes: FluentScribeHandler

Constant Summary collapse

# Lookup table from a format name (String) to its Symbol identifier.
# Presumably these are the message formats the plugin accepts -- confirm
# against the configuration code, which is not shown here.
# Frozen so the shared constant cannot be mutated at runtime.
SUPPORTED_FORMAT = {
  'text' => :text,
  'json' => :json,
  'url_param' => :url_param,
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ScribeInput

Returns a new instance of ScribeInput.



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/in_scribe.rb', line 46

# Loads the libraries this plugin uses and then runs the parent initializer.
#
# The requires are issued here instead of at file level, so the libraries
# are only loaded once an instance is actually created.
def initialize
  require 'cgi'
  require 'yajl'
  require 'thrift'

  # The files required below are resolved through the load path, so the
  # 'thrift' directory next to this file is put at the front of it. Guard
  # against adding the same entry again each time another instance is built.
  thrift_dir = File.join(File.dirname(__FILE__), 'thrift')
  $:.unshift thrift_dir unless $:.include?(thrift_dir)

  require 'fb303_types'
  require 'fb303_constants'
  require 'facebook_service'
  require 'scribe_types'
  require 'scribe_constants'
  require 'scribe'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



60
61
62
# File 'lib/fluent/plugin/in_scribe.rb', line 60

# Applies the given configuration by delegating to the parent class; this
# method adds no processing of its own.
#
# @param conf the configuration object, forwarded unchanged to the parent
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#run ⇒ Object



115
116
117
118
119
120
# File 'lib/fluent/plugin/in_scribe.rb', line 115

# Runs the server stored in @server by calling its #serve method.
#
# Any StandardError raised while serving is not propagated: it is written
# to the plugin log (message plus backtrace) and the method returns.
def run
  begin
    @server.serve
  rescue StandardError => err
    log.error("unexpected error", :error => err.inspect)
    # Called inside the rescue clause so the current exception is still set.
    log.error_backtrace
  end
end

#shutdown ⇒ Object



110
111
112
113
# File 'lib/fluent/plugin/in_scribe.rb', line 110

# Closes the transport held in @transport unless it already reports itself
# as closed. The thread stored in @thread is not joined (still a TODO).
def shutdown
  return if @transport.closed?

  @transport.close
  #@thread.join # TODO
end

#start ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/in_scribe.rb', line 64

# Assembles the Thrift server for Scribe from the plugin's instance
# variables and starts serving on a new thread.
#
# Stores the server socket in @transport, the server in @server and the
# serving thread in @thread.
#
# Raises ConfigError when @server_type is not one of 'simple', 'threaded',
# 'thread_pool' or 'nonblocking'. Returns the thread that executes #run.
def start
  log.debug "listening scribe on #{@bind}:#{@port}"

  # Copy the plugin settings onto the handler that the processor wraps.
  scribe_handler = FluentScribeHandler.new
  scribe_handler.add_prefix = @add_prefix
  scribe_handler.remove_newline = @remove_newline
  scribe_handler.msg_format = @msg_format
  scribe_handler.ignore_invalid_record = @ignore_invalid_record
  scribe_handler.logger = log
  processor = Scribe::Processor.new(scribe_handler)

  @transport = Thrift::ServerSocket.new(@bind, @port)
  transport_factory =
    if @is_framed
      Thrift::FramedTransportFactory.new
    else
      Thrift::BufferedTransportFactory.new
    end

  # Workaround (2011/09/29, Kazuki Ohta) to set the strict_read and
  # strict_write options: Ruby-Thrift 0.7 defaults both to true, but the
  # Scribe protocol uses false for both. The factory instance is given its
  # own get_protocol that passes false for the two positional flags.
  protocol_factory = Thrift::BinaryProtocolFactory.new
  def protocol_factory.get_protocol(trans) # override
    Thrift::BinaryProtocol.new(trans, false, false)
  end

  # Select the server implementation; only the chosen constant is resolved.
  server_class =
    case @server_type
    when 'simple'      then Thrift::SimpleServer
    when 'threaded'    then Thrift::ThreadedServer
    when 'thread_pool' then Thrift::ThreadPoolServer
    when 'nonblocking' then Thrift::NonblockingServer
    else
      raise ConfigError, "in_scribe: unsupported server_type '#{@server_type}'"
    end
  @server = server_class.new(processor, @transport, transport_factory, protocol_factory)

  @thread = Thread.new(&method(:run))
end