Class: LogStash::Codecs::Protobuf

Inherits: Base

Defined in: lib/logstash/codecs/protobuf.rb

Overview

This codec converts protobuf encoded messages into logstash events and vice versa.

Requires the protobuf definitions to be compiled as Ruby files. For protobuf 2 you can create those using the ruby-protoc compiler (github.com/codekitchen/ruby-protocol-buffers); for protobuf 3, use the official protoc compiler with the `--ruby_out` option.
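As an illustration, a minimal protobuf 3 definition matching the `Animal.Unicorn` examples below might look like this (the message and field names are hypothetical):

```protobuf
// unicorn.proto (hypothetical example)
syntax = "proto3";

package Animal;

message Unicorn {
  string name = 1;
  int32 age = 2;
}
```

Compiling it with `protoc --ruby_out=/path/to/protobuf/definitions unicorn.proto` produces a `unicorn_pb.rb` file that can then be referenced via `include_path` or `class_file`.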

The following shows a usage example for decoding protobuf 2 encoded events from a Kafka stream:

[source,ruby]
----
kafka {
  zk_connect => "127.0.0.1"
  topic_id => "your_topic_goes_here"
  key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  codec => protobuf {
    class_name => "Animal::Unicorn"
    include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
  }
}
----

The same example for protobuf 3:

[source,ruby]
----
kafka {
  zk_connect => "127.0.0.1"
  topic_id => "your_topic_goes_here"
  key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  codec => protobuf {
    class_name => "Animal.Unicorn"
    include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb']
    protobuf_version => 3
  }
}
----

Specifically for the kafka input: set both deserializer classes as shown above, so that the payload arrives as raw bytes rather than strings.
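The codec can also be used on the output side to serialize events back into protobuf. A minimal sketch (topic name hypothetical), mirroring the protobuf 3 input example above:

```
output {
  kafka {
    topic_id => "your_topic_goes_here"
    codec => protobuf {
      class_name => "Animal.Unicorn"
      include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb']
      protobuf_version => 3
    }
  }
}
```

Fields present in the event but not in the protobuf definition cannot be serialized, so the event structure should match the message definition.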


Instance Attribute Details

#execution_context ⇒ Object (readonly)

Returns the value of attribute execution_context.



# File 'lib/logstash/codecs/protobuf.rb', line 145

def execution_context
  @execution_context
end

Instance Method Details

#decode(data) ⇒ Object



# File 'lib/logstash/codecs/protobuf.rb', line 203

def decode(data)
  if @protobuf_version == 3
    decoded = @pb_builder.decode(data.to_s)
    h = pb3_deep_to_hash(decoded)
  else
    decoded = @pb_builder.parse(data.to_s)
    h = decoded.to_hash
  end
  yield LogStash::Event.new(h) if block_given?
rescue => e
  @logger.warn("Couldn't decode protobuf: #{e.inspect}.")
  if stop_on_error
    raise e
  else # keep original message so that the user can debug it.
    yield LogStash::Event.new("message" => data, "tags" => ["_protobufdecodefailure"])
  end
end
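The error handling above can be illustrated with a plain-Ruby sketch (no Logstash or protobuf dependencies; the parse failure is simulated): on failure the codec either re-raises, or falls back to an event that keeps the raw payload plus a `_protobufdecodefailure` tag.

```ruby
# Plain-Ruby sketch of the decode fallback logic (hypothetical helper,
# not part of the codec). A "parse error" is simulated for one input.
def decode_with_fallback(data, stop_on_error: false)
  raise ArgumentError, "unparseable payload" if data == "\xFF" # simulated protobuf parse error
  { "message" => data }                                        # stand-in for the decoded event hash
rescue StandardError => e
  raise e if stop_on_error                                     # fail hard, as with stop_on_error => true
  { "message" => data, "tags" => ["_protobufdecodefailure"] }  # keep original payload for debugging
end
```

With `stop_on_error => false` a broken payload still produces an event, which makes decode failures visible in the pipeline instead of silently dropping data.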

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/protobuf.rb', line 222

def encode(event)
  if @protobuf_version == 3
    protobytes = pb3_encode(event)
  else
    protobytes = pb2_encode(event)
  end
  @on_event.call(event, protobytes)
end

#pipeline_id ⇒ Object

Returns the id of the pipeline this codec instance runs in, falling back to "main" when no execution context is available.



# File 'lib/logstash/codecs/protobuf.rb', line 148

def pipeline_id
  respond_to?(:execution_context) && !execution_context.nil? ? execution_context.pipeline_id : "main"
end

#register ⇒ Object



# File 'lib/logstash/codecs/protobuf.rb', line 152

def register
  @metainfo_messageclasses = {}
  @metainfo_enumclasses = {}
  @metainfo_pb2_enumlist = []
  @pb3_typeconversion_tag = "_protobuf_type_converted"


  if @include_path.length > 0 and not class_file.strip.empty?
    raise LogStash::ConfigurationError, "Cannot use `include_path` and `class_file` at the same time"
  end

  if @include_path.length == 0 and class_file.strip.empty?
    raise LogStash::ConfigurationError, "Need to specify `include_path` or `class_file`"
  end

  should_register = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).nil?

  unless @protobuf_root_directory.nil? or @protobuf_root_directory.strip.empty?
    if !$LOAD_PATH.include? @protobuf_root_directory and should_register
      $LOAD_PATH.unshift(@protobuf_root_directory)
    end
  end

  @class_file = "#{@protobuf_root_directory}/#{@class_file}" unless (Pathname.new @class_file).absolute? or @class_file.empty?
  # exclusive access while loading protobuf definitions
  Google::Protobuf::DescriptorPool.with_lock.synchronize do
    # load from `class_file`
    load_protobuf_definition(@class_file) if should_register and !@class_file.empty?
    # load from `include_path`
    include_path.each { |path| load_protobuf_definition(path) } if include_path.length > 0 and should_register

    if @protobuf_version == 3
      @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass

    else
      @pb_builder = pb2_create_instance(class_name)
    end
  end
end
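As the checks in `register` show, `include_path` and `class_file` are mutually exclusive, and a relative `class_file` is resolved against `protobuf_root_directory` (which is also added to the load path). A sketch of the `class_file` variant (paths hypothetical):

```
input {
  kafka {
    topic_id => "your_topic_goes_here"
    codec => protobuf {
      class_name => "Animal.Unicorn"
      class_file => "unicorn_pb.rb"
      protobuf_root_directory => "/path/to/protobuf/definitions"
      protobuf_version => 3
    }
  }
}
```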

#reloadable? ⇒ Boolean

Pipelines using this plugin cannot be reloaded (see github.com/elastic/logstash/pull/6499).

The DescriptorPool instance registers the protobuf classes (and their dependencies) as global objects. This makes it very difficult to reload a pipeline, because `class_name` and all of its dependencies are already registered.

Returns:

  • (Boolean)


# File 'lib/logstash/codecs/protobuf.rb', line 199

def reloadable?
  return false
end