Class: LogStash::Outputs::Faye

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient
Defined in:
lib/logstash/outputs/faye.rb

Instance Method Summary collapse

Instance Method Details

#closeObject



118
119
120
# File 'lib/logstash/outputs/faye.rb', line 118

def close
  client.close
end

#multi_receive(events) ⇒ Object

def register



56
57
58
59
# File 'lib/logstash/outputs/faye.rb', line 56

def multi_receive(events)
  events.each {|event| receive(event, :parallel)}
  client.execute!
end

#receive(event, async_type = :background) ⇒ Object

Once we no longer need to support Logstash < 2.2 (pre-ng-pipeline) We don’t need to handle :background style requests

We use :background style requests for Logstash < 2.2 because before the microbatching pipeline performance is greatly improved by having some degree of async behavior.

In Logstash 2.2 and after things are much simpler, we just run each batch in parallel This will make performance much easier to reason about, and more importantly let us guarantee that if ‘multi_receive` returns all items have been sent.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/logstash/outputs/faye.rb', line 70

def receive(event, async_type=:background)

  if @faye_token
    body = LogStash::Json.dump({:channel => @channel, :data => map_event(event), :ext => {:auth_token => @faye_token}})
  else
    body = LogStash::Json.dump({:channel => @channel, :data => map_event(event)})
  end

  # Block waiting for a token
  token = @request_tokens.pop if async_type == :background

  # Send the request
  url = event.sprintf(@url)
  headers = event_headers(event)

  # Create an async request
  request = client.send(async_type).send(@http_method, url, :body => body, :headers => headers)

  request.on_complete do
    # Make sure we return the token to the pool
    @request_tokens << token  if async_type == :background
  end

  request.on_success do |response|
    if response.code < 200 || response.code > 299
      log_failure(
        "Encountered non-200 HTTP code #{response.code}",
        :response_code => response.code,
        :url => url,
        :event => event.to_hash)
    end
  end

  request.on_failure do |exception|
    log_failure("Could not fetch URL",
                :url => url,
                :method => @http_method,
                :body => faye_body.to_json,
                :headers => headers,
                :message => exception.message,
                :class => exception.class.name,
                :backtrace => exception.backtrace
    )
  end

  request.call if async_type == :background
end

#registerObject



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logstash/outputs/faye.rb', line 42

def register
  @http_method = :post
  @content_type = "application/json"

  # We count outstanding requests with this queue
  # This queue tracks the requests to create backpressure
  # When this queue is empty no new requests may be sent,
  # tokens must be added back by the client on success
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times {|t| @request_tokens << true }

  @requests = Array.new
end