Class: Fluent::Plugin::UnixInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_unix.rb

Overview

TODO: This plugin will be 3rd party plugin

Defined Under Namespace

Classes: Handler

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Input

#emit_records, #emit_size, #metric_callback, #multi_workers_ready?, #statistics

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #terminate

Methods included from Fluent::PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initializeUnixInput

Returns a new instance of UnixInput.



33
34
35
36
37
# File 'lib/fluent/plugin/in_unix.rb', line 33

def initialize
  super

  @lsock = nil
end

Instance Method Details

#configure(conf) ⇒ Object



46
47
48
# File 'lib/fluent/plugin/in_unix.rb', line 46

def configure(conf)
  super
end

#convert_time(time) ⇒ Object



135
136
137
138
139
140
141
142
143
144
# File 'lib/fluent/plugin/in_unix.rb', line 135

def convert_time(time)
  case time
  when nil, 0
    Fluent::EventTime.now
  when Fluent::EventTime
    time
  else
    Fluent::EventTime.from_time(Time.at(time))
  end
end

#listenObject



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/in_unix.rb', line 66

def listen
  if File.exist?(@path)
    log.warn "Found existing '#{@path}'. Remove this file for in_unix plugin"
    File.unlink(@path)
  end
  FileUtils.mkdir_p(File.dirname(@path))

  log.info "listening fluent socket on #{@path}"
  s = Coolio::UNIXServer.new(@path, Handler, log, method(:on_message))
  s.listen(@backlog) unless @backlog.nil?
  s
end

#on_message(msg) ⇒ Object

message Entry

1: long time
2: object record

message Forward

1: string tag
2: list<Entry> entries

message PackedForward

1: string tag
2: raw entries  # msgpack stream of Entry

message Message

1: string tag
2: long? time
3: object record



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_unix.rb', line 99

def on_message(msg)
  unless msg.is_a?(Array)
    log.warn "incoming data is broken:", msg: msg
    return
  end

  tag = @tag || (msg[0].to_s)
  entries = msg[1]

  case entries
  when String
    # PackedForward
    es = Fluent::MessagePackEventStream.new(entries)
    router.emit_stream(tag, es)

  when Array
    # Forward
    es = Fluent::MultiEventStream.new
    entries.each {|e|
      record = e[1]
      next if record.nil?
      time = convert_time(e[0])
      es.add(time, record)
    }
    router.emit_stream(tag, es)

  else
    # Message
    record = msg[2]
    return if record.nil?

    time = convert_time(msg[1])
    router.emit(tag, time, record)
  end
end

#shutdownObject



57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/in_unix.rb', line 57

def shutdown
  if @lsock
    event_loop_detach(@lsock)
    @lsock.close
  end

  super
end

#startObject



50
51
52
53
54
55
# File 'lib/fluent/plugin/in_unix.rb', line 50

def start
  super

  @lsock = listen
  event_loop_attach(@lsock)
end