Class: Hermann::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/hermann/producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, brokers, opts = {}) ⇒ Producer

Initialize a producer object with a default topic and broker list

Parameters:

  • topic (String)

    The default topic to use for pushing messages

  • brokers (Array)

    An array of “host:port” strings for the brokers



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/hermann/producer.rb', line 20

def initialize(topic, brokers, opts={})
  @topic = topic
  @brokers = ThreadSafe::Array.new(brokers)
  if Hermann.jruby?
    @internal = Hermann::Provider::JavaProducer.new(brokers.join(','), opts)
  else
    @internal = Hermann::Provider::RDKafka::Producer.new(brokers.join(','))
  end
  # We're tracking children so we can make sure that at Producer exit we
  # make a reasonable attempt to clean up outstanding result objects
  @children = ThreadSafe::Array.new
end

Instance Attribute Details

#brokersObject (readonly)

Returns the value of attribute brokers.



14
15
16
# File 'lib/hermann/producer.rb', line 14

def brokers
  @brokers
end

#childrenObject (readonly)

Returns the value of attribute children.



14
15
16
# File 'lib/hermann/producer.rb', line 14

def children
  @children
end

#internalObject (readonly)

Returns the value of attribute internal.



14
15
16
# File 'lib/hermann/producer.rb', line 14

def internal
  @internal
end

#topicObject (readonly)

Returns the value of attribute topic.



14
15
16
# File 'lib/hermann/producer.rb', line 14

def topic
  @topic
end

Instance Method Details

#connect(timeout = 0) ⇒ Object



44
45
46
# File 'lib/hermann/producer.rb', line 44

def connect(timeout=0)
  return @internal.connect(timeout * 1000)
end

#connected?Boolean

Returns True if our underlying producer object thinks it’s connected to a Kafka broker.

Returns:

  • (Boolean)

    True if our underlying producer object thinks it’s connected to a Kafka broker



35
36
37
# File 'lib/hermann/producer.rb', line 35

def connected?
  return @internal.connected?
end

#create_resultHermann::Result

Create a Hermann::Result that is tracked in the Producer’s children array

Returns:



88
89
90
91
# File 'lib/hermann/producer.rb', line 88

def create_result
  @children << Hermann::Result.new(self)
  return @children.last
end

#errored?Boolean

Returns True if the underlying producer object has errored.

Returns:

  • (Boolean)

    True if the underlying producer object has errored



40
41
42
# File 'lib/hermann/producer.rb', line 40

def errored?
  return @internal.errored?
end

#push(value, opts = {}) ⇒ Hermann::Result

Push a value onto the Kafka topic passed to this Producer

Parameters:

  • value (Object)

    A single object to push

  • opts (Hash) (defaults to: {})

    to pass to push method

Options Hash (opts):

  • :topic (String)

    The topic to push messages to :partition_key The string to partition by

Returns:

  • (Hermann::Result)

    A future-like object which will store the result from the broker



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/hermann/producer.rb', line 57

def push(value, opts={})
  topic = opts[:topic] || @topic
  result = nil

  if value.kind_of? Array
    return value.map { |e| self.push(e, opts) }
  end

  if Hermann.jruby?
    result = @internal.push_single(value, topic, opts[:partition_key], nil)
    unless result.nil?
      @children << result
    end
    # Reaping children on the push just to make sure that it does get
    # called correctly and we don't leak memory
    reap_children
  else
    # Ticking reactor to make sure that we don't inadvertantly let the
    # librdkafka callback queue overflow
    tick_reactor
    result = create_result
    @internal.push_single(value, topic, opts[:partition_key].to_s, result)
  end

  return result
end

#reap_childrenFixNum

Returns number of children reaped.

Returns:

  • (FixNum)

    number of children reaped



118
119
120
121
122
123
124
125
# File 'lib/hermann/producer.rb', line 118

def reap_children
  # Filter all children who are no longer pending/fulfilled
  total_children = @children.size

  @children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }

  return (total_children - children.size)
end

#tick_reactor(timeout = 0) ⇒ FixNum

Tick the underlying librdkafka reacter and clean up any unreaped but reapable children results

Parameters:

  • timeout (FixNum) (defaults to: 0)

    Seconds to block on the internal reactor

Returns:

  • (FixNum)

    Number of Hermann::Result children reaped



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/hermann/producer.rb', line 98

def tick_reactor(timeout=0)
  begin
    execute_tick(rounded_timeout(timeout))
  rescue StandardError => ex
    @children.each do |child|
      # Skip over any children that should already be reaped for other
      # reasons
      next if (Hermann.jruby? ? child.fulfilled? : child.completed?)

      # Propagate errors to the remaining children
      child.internal_set_error(ex)
    end
  end

  # Reaping the children at this point will also reap any children marked
  # as errored by an exception out of #execute_tick
  return reap_children
end