Class: Fluent::WebSocketOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_websocket.rb

Instance Method Summary collapse

Instance Method Details

#buffer(data) ⇒ Object



89
90
91
92
93
94
# File 'lib/fluent/plugin/out_websocket.rb', line 89

def buffer(data)
  return unless @buffered_messages > 0
  @buffer << data
  # Buffer only new @buffered_messages messages
  @buffer = @buffer[-@buffered_messages, @buffered_messages] if @buffer.length > @buffered_messages
end

#configure(conf) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_websocket.rb', line 32

def configure(conf)
  super
  @thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run {
      EM::WebSocket.run(:host => @host, :port => @port) do |ws|
        ws.onopen { |handshake|
          callback = @use_msgpack ? proc{|msg| ws.send_binary(msg)} : proc{|msg| ws.send(msg)}
          $lock.synchronize do
            sid = $channel.subscribe callback
            $log.trace "WebSocket connection: ID " + sid.to_s
            ws.onclose {
              $log.trace "Connection closed: ID " + sid.to_s
              $lock.synchronize do
                $channel.unsubscribe(sid)
              end
            }
            @buffer.each do |msg|
              ws.send(msg)
            end
          end

          #ws.onmessage { |msg|
          #}
        }
      end
    }
  end
end

#emit(tag, es, chain) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_websocket.rb', line 75

def emit(tag, es, chain)
  chain.next
  es.each {|time,record|
    data = [record]
    if (@add_time) then data.unshift(time) end
    if (@add_tag) then data.unshift(tag) end
    output = @use_msgpack ? data.to_msgpack : Yajl::Encoder.encode( data )
    buffer(output)
    $lock.synchronize do
      $channel.push output
    end
  }
end

#shutdownObject



68
69
70
71
72
73
# File 'lib/fluent/plugin/out_websocket.rb', line 68

def shutdown
  super
  EM.stop
  Thread::kill(@thread)
  $log.trace "Killed em-websocket thread."
end

#startObject



63
64
65
66
# File 'lib/fluent/plugin/out_websocket.rb', line 63

def start
  @buffer = []
  super
end