Class: Fluent::ZMQInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_zmq.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ZMQInput

config_param :server_type, :string, :default => 'nonblocking'



33
34
35
36
# File 'lib/fluent/plugin/in_zmq.rb', line 33

# Builds the plugin instance.
#
# The zmq binding is loaded here, when an instance is constructed, and then
# the parent class initializer is run.
def initialize
  require('zmq')
  super
end

Instance Method Details

#configure(conf) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/in_zmq.rb', line 38

# Configures the plugin by delegating straight to the parent class.
#
# @param conf the configuration object, handed to the parent unchanged
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#run ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_zmq.rb', line 62

# Receive loop: waits on @server with ZMQ.select, reads each ready socket,
# unpacks the payload with MessagePack and passes the result to on_message.
#
# A failure while decoding or handling one message is logged and the loop
# moves on to the next message, so a single bad payload does not stop the
# input. Any error raised by the select/recv calls themselves is logged as
# "unexpected error" and ends the loop (and therefore this method).
def run
  loop do
    ready = ZMQ.select([@server])
    # Defensive: skip the iteration if select hands back nothing.
    next if ready.nil?

    ready[0].each do |sock|
      msg = sock.recv
      begin
        on_message(MessagePack.unpack(msg))
      rescue
        # Isolate per-message failures from the receive loop.
        $log.error "failed to process message", :error => $!.to_s
        $log.error_backtrace
      end
    end
  end
rescue
  $log.error "unexpected error", :error => $!.to_s
  $log.error_backtrace
end

#shutdown ⇒ Object



57
58
59
60
# File 'lib/fluent/plugin/in_zmq.rb', line 57

# Stops the plugin: closes the server socket, then waits for the receive
# thread to finish.
#
# Both steps are skipped when the corresponding instance variable was never
# set, so calling this without a completed start does not raise
# NoMethodError on nil.
def shutdown
  @server.close if @server
  @thread.join if @thread
end

#start ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/in_zmq.rb', line 42

# Starts the plugin: installs a SIGINT handler, binds a ZMQ socket on
# tcp://@bind:@port and spawns the thread that executes #run.
#
# The MessagePack unpacker is created before the thread is spawned so the
# receive thread never observes @cached_unpacker as nil.
def start
  # NOTE(review): Signal.trap replaces any INT handler installed earlier in
  # this process — confirm that taking over SIGINT here is intended.
  Signal.trap(:INT) do
    $log.error "SIGINT occured. shutdown..."
    @server.close
    exit(0)
  end

  $log.debug "listening zmq on #{@bind}:#{@port}"
  @cached_unpacker = MessagePack::Unpacker.new
  @zmq = ZMQ::Context.new
  @server = @zmq.socket(ZMQ::UPSTREAM)
  @server.bind("tcp://#{@bind}:#{@port}")
  @thread = Thread.new(&method(:run))
end