Class: LogStash::Codecs::Protobuf
- Inherits: Base
  - Object
  - Base
  - LogStash::Codecs::Protobuf
- Defined in: lib/logstash/codecs/protobuf.rb
Overview
This codec converts protobuf-encoded messages into Logstash events and vice versa.
It requires the protobuf definitions as Ruby files. You can create those using the [ruby-protoc compiler](github.com/codekitchen/ruby-protocol-buffers).
The following example decodes protobuf 2 encoded events from a Kafka stream:
    kafka {
      zk_connect => "127.0.0.1"
      topic_id => "your_topic_goes_here"
      key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      codec => protobuf {
        class_name => "Animal::Unicorn"
        include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
      }
    }
Same example for protobuf 3:
    kafka {
      zk_connect => "127.0.0.1"
      topic_id => "your_topic_goes_here"
      key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      codec => protobuf {
        class_name => "Animal.Unicorn"
        include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb']
        protobuf_version => 3
      }
    }
For the Kafka input specifically, set both deserializer classes as shown above.
Instance Attribute Summary
- #execution_context ⇒ Object (readonly): Returns the value of attribute execution_context.
Instance Method Summary
- #decode(data) ⇒ Object
- #encode(event) ⇒ Object
- #pipeline_id ⇒ Object: ID of the pipeline whose events you want to read from.
- #register ⇒ Object
- #reloadable? ⇒ Boolean: Pipelines using this plugin cannot be reloaded.
Instance Attribute Details
#execution_context ⇒ Object (readonly)
Returns the value of attribute execution_context.
    # File 'lib/logstash/codecs/protobuf.rb', line 145
    def execution_context
      @execution_context
    end
Instance Method Details
#decode(data) ⇒ Object
    # File 'lib/logstash/codecs/protobuf.rb', line 203
    def decode(data)
      if @protobuf_version == 3
        decoded = @pb_builder.decode(data.to_s)
        h = pb3_deep_to_hash(decoded)
      else
        decoded = @pb_builder.parse(data.to_s)
        h = decoded.to_hash
      end
      yield LogStash::Event.new(h) if block_given?
    rescue => e
      @logger.warn("Couldn't decode protobuf: #{e.inspect}.")
      if stop_on_error
        raise e
      else
        # keep original message so that the user can debug it.
        yield LogStash::Event.new("message" => data, "tags" => ["_protobufdecodefailure"])
      end
    end
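The failure path of #decode can be tried in isolation. The following is a minimal sketch, not the plugin's code: `decode_with_fallback` and its `ok:` prefix check are hypothetical stand-ins for the real protobuf parse step, and plain hashes stand in for `LogStash::Event`.

```ruby
# Hypothetical stand-in for the codec's decode path; plain hashes replace LogStash::Event.
def decode_with_fallback(data, stop_on_error: false)
  # Assumption for illustration: a payload is "valid" only if it starts with "ok:".
  raise ArgumentError, "malformed payload" unless data.start_with?("ok:")
  yield({ "message" => data.delete_prefix("ok:") })
rescue ArgumentError => e
  # Mirror the codec: re-raise when stop_on_error is set ...
  raise e if stop_on_error
  # ... otherwise keep the original payload and tag the event so it can be debugged.
  yield({ "message" => data, "tags" => ["_protobufdecodefailure"] })
end

events = []
decode_with_fallback("ok:hello") { |event| events << event }
decode_with_fallback("garbage")  { |event| events << event }
```

With `stop_on_error` left at `false`, the second call still produces an event, so one bad message does not interrupt the stream; downstream filters can select on the `_protobufdecodefailure` tag.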
#encode(event) ⇒ Object
    # File 'lib/logstash/codecs/protobuf.rb', line 222
    def encode(event)
      if @protobuf_version == 3
        protobytes = pb3_encode(event)
      else
        protobytes = pb2_encode(event)
      end
      @on_event.call(event, protobytes)
    end
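The callback contract of #encode can be sketched as follows. This is an illustration under assumptions: `EncodeSketch` is a hypothetical class, the two `*_encode` methods are stubs that return labelled strings rather than real protobuf bytes, and the `on_event` setter is assumed to behave like the one the codec inherits from its base class.

```ruby
# Hypothetical codec-like class; the two *_encode methods are stubs, not real serializers.
class EncodeSketch
  def initialize(protobuf_version)
    @protobuf_version = protobuf_version
  end

  # Register the callback that receives each event together with its encoded form.
  def on_event(&block)
    @on_event = block
  end

  # Pick the encoder by protobuf version, then hand the result to the callback.
  def encode(event)
    protobytes = @protobuf_version == 3 ? pb3_encode(event) : pb2_encode(event)
    @on_event.call(event, protobytes)
  end

  private

  def pb2_encode(event)
    "pb2:#{event["name"]}"
  end

  def pb3_encode(event)
    "pb3:#{event["name"]}"
  end
end

received = []
codec = EncodeSketch.new(3)
codec.on_event { |_event, bytes| received << bytes }
codec.encode({ "name" => "unicorn" })
```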
#pipeline_id ⇒ Object
ID of the pipeline whose events you want to read from.
    # File 'lib/logstash/codecs/protobuf.rb', line 148
    def pipeline_id
      respond_to?(:execution_context) && !execution_context.nil? ? execution_context.pipeline_id : "main"
    end
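The fallback to `"main"` is easiest to see with a small stand-in. In this sketch, `ExecutionContext` and `CodecSketch` are hypothetical names introduced only for illustration; the body of `pipeline_id` is copied from the method above.

```ruby
# Hypothetical stand-in for the execution context Logstash hands to a plugin.
ExecutionContext = Struct.new(:pipeline_id)

class CodecSketch
  attr_reader :execution_context

  def initialize(execution_context = nil)
    @execution_context = execution_context
  end

  # Same expression as the codec: use the context's pipeline id when a context
  # is available, otherwise fall back to "main".
  def pipeline_id
    respond_to?(:execution_context) && !execution_context.nil? ? execution_context.pipeline_id : "main"
  end
end

without_context = CodecSketch.new.pipeline_id
with_context    = CodecSketch.new(ExecutionContext.new("ingest")).pipeline_id
```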
#register ⇒ Object
    # File 'lib/logstash/codecs/protobuf.rb', line 152
    def register
      @metainfo_messageclasses = {}
      @metainfo_enumclasses = {}
      @metainfo_pb2_enumlist = []
      @pb3_typeconversion_tag = "_protobuf_type_converted"

      if @include_path.length > 0 and not class_file.strip.empty?
        raise LogStash::ConfigurationError, "Cannot use `include_path` and `class_file` at the same time"
      end

      if @include_path.length == 0 and class_file.strip.empty?
        raise LogStash::ConfigurationError, "Need to specify `include_path` or `class_file`"
      end

      should_register = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).nil?

      unless @protobuf_root_directory.nil? or @protobuf_root_directory.strip.empty?
        if !$LOAD_PATH.include? @protobuf_root_directory and should_register
          $LOAD_PATH.unshift(@protobuf_root_directory)
        end
      end

      @class_file = "#{@protobuf_root_directory}/#{@class_file}" unless (Pathname.new @class_file).absolute? or @class_file.empty?

      # exclusive access while loading protobuf definitions
      Google::Protobuf::DescriptorPool.with_lock.synchronize do
        # load from `class_file`
        load_protobuf_definition(@class_file) if should_register and !@class_file.empty?
        # load from `include_path`
        include_path.each { |path| load_protobuf_definition(path) } if include_path.length > 0 and should_register

        if @protobuf_version == 3
          @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass
        else
          @pb_builder = pb2_create_instance(class_name)
        end
      end
    end
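The two configuration checks at the top of #register amount to "exactly one of `include_path` and `class_file` must be set". A minimal sketch of that rule, assuming a hypothetical helper `validate_definition_source` and a local `ConfigurationError` class in place of `LogStash::ConfigurationError`:

```ruby
# Local stand-in for LogStash::ConfigurationError so the sketch runs without Logstash.
class ConfigurationError < StandardError; end

# Hypothetical helper: returns which option supplies the protobuf definitions,
# or raises if both or neither are configured.
def validate_definition_source(include_path, class_file)
  has_paths = !include_path.empty?
  has_file  = !class_file.strip.empty?

  if has_paths && has_file
    raise ConfigurationError, "Cannot use `include_path` and `class_file` at the same time"
  end
  unless has_paths || has_file
    raise ConfigurationError, "Need to specify `include_path` or `class_file`"
  end

  has_paths ? :include_path : :class_file
end

source = validate_definition_source(["/path/to/UnicornProtobuf_pb.rb"], "")
```

A whitespace-only `class_file` counts as unset, matching the `strip.empty?` test in the original method.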
#reloadable? ⇒ Boolean
Pipelines using this plugin cannot be reloaded. See github.com/elastic/logstash/pull/6499
The DescriptorPool instance registers the protobuf classes (and their dependencies) as global objects. This makes it very difficult to reload a pipeline, because `class_name` and all of its dependencies are already registered.
    # File 'lib/logstash/codecs/protobuf.rb', line 199
    def reloadable?
      return false
    end
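Why global registration gets in the way of reloading can be illustrated with a toy registry. This sketch is an analogy only: `REGISTRY` and `register_definition` are hypothetical and do not use the real `Google::Protobuf::DescriptorPool`; the guard mirrors the `should_register` check in #register.

```ruby
# Hypothetical process-wide registry standing in for the protobuf descriptor pool.
REGISTRY = {}

# Load a definition only if the class name is not registered yet,
# mirroring the `should_register` guard in #register.
def register_definition(class_name, definition)
  should_register = REGISTRY[class_name].nil?
  REGISTRY[class_name] = definition if should_register
  REGISTRY[class_name]
end

first = register_definition("Animal.Unicorn", { fields: [:name] })
# A "reloaded" pipeline that ships an updated definition still gets the old one,
# because the name is already taken in the global registry.
second = register_definition("Animal.Unicorn", { fields: [:name, :horn_length] })
```

In this toy model the second registration is silently skipped, so the updated definition never takes effect within the same process.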