Class: Wukong::Load::KafkaLoader
Defined in: lib/wukong-load/loaders/kafka.rb
Overview
Loads data into Kafka.
Uses the kafka-rb gem to create a Kafka::Producer to write to Kafka.
Allows loading records into a given topic on a given partition.
Records can have fields _topic and _partition, which override the given topic and partition on a per-record basis.
The names of these fields within each record (_topic and _partition) can be customized.
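The override rule described above can be sketched in plain Ruby. This is only an illustration, not the loader's own code: `resolve_destination` and the `DEFAULTS` hash are hypothetical names, and records are assumed to be Hashes with string keys.

```ruby
# Hypothetical defaults standing in for the loader's configured settings.
DEFAULTS = {
  topic: 'events',
  partition: 0,
  topic_field: '_topic',
  partition_field: '_partition'
}.freeze

# Returns [topic, partition] for a record, letting per-record fields
# override the configured defaults.
def resolve_destination(record, settings = DEFAULTS)
  topic     = record[settings[:topic_field]] || settings[:topic]
  raw       = record[settings[:partition_field]]
  partition = raw ? raw.to_i : settings[:partition]
  [topic, partition]
end

plain      = { 'id' => 1 }
overridden = { 'id' => 2, '_topic' => 'audit', '_partition' => '3' }

p resolve_destination(plain)       # => ["events", 0]
p resolve_destination(overridden)  # => ["audit", 3]
```

Passing a settings hash with different `:topic_field` and `:partition_field` values models the customizable field names.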
Instance Attribute Summary
- #producer ⇒ Object
  The Kafka producer used to send messages to Kafka.
Instance Method Summary
- #load(record) ⇒ Object
  Load a single record into Kafka.
- #messages_for(record) ⇒ Object
  :nodoc:
- #partition_for(record) ⇒ Object
  :nodoc:
- #setup ⇒ Object
  Creates the producer.
- #topic_for(record) ⇒ Object
  :nodoc:
Methods inherited from Loader
Instance Attribute Details
#producer ⇒ Object
The Kafka producer used to send messages to Kafka.
    # File 'lib/wukong-load/loaders/kafka.rb', line 45
    def producer
      @producer
    end
Instance Method Details
#load(record) ⇒ Object
Load a single record into Kafka.
    # File 'lib/wukong-load/loaders/kafka.rb', line 65
    def load record
      begin
        topic     = topic_for(record)
        partition = partition_for(record)
        bytes     = producer.send(topic, messages_for(record), :partition => partition)
        log.info("Wrote #{bytes} bytes to #{topic}/#{partition}")
      rescue => e
        handle_error(record, e)
      end
    end
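The flow in #load (resolve the destination, send, hand any failure to an error handler) can be tried without a broker. The sketch below is an assumption-laden stand-in: `FakeProducer` and `load_record` are hypothetical, the stub's `send` only mirrors the shape of the call shown above, and the standard `json` library replaces MultiJson.

```ruby
require 'json'

# Hypothetical stand-in for the Kafka producer. It records what it is asked
# to send and returns the total byte size of the messages.
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send(topic, messages, options = {})
    @sent << [topic, options[:partition], messages]
    messages.sum(&:bytesize)
  end
end

# Sketch of the load flow: resolve destination, send, collect errors
# instead of letting them propagate.
def load_record(producer, record, errors)
  topic     = record['_topic'] || 'events'    # 'events' is an assumed default
  partition = (record['_partition'] || 0).to_i
  producer.send(topic, [JSON.generate(record)], :partition => partition)
rescue StandardError => e
  errors << [record, e.message]
  nil
end

errors   = []
producer = FakeProducer.new
bytes    = load_record(producer, { 'id' => 1, '_topic' => 'audit' }, errors)
puts "Wrote #{bytes} bytes to #{producer.sent.last[0]}/#{producer.sent.last[1]}"
```

Collecting failures in an array plays the role of `handle_error` here; a failed record is reported rather than raised, so one bad record does not halt the load.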
#messages_for(record) ⇒ Object
:nodoc:
    # File 'lib/wukong-load/loaders/kafka.rb', line 82
    def messages_for record
      [Kafka::Message.new(MultiJson.dump(record))]
    end
#partition_for(record) ⇒ Object
:nodoc:
    # File 'lib/wukong-load/loaders/kafka.rb', line 87
    def partition_for record
      record[partition_field] ? record[partition_field].to_i : partition
    end
#setup ⇒ Object
Creates the producer.
    # File 'lib/wukong-load/loaders/kafka.rb', line 48
    def setup
      begin
        require 'kafka'
      rescue LoadError => e
        raise Error.new("Please ensure that the 'kafka-rb' gem is installed and available (in your Gemfile)")
      end
      log.debug("Connecting to Kafka broker at #{host}:#{port}...")
      begin
        self.producer = Kafka::MultiProducer.new(:host => host, :port => port)
      rescue => e
        raise Error.new(e.message)
      end
    end
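The first half of #setup follows a common pattern for optional dependencies: attempt the `require`, and turn a `LoadError` into an error that tells the user which gem to install. A minimal sketch of that pattern on its own, where `LoaderError` and `require_or_explain` are hypothetical names rather than part of wukong-load:

```ruby
# Hypothetical error class standing in for the loader's own Error.
class LoaderError < StandardError; end

# Requires a library, re-raising a missing-library failure with a message
# that names the gem to install.
def require_or_explain(lib, gem_name)
  require lib
rescue LoadError
  raise LoaderError,
        "Please ensure that the '#{gem_name}' gem is installed and available (in your Gemfile)"
end

require_or_explain('json', 'json') # stdlib library, loads without error

begin
  require_or_explain('no_such_library_for_this_sketch', 'example-gem')
rescue LoaderError => e
  puts e.message
end
```

Deferring the `require` until setup means the gem is only needed by users who actually run this loader.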
#topic_for(record) ⇒ Object
:nodoc:
    # File 'lib/wukong-load/loaders/kafka.rb', line 77
    def topic_for record
      record[topic_field] || self.topic
    end