Class: Hermann::Producer
- Inherits: Object
  - Object
  - Hermann::Producer
- Defined in: lib/hermann/producer.rb
Instance Attribute Summary
- #brokers ⇒ Object (readonly)
  Returns the value of attribute brokers.
- #children ⇒ Object (readonly)
  Returns the value of attribute children.
- #internal ⇒ Object (readonly)
  Returns the value of attribute internal.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #connect(timeout = 0) ⇒ Object
- #connected? ⇒ Boolean
  True if our underlying producer object thinks it’s connected to a Kafka broker.
- #create_result ⇒ Hermann::Result
  Create a Hermann::Result that is tracked in the Producer’s children array.
- #errored? ⇒ Boolean
  True if the underlying producer object has errored.
- #initialize(topic, brokers, opts = {}) ⇒ Producer (constructor)
  Initialize a producer object with a default topic and broker list.
- #push(value, opts = {}) ⇒ Hermann::Result
  Push a value onto the Kafka topic passed to this Producer.
- #reap_children ⇒ FixNum
  Number of children reaped.
- #tick_reactor(timeout = 0) ⇒ FixNum
  Tick the underlying librdkafka reactor and clean up any unreaped but reapable children results.
Constructor Details
#initialize(topic, brokers, opts = {}) ⇒ Producer
Initialize a producer object with a default topic and broker list.

    # File 'lib/hermann/producer.rb', line 20
    def initialize(topic, brokers, opts={})
      @topic = topic
      @brokers = ThreadSafe::Array.new(brokers)
      if Hermann.jruby?
        @internal = Hermann::Provider::JavaProducer.new(brokers.join(','), opts)
      else
        @internal = Hermann::Provider::RDKafka::Producer.new(brokers.join(','))
      end
      # We're tracking children so we can make sure that at Producer exit we
      # make a reasonable attempt to clean up outstanding result objects
      @children = ThreadSafe::Array.new
    end

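As a rough illustration of how the broker list is handled, the sketch below mirrors only the constructor's `brokers.join(',')` step in plain Ruby. The broker addresses and the topic name are made-up placeholders, and the commented-out line is an assumption about how the constructor would be called once the gem is loaded.

```ruby
# Hypothetical broker addresses, for illustration only.
brokers = ['localhost:9092', 'localhost:9093']

# The constructor hands the list to the underlying provider as a single
# comma-separated string.
broker_list = brokers.join(',')
puts broker_list

# With the hermann gem loaded, construction would look roughly like this
# ('my-topic' is a placeholder topic name):
#   producer = Hermann::Producer.new('my-topic', brokers)
```

Note that the listing passes `opts` only to the JRuby provider; the RDKafka provider receives the broker string alone.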
Instance Attribute Details
#brokers ⇒ Object (readonly)
Returns the value of attribute brokers.

    # File 'lib/hermann/producer.rb', line 14
    def brokers
      @brokers
    end

#children ⇒ Object (readonly)
Returns the value of attribute children.

    # File 'lib/hermann/producer.rb', line 14
    def children
      @children
    end

#internal ⇒ Object (readonly)
Returns the value of attribute internal.

    # File 'lib/hermann/producer.rb', line 14
    def internal
      @internal
    end

#topic ⇒ Object (readonly)
Returns the value of attribute topic.

    # File 'lib/hermann/producer.rb', line 14
    def topic
      @topic
    end

Instance Method Details
#connect(timeout = 0) ⇒ Object

    # File 'lib/hermann/producer.rb', line 44
    def connect(timeout=0)
      return @internal.connect(timeout * 1000)
    end

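The multiplication by 1000 suggests that `timeout` is given in seconds and converted to milliseconds for the provider. The source does not state the units, so treat that as an inference. A minimal sketch of the conversion, using a hypothetical `FakeProvider` stand-in and a hypothetical helper `connect_with_seconds` rather than the real provider classes:

```ruby
# Hypothetical stand-in for the provider object; it only records the
# timeout value it receives.
class FakeProvider
  attr_reader :last_timeout

  def connect(timeout_ms)
    @last_timeout = timeout_ms
    true
  end
end

# Mirrors the body of Producer#connect: scale the timeout by 1000 before
# delegating (assumed here to be seconds to milliseconds).
def connect_with_seconds(internal, timeout = 0)
  internal.connect(timeout * 1000)
end

provider = FakeProvider.new
connect_with_seconds(provider, 2)
puts provider.last_timeout # 2000
```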
#connected? ⇒ Boolean
Returns True if our underlying producer object thinks it’s connected to a Kafka broker.

    # File 'lib/hermann/producer.rb', line 35
    def connected?
      return @internal.connected?
    end

#create_result ⇒ Hermann::Result
Create a Hermann::Result that is tracked in the Producer’s children array.

    # File 'lib/hermann/producer.rb', line 88
    def create_result
      @children << Hermann::Result.new(self)
      return @children.last
    end

#errored? ⇒ Boolean
Returns True if the underlying producer object has errored.

    # File 'lib/hermann/producer.rb', line 40
    def errored?
      return @internal.errored?
    end

#push(value, opts = {}) ⇒ Hermann::Result
Push a value onto the Kafka topic passed to this Producer

    # File 'lib/hermann/producer.rb', line 57
    def push(value, opts={})
      topic = opts[:topic] || @topic
      result = nil

      if value.kind_of? Array
        return value.map { |e| self.push(e, opts) }
      end

      if Hermann.jruby?
        result = @internal.push_single(value, topic, opts[:partition_key], nil)
        unless result.nil?
          @children << result
        end
        # Reaping children on the push just to make sure that it does get
        # called correctly and we don't leak memory
        reap_children
      else
        # Ticking reactor to make sure that we don't inadvertently let the
        # librdkafka callback queue overflow
        tick_reactor
        result = create_result
        @internal.push_single(value, topic, opts[:partition_key].to_s, result)
      end

      return result
    end

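Two behaviours in #push are easy to miss: an Array argument is pushed element by element (so the return value is an Array of results), and `opts[:topic]` overrides the producer's default topic. The sketch below reproduces only that dispatch logic. `SketchProducer`, its `sent` log, and the `:queued` placeholder are hypothetical and stand in for the real provider and for Hermann::Result.

```ruby
# Minimal stand-in that mirrors only the dispatch logic of #push.
class SketchProducer
  attr_reader :sent

  def initialize(topic)
    @topic = topic
    @sent = [] # records [value, topic] pairs instead of talking to Kafka
  end

  def push(value, opts = {})
    # A per-call topic takes precedence over the default topic.
    topic = opts[:topic] || @topic

    # Arrays are pushed one element at a time with the same options.
    return value.map { |e| push(e, opts) } if value.is_a?(Array)

    @sent << [value, topic]
    :queued # placeholder for a Hermann::Result
  end
end

producer = SketchProducer.new('default-topic')
single = producer.push('hello')
batch = producer.push(%w[a b], topic: 'other-topic')

puts single.inspect
puts batch.inspect
puts producer.sent.inspect
```

Callers that may pass either a single value or an Array should therefore be prepared for either a single result or an Array of results.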
#reap_children ⇒ FixNum
Returns number of children reaped.

    # File 'lib/hermann/producer.rb', line 118
    def reap_children
      # Filter all children who are no longer pending/fulfilled
      total_children = @children.size

      @children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }

      return (total_children - children.size)
    end

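The reaping step amounts to counting the children, dropping those that report completion, and returning the difference. A small sketch with a hypothetical `ReapableResult` struct in place of Hermann::Result (only the non-JRuby `completed?` branch is modelled):

```ruby
# Hypothetical stand-in for a result object; `done` marks it as finished.
ReapableResult = Struct.new(:done) do
  def completed?
    done
  end
end

children = [
  ReapableResult.new(true),
  ReapableResult.new(false),
  ReapableResult.new(true)
]

total = children.size
# Keep only the results that are still outstanding.
children = children.reject(&:completed?)
reaped = total - children.size

puts reaped        # 2
puts children.size # 1
```

One thing to be aware of when reading the listing: `Array#reject` returns a new collection, so after the first reap `@children` no longer refers to the object created in the constructor.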
#tick_reactor(timeout = 0) ⇒ FixNum
Tick the underlying librdkafka reactor and clean up any unreaped but reapable children results.

    # File 'lib/hermann/producer.rb', line 98
    def tick_reactor(timeout=0)
      begin
        execute_tick(rounded_timeout(timeout))
      rescue StandardError => ex
        @children.each do |child|
          # Skip over any children that should already be reaped for other
          # reasons
          next if (Hermann.jruby? ? child.fulfilled? : child.completed?)

          # Propagate errors to the remaining children
          child.internal_set_error(ex)
        end
      end

      # Reaping the children at this point will also reap any children marked
      # as errored by an exception out of #execute_tick
      return reap_children
    end

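The error handling in #tick_reactor can be sketched as follows: if the tick raises, every still-pending child receives the exception, and the subsequent reap removes both the finished and the newly errored children. `TrackedResult` and the `tick` helper are hypothetical. The assumption that `internal_set_error` also marks a result as completed comes from the comment in the listing, not from the Hermann::Result source.

```ruby
# Hypothetical stand-in for a result object.
class TrackedResult
  attr_reader :error

  def initialize(completed)
    @completed = completed
    @error = nil
  end

  def completed?
    @completed
  end

  # Assumption: setting an error also makes the result reapable.
  def internal_set_error(ex)
    @error = ex
    @completed = true
  end
end

# Runs the given block as the "tick", propagates any StandardError to the
# pending children, then reaps and returns the number of children removed.
def tick(children)
  begin
    yield
  rescue StandardError => ex
    children.each do |child|
      next if child.completed? # already reapable for other reasons
      child.internal_set_error(ex)
    end
  end

  before = children.size
  children.reject!(&:completed?)
  before - children.size
end

finished = TrackedResult.new(true)
pending = TrackedResult.new(false)
children = [finished, pending]

reaped = tick(children) { raise IOError, 'broker unreachable' }
puts reaped                # 2
puts pending.error.message # broker unreachable
```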