Class: LogStash::Plugins::Builtin::Pipeline::Input
- Inherits:
- Inputs::Base
- Object
- Inputs::Base
- LogStash::Plugins::Builtin::Pipeline::Input
- Defined in:
- lib/logstash/plugins/builtin/pipeline/input.rb
Instance Attribute Summary
- #pipeline_bus ⇒ Object (readonly)
Returns the value of attribute pipeline_bus.
Instance Method Summary
- #internalReceive(events) ⇒ Object
Returns false if the receive failed because the input is stopping. To understand why this value is useful, see Internal.send_to. Note that this takes a Java Stream, not a Ruby array.
- #isRunning ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #running? ⇒ Boolean
- #stop ⇒ Object
Instance Attribute Details
#pipeline_bus ⇒ Object (readonly)
Returns the value of attribute pipeline_bus.
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 8

def pipeline_bus
  @pipeline_bus
end
Instance Method Details
#internalReceive(events) ⇒ Object
Returns false if the receive failed because the input is stopping. To understand why this value is useful, see Internal.send_to. Note that this takes a Java Stream, not a Ruby array.
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 37

def internalReceive(events)
  return false if !@running.get()

  # TODO This should probably push a batch at some point in the future when doing so
  # buys us some efficiency
  events.forEach do |event|
    decorate(event)
    @queue << event
  end

  return true
rescue => e
  require 'pry'; binding.pry
  return true
end
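The boolean return value can be illustrated with a small plain-Ruby sketch. Everything here is hypothetical: `SketchInput` and `send_with_retry` are stand-ins written for this example, a plain boolean replaces the Java `AtomicBoolean`, and a Ruby array replaces the Java Stream. The retry loop is only an assumption about how a sender might use the `false` result, not a description of what `Internal.send_to` does.

```ruby
# Hypothetical stand-in for the pipeline input; not the Logstash class.
class SketchInput
  attr_reader :queue

  def initialize
    @running = false # plain boolean instead of a Java AtomicBoolean
    @queue = []
  end

  def start
    @running = true
  end

  def stop
    @running = false
  end

  # Mirrors the documented contract: false while the input is not running,
  # true once every event has been enqueued.
  def internal_receive(events)
    return false unless @running

    events.each { |event| @queue << event }
    true
  end
end

# Hypothetical sender: retries until the input accepts the batch or the
# attempts run out. The optional block lets the caller wait between attempts.
def send_with_retry(input, events, attempts: 3)
  attempts.times do
    return true if input.internal_receive(events)

    yield if block_given?
  end
  false
end

input = SketchInput.new
rejected = input.internal_receive([{ "message" => "early" }]) # input not started
input.start
accepted = send_with_retry(input, [{ "message" => "hello" }])
```

In this sketch the rejected batch never reaches the queue, so a sender that ignored the return value would silently lose events.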
#isRunning ⇒ Object
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 59

def isRunning
  @running.get
end
#register ⇒ Object
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 10

def register
  # May as well set this up here, writers won't do anything until
  # @running is set to false
  @running = java.util.concurrent.atomic.AtomicBoolean.new(false)
  @pipeline_bus = execution_context.agent.pipeline_bus
  listen_successful = pipeline_bus.listen(self, address)
  if !listen_successful
    raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
  end
end
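The address-uniqueness check in register can be sketched in plain Ruby. `SketchBus`, `SketchConfigurationError`, and `register_input` are hypothetical names invented for this example. Only the `listen(input, address)` and `unlisten(input, address)` call shapes and the error message come from the listing above; how the real pipeline bus stores its listeners is an assumption.

```ruby
# Hypothetical bus: binds at most one input per address.
class SketchBus
  def initialize
    @listeners = {}
    @lock = Mutex.new
  end

  # Returns true if the address was free and is now bound to input,
  # false if another input already holds it.
  def listen(input, address)
    @lock.synchronize do
      return false if @listeners.key?(address)

      @listeners[address] = input
      true
    end
  end

  # Releases the address, but only if this input is the one bound to it.
  def unlisten(input, address)
    @lock.synchronize do
      @listeners.delete(address) if @listeners[address].equal?(input)
    end
    nil
  end
end

# Hypothetical stand-in for ::LogStash::ConfigurationError.
class SketchConfigurationError < StandardError; end

# Follows the shape of register: fail loudly when the address is taken.
def register_input(bus, input, address)
  unless bus.listen(input, address)
    raise SketchConfigurationError,
          "Internal input at '#{address}' already bound! Addresses must be globally unique across pipelines."
  end
  input
end

bus = SketchBus.new
first = Object.new
second = Object.new

register_input(bus, first, "events")
error = begin
  register_input(bus, second, "events") # same address, so this raises
  nil
rescue SketchConfigurationError => e
  e
end

bus.unlisten(first, "events")
rebound = bus.listen(second, "events") # address is free again
```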
#run(queue) ⇒ Object
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 21

def run(queue)
  @queue = queue
  @running.set(true)

  while @running.get()
    sleep 0.1
  end
end
#running? ⇒ Boolean
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 30

def running?
  @running && @running.get()
end
#stop ⇒ Object
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 53

def stop
  # We stop receiving events before we unlisten to prevent races
  @running.set(false) if @running # If register wasn't yet called, no @running!
  pipeline_bus.unlisten(self, address)
end
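The interplay between run, running? and stop can be sketched as a flag-polling loop in plain Ruby. `SketchLifecycle` is a hypothetical class written for this example: it guards the flag with a `Mutex` instead of a Java `AtomicBoolean`, polls at a shorter interval than the 0.1 seconds in the listing, and leaves out the bus entirely.

```ruby
# Hypothetical lifecycle stand-in; not the Logstash class.
class SketchLifecycle
  def initialize
    @running = nil # stays nil until register is called
    @lock = Mutex.new
  end

  def register
    @lock.synchronize { @running = false }
  end

  # Blocks the calling thread until stop flips the flag.
  def run
    @lock.synchronize { @running = true }
    sleep 0.01 while running?
  end

  # False both before register (flag is nil) and after stop.
  def running?
    @lock.synchronize { @running == true }
  end

  # Safe to call even if register never ran.
  def stop
    @lock.synchronize { @running = false unless @running.nil? }
  end
end

never_registered = SketchLifecycle.new
never_registered.stop # no error although register was never called

input = SketchLifecycle.new
input.register
worker = Thread.new { input.run }
sleep 0.01 until input.running? # wait for the worker to enter its loop
input.stop
finished = !worker.join(2).nil? # join returns nil on timeout
```

Polling a shared flag keeps run trivially interruptible from another thread, at the cost of up to one sleep interval of shutdown latency.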