Class: FFWD::Plugin::Kafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/ffwd/plugin/kafka/producer.rb

Overview

A Kafka producer proxy for Poseidon (a kafka library) that delegates all blocking work to the EventMachine thread pool.

Defined Under Namespace

Classes: Request

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Producer

Returns a new instance of Producer.



28
29
30
31
32
33
# File 'lib/ffwd/plugin/kafka/producer.rb', line 28

def initialize *args
  @args = args
  @mutex = Mutex.new
  @request = nil
  @stopped = false
end

Instance Method Details

#execute(&block) ⇒ Object

Execute the provided block on a dedicated thread. The sole provided argument is an instance of Poseidon::Producer.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/ffwd/plugin/kafka/producer.rb', line 66

def execute &block
  raise "Expected block" unless block_given?
  raise "Request already pending" if @request

  if @stopped
    r = Request.new
    r.fail "producer stopped"
    return r
  end

  @request = Request.new

  EM.defer do
    begin
      result = block.call make_producer

      EM.next_tick do
        @request.succeed result
        @request = nil
        shutdown if @stopped
      end
    rescue => e
      EM.next_tick do
        @request.fail e
        @request = nil
        shutdown if @stopped
      end
    end
  end

  @request
end

#make_producerObject



54
55
56
57
58
59
60
61
62
# File 'lib/ffwd/plugin/kafka/producer.rb', line 54

def make_producer
  if EM.reactor_thread?
    raise "Should not be called in the reactor thread"
  end

  @mutex.synchronize do
    @producer ||= Poseidon::Producer.new(*@args)
  end
end

#send_messages(messages) ⇒ Object



48
49
50
51
52
# File 'lib/ffwd/plugin/kafka/producer.rb', line 48

def send_messages messages
  execute do |p|
    p.send_messages messages
  end
end

#shutdownObject



40
41
42
43
44
45
46
# File 'lib/ffwd/plugin/kafka/producer.rb', line 40

def shutdown
  return if @request

  @mutex.synchronize do
    @producer.shutdown
  end
end

#stopObject



35
36
37
38
# File 'lib/ffwd/plugin/kafka/producer.rb', line 35

def stop
  @stopped = true
  shutdown
end