Class: LogStash::Inputs::Beats::MessageListener
- Inherits:
-
Object
- Object
- LogStash::Inputs::Beats::MessageListener
- Defined in:
- lib/logstash/inputs/beats/message_listener.rb
Defined Under Namespace
Classes: ConnectionState
Constant Summary collapse
- FILEBEAT_LOG_LINE_FIELD =
"message".freeze
- LSF_LOG_LINE_FIELD =
"line".freeze
Instance Attribute Summary collapse
-
#connections_list ⇒ Object
readonly
Returns the value of attribute connections_list.
-
#event_factory ⇒ Object
readonly
Returns the value of attribute event_factory.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(queue, input) ⇒ MessageListener
constructor
A new instance of MessageListener.
- #onChannelInitializeException(ctx, cause) ⇒ Object
- #onConnectionClose(ctx) ⇒ Object
- #onException(ctx, cause) ⇒ Object
- #onNewConnection(ctx) ⇒ Object
- #onNewMessage(ctx, message) ⇒ Object
-
#set_nested(hash, field_name, value) ⇒ Object
only to make it testable.
Constructor Details
#initialize(queue, input) ⇒ MessageListener
Returns a new instance of MessageListener.
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 20 def initialize(queue, input) @connections_list = ThreadSafe::Hash.new @queue = queue @logger = input.logger @input = input @metric = @input.metric @peak_connection_count = Concurrent::AtomicFixnum.new(0) @nocodec_transformer = RawEventTransform.new(@input) @codec_transformer = DecodedEventTransform.new(@input) @event_factory = input.event_factory end |
Instance Attribute Details
#connections_list ⇒ Object (readonly)
Returns the value of attribute connections_list.
16 17 18 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 16 def connections_list @connections_list end |
#event_factory ⇒ Object (readonly)
Returns the value of attribute event_factory.
18 19 20 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 18 def event_factory @event_factory end |
#input ⇒ Object (readonly)
Returns the value of attribute input.
16 17 18 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 16 def input @input end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
16 17 18 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 16 def logger @logger end |
Instance Method Details
#onChannelInitializeException(ctx, cause) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 70 def onChannelInitializeException(ctx, cause) # This is mostly due to a bad certificate or keys, running Logstash in debug mode will show more information if cause.is_a?(Java::JavaLang::IllegalArgumentException) if input.logger.debug? input.logger.error("Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format.", :exception => cause) else input.logger.error("Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format.") end else input.logger.warn("Error when creating a connection", :exception => cause.to_s) end end |
#onConnectionClose(ctx) ⇒ Object
65 66 67 68 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 65 def onConnectionClose(ctx) unregister_connection(ctx) decrement_connection_count() end |
#onException(ctx, cause) ⇒ Object
83 84 85 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 83 def onException(ctx, cause) unregister_connection(ctx) unless connections_list[ctx].nil? end |
#onNewConnection(ctx) ⇒ Object
60 61 62 63 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 60 def onNewConnection(ctx) register_connection(ctx) increment_connection_count() end |
#onNewMessage(ctx, message) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 33 def onNewMessage(ctx, ) hash = .getData if @input. ip_address = ip_address(ctx) unless ip_address.nil? || hash['@metadata'].nil? set_nested(hash, @input.field_hostip, ip_address) end end target_field = extract_target_field(hash) extract_tls_peer(hash, ctx) if target_field.nil? event = event_factory.new_event(hash) @nocodec_transformer.transform(event) @queue << event else codec(ctx).accept(CodecCallbackListener.new(target_field, hash, .getIdentityStream(), @codec_transformer, @queue)) end end |
#set_nested(hash, field_name, value) ⇒ Object
only to make it testable
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 166 def set_nested(hash, field_name, value) field_ref = Java::OrgLogstash::FieldReference.from(field_name) # create @metadata sub-hash if needed if field_ref.type == Java::OrgLogstash::FieldReference::META_CHILD nesting_hash = hash["@metadata"] else nesting_hash = hash end field_ref.path.each do |token| nesting_hash[token] = {} unless nesting_hash.key?(token) nesting_hash = nesting_hash[token] end nesting_hash[field_ref.key] = value end |