Class: FFWD::Plugin::Kafka::Output

Inherits:
FFWD::ProducingClient::Producer
  • Object
show all
Includes:
Logging, Reporter
Defined in:
lib/ffwd/plugin/kafka/output.rb

Constant Summary collapse

MAPPING =
[:host, :ttl, :key, :time, :value, :tags, :attributes]
DEFAULT_PRODUCER =
"ffwd"
DEFAULT_BROKERS =
["localhost:9092"]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(schema, router, partitioner, config) ⇒ Output

Returns a new instance of Output.



44
45
46
47
48
49
50
51
52
# File 'lib/ffwd/plugin/kafka/output.rb', line 44

def initialize schema, router, partitioner, config
  @schema = schema
  @router = router
  @partitioner = partitioner
  @producer = config[:producer]
  @brokers = config[:brokers]
  @reporter_meta = {:producer_type => "kafka", :producer => @producer}
  @instance = nil
end

Instance Attribute Details

#reporter_metaObject (readonly)

Returns the value of attribute reporter_meta.



31
32
33
# File 'lib/ffwd/plugin/kafka/output.rb', line 31

def reporter_meta
  @reporter_meta
end

Class Method Details

.prepare(config) ⇒ Object



38
39
40
41
42
# File 'lib/ffwd/plugin/kafka/output.rb', line 38

def self.prepare config
  config[:producer] ||= DEFAULT_PRODUCER
  config[:brokers] ||= DEFAULT_BROKERS
  config
end

Instance Method Details

#make_event_message(e) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/ffwd/plugin/kafka/output.rb', line 98

def make_event_message e
  topic = @router.route_event e
  return nil if topic.nil?
  data = @schema.dump_event e
  key = @partitioner.partition e
  MessageToSend.new topic, data, key
end

#make_metric_message(m) ⇒ Object



106
107
108
109
110
111
112
# File 'lib/ffwd/plugin/kafka/output.rb', line 106

def make_metric_message m
  topic = @router.route_metric m
  return nil if topic.nil?
  data = @schema.dump_metric m
  key = @partitioner.partition m
  MessageToSend.new topic, data, key
end

#produce(events, metrics) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/ffwd/plugin/kafka/output.rb', line 70

def produce events, metrics
  unless @instance
    return nil
  end

  expected_messages = events.size + metrics.size
  messages = []

  events.each do |e|
    message = make_event_message e
    next if message.nil?
    messages << message
  end

  metrics.each do |e|
    message = make_metric_message e
    next if message.nil?
    messages << message
  end

  if messages.size < expected_messages
    increment :routing_error, expected_messages - messages.size
  end

  increment :routing_success, messages.size
  @instance.send_messages messages
end

#setupObject



54
55
56
57
58
59
60
61
# File 'lib/ffwd/plugin/kafka/output.rb', line 54

def setup
  if not @brokers or @brokers.empty?
    log.error "No usable initial list of brokers"
    return
  end

  @instance = Producer.new @brokers, @producer
end

#teardownObject



63
64
65
66
67
68
# File 'lib/ffwd/plugin/kafka/output.rb', line 63

def teardown
  if @instance
    @instance.stop
    @instance = nil
  end
end