Module: FFWD::Plugin::Kafka

Includes:
Logging, FFWD::Plugin
Defined in:
lib/ffwd/plugin/kafka/partitioners.rb,
lib/ffwd/plugin/kafka/version.rb,
lib/ffwd/plugin/kafka/producer.rb,
lib/ffwd/plugin/kafka/routers.rb,
lib/ffwd/plugin/kafka/output.rb,
lib/ffwd/plugin/kafka.rb

Overview

$LICENSE Copyright 2013-2014 Spotify AB. All rights reserved.

The contents of this file are licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Defined Under Namespace

Modules: HostPartitioner, KeyPartitioner Classes: AttributePartitioner, AttributeRouter, Output, Producer, Setup

Constant Summary collapse

VERSION =
"0.4.6"
MessageToSend =
Poseidon::MessageToSend
DEFAULT_PARTITIONER =
:host
DEFAULT_ROUTER =
:attribute

Class Method Summary collapse

Class Method Details

.build_partitioner(config) ⇒ Object



74
75
76
77
78
79
# File 'lib/ffwd/plugin/kafka/partitioners.rb', line 74

def self.build_partitioner config
  type = config[:type]
  return HostPartitioner if type == :host
  return KeyPartitioner if type == :key
  return AttributePartitioner.new config
end

.build_router(config) ⇒ Object



97
98
99
100
101
# File 'lib/ffwd/plugin/kafka/routers.rb', line 97

def self.build_router config
  type = config[:type]
  return AttributeRouter.new config if type == :attribute
  raise "Unsupported router type: #{type}"
end

.prepare_partitioner(config) ⇒ Object



67
68
69
70
71
72
# File 'lib/ffwd/plugin/kafka/partitioners.rb', line 67

def self.prepare_partitioner config
  type = (config[:type] ||= DEFAULT_PARTITIONER)
  return config if type == :host
  return config if type == :key
  AttributePartitioner.prepare config
end

.prepare_router(config) ⇒ Object



91
92
93
94
95
# File 'lib/ffwd/plugin/kafka/routers.rb', line 91

def self.prepare_router config
  type = (config[:type] ||= DEFAULT_ROUTER)
  return AttributeRouter.prepare config if type == :attribute
  raise "Unsupported router type: #{type}"
end

.setup_output(config) ⇒ Object



82
83
84
# File 'lib/ffwd/plugin/kafka.rb', line 82

def self.setup_output config
  Setup.new config
end