Class: FFWD::Plugin::Kafka::Output
- Inherits:
-
FFWD::ProducingClient::Producer
- Object
- FFWD::ProducingClient::Producer
- FFWD::Plugin::Kafka::Output
- Includes:
- Logging, Reporter
- Defined in:
- lib/ffwd/plugin/kafka/output.rb
Constant Summary collapse
- MAPPING =
[:host, :ttl, :key, :time, :value, :tags, :attributes]
- DEFAULT_PRODUCER =
"ffwd"
- DEFAULT_BROKERS =
["localhost:9092"]
Instance Attribute Summary collapse
-
#reporter_meta ⇒ Object
readonly
Returns the value of attribute reporter_meta.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(schema, router, partitioner, config) ⇒ Output
constructor
A new instance of Output.
- #make_event_message(e) ⇒ Object
- #make_metric_message(m) ⇒ Object
- #produce(events, metrics) ⇒ Object
- #setup ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(schema, router, partitioner, config) ⇒ Output
Returns a new instance of Output.
44 45 46 47 48 49 50 51 52 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 44 def initialize schema, router, partitioner, config @schema = schema @router = router @partitioner = partitioner @producer = config[:producer] @brokers = config[:brokers] @reporter_meta = {:producer_type => "kafka", :producer => @producer} @instance = nil end |
Instance Attribute Details
#reporter_meta ⇒ Object (readonly)
Returns the value of attribute reporter_meta.
31 32 33 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 31 def @reporter_meta end |
Class Method Details
.prepare(config) ⇒ Object
38 39 40 41 42 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 38 def self.prepare config config[:producer] ||= DEFAULT_PRODUCER config[:brokers] ||= DEFAULT_BROKERS config end |
Instance Method Details
#make_event_message(e) ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 98 def e topic = @router.route_event e return nil if topic.nil? data = @schema.dump_event e key = @partitioner.partition e MessageToSend.new topic, data, key end |
#make_metric_message(m) ⇒ Object
106 107 108 109 110 111 112 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 106 def m topic = @router.route_metric m return nil if topic.nil? data = @schema.dump_metric m key = @partitioner.partition m MessageToSend.new topic, data, key end |
#produce(events, metrics) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 70 def produce events, metrics unless @instance return nil end = events.size + metrics.size = [] events.each do |e| = e next if .nil? << end metrics.each do |e| = e next if .nil? << end if .size < increment :routing_error, - .size end increment :routing_success, .size @instance. end |
#setup ⇒ Object
54 55 56 57 58 59 60 61 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 54 def setup if not @brokers or @brokers.empty? log.error "No usable initial list of brokers" return end @instance = Producer.new @brokers, @producer end |
#teardown ⇒ Object
63 64 65 66 67 68 |
# File 'lib/ffwd/plugin/kafka/output.rb', line 63 def teardown if @instance @instance.stop @instance = nil end end |