Class: FFWD::Plugin::Kafka::Producer
- Inherits:
-
Object
- Object
- FFWD::Plugin::Kafka::Producer
- Defined in:
- lib/ffwd/plugin/kafka/producer.rb
Overview
A Kafka producer proxy for Poseidon (a kafka library) that delegates all blocking work to the EventMachine thread pool.
Defined Under Namespace
Classes: Request
Instance Method Summary collapse
-
#execute(&block) ⇒ Object
Execute the provided block on a dedicated thread.
-
#initialize(*args) ⇒ Producer
constructor
A new instance of Producer.
- #make_producer ⇒ Object
- #send_messages(messages) ⇒ Object
- #shutdown ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(*args) ⇒ Producer
Returns a new instance of Producer.
28 29 30 31 32 33 |
# File 'lib/ffwd/plugin/kafka/producer.rb', line 28 def initialize *args @args = args @mutex = Mutex.new @request = nil @stopped = false end |
Instance Method Details
#execute(&block) ⇒ Object
Execute the provided block on a dedicated thread. The sole provided argument is an instance of Poseidon::Producer.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/ffwd/plugin/kafka/producer.rb', line 66 def execute &block raise "Expected block" unless block_given? raise "Request already pending" if @request if @stopped r = Request.new r.fail "producer stopped" return r end @request = Request.new EM.defer do begin result = block.call make_producer EM.next_tick do @request.succeed result @request = nil shutdown if @stopped end rescue => e EM.next_tick do @request.fail e @request = nil shutdown if @stopped end end end @request end |
#make_producer ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/ffwd/plugin/kafka/producer.rb', line 54 def make_producer if EM.reactor_thread? raise "Should not be called in the reactor thread" end @mutex.synchronize do @producer ||= Poseidon::Producer.new(*@args) end end |
#send_messages(messages) ⇒ Object
48 49 50 51 52 |
# File 'lib/ffwd/plugin/kafka/producer.rb', line 48 def execute do |p| p. end end |
#shutdown ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/ffwd/plugin/kafka/producer.rb', line 40 def shutdown return if @request @mutex.synchronize do @producer.shutdown end end |
#stop ⇒ Object
35 36 37 38 |
# File 'lib/ffwd/plugin/kafka/producer.rb', line 35 def stop @stopped = true shutdown end |