Class: Stapfen::Client::Kafka
- Inherits: Object
- Defined in: lib/stapfen/client/kafka.rb
Instance Attribute Summary
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#producer ⇒ Object
readonly
Returns the value of attribute producer.
Instance Method Summary
-
#can_unreceive? ⇒ Boolean
Cannot unreceive.
-
#close ⇒ Boolean
Closes the consumer threads created by Kafka.
-
#closed? ⇒ Boolean
API compatibility method; doesn’t actually indicate that the connection is closed.
-
#connect(*args) ⇒ Object
This method is not implemented.
-
#initialize(configuration) ⇒ Kafka
constructor
Initialize a Kafka client object.
- #runloop ⇒ Object
-
#subscribe(destination, headers = {}, &block) ⇒ Object
Subscribes to a destination (i.e. a Kafka topic) and consumes messages.
Constructor Details
#initialize(configuration) ⇒ Kafka
Initialize a Kafka client object.

    # File 'lib/stapfen/client/kafka.rb', line 26
    def initialize(configuration)
      super()
      @config = configuration
      @topic = @config[:topic]
      @groupId = @config[:groupId]
      @zookeepers = @config[:zookeepers]
      opts = @config[:consumer_opts]
      raise ConfigurationError unless @groupId && @zookeepers
      @connection = Hermann::Consumer.new(@topic, @groupId, @zookeepers, opts)
    end
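The constructor reads four keys from the configuration hash and raises unless `:groupId` and `:zookeepers` are both set; `:topic` is not part of that guard. The sketch below reproduces only that check so the expected hash shape is visible. The `ConfigurationError` class defined here, the helper names, and every value in the sample hash are stand-ins for illustration, not part of Stapfen.

```ruby
# Stand-in for the library's ConfigurationError so this sketch runs on its own.
class ConfigurationError < StandardError; end

# Mirrors the guard in #initialize: :groupId and :zookeepers must both be truthy.
# (Hypothetical helper; the real check lives inside the constructor.)
def check_kafka_config(config)
  raise ConfigurationError unless config[:groupId] && config[:zookeepers]
  config
end

# Convenience wrapper that turns the raise into a boolean.
def kafka_config_valid?(config)
  check_kafka_config(config)
  true
rescue ConfigurationError
  false
end

# Illustrative configuration; the values are made up.
def sample_kafka_config
  {
    :topic         => 'example-topic',
    :groupId       => 'example-group',
    :zookeepers    => 'localhost:2181',
    :consumer_opts => {}
  }
end

kafka_config_valid?(sample_kafka_config)           # accepted
kafka_config_valid?(:topic => 'example-topic')     # rejected: no :groupId or :zookeepers
```

Because the guard ignores `:topic`, a configuration without a topic still passes this check; whether the underlying consumer accepts a nil topic is not stated in this page.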
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.

    # File 'lib/stapfen/client/kafka.rb', line 15
    def connection
      @connection
    end
#producer ⇒ Object (readonly)
Returns the value of attribute producer.

    # File 'lib/stapfen/client/kafka.rb', line 15
    def producer
      @producer
    end
Instance Method Details
#can_unreceive? ⇒ Boolean
Cannot unreceive.

    # File 'lib/stapfen/client/kafka.rb', line 43
    def can_unreceive?
      false
    end
#close ⇒ Boolean
Closes the consumer threads created by Kafka.

    # File 'lib/stapfen/client/kafka.rb', line 59
    def close
      return false unless @connection
      @connection.shutdown
      @connection = nil
      return true
    end
#closed? ⇒ Boolean
API compatibility method; doesn’t actually indicate that the connection is closed. Returns true only if no connection currently exists.

    # File 'lib/stapfen/client/kafka.rb', line 51
    def closed?
      return connection.nil?
    end
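Taken together, `#close` and `#closed?` describe a small lifecycle: `#close` shuts the consumer down once and clears the reference, and `#closed?` only reports whether that reference is nil. The following sketch replays that logic against a fake consumer. `FakeConsumer` and `LifecycleSketch` are hypothetical names introduced here; only the method bodies are copied from the listings above.

```ruby
# Hypothetical stand-in for the real consumer; it only counts #shutdown calls.
class FakeConsumer
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def shutdown
    @shutdown_calls += 1
  end
end

# Trimmed copy of the close/closed? logic, with the consumer injected
# instead of being built from a configuration hash.
class LifecycleSketch
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  def close
    return false unless @connection
    @connection.shutdown
    @connection = nil
    true
  end

  def closed?
    connection.nil?
  end
end

# Returns the observable state at each step of the lifecycle.
def lifecycle_demo
  consumer = FakeConsumer.new
  client = LifecycleSketch.new(consumer)
  [
    client.closed?,          # before close
    client.close,            # first close
    client.closed?,          # after close
    client.close,            # second close is a no-op
    consumer.shutdown_calls  # shutdown was forwarded exactly once
  ]
end
```

In this sketch a second `#close` returns false without touching the consumer, so callers can treat the return value as "did this call actually shut something down".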
#connect(*args) ⇒ Object
This method is not implemented.

    # File 'lib/stapfen/client/kafka.rb', line 38
    def connect(*args)
      # No-op
    end
#runloop ⇒ Object

    # File 'lib/stapfen/client/kafka.rb', line 77
    def runloop
      loop do
        sleep 1
      end
    end
#subscribe(destination, headers = {}, &block) ⇒ Object
Subscribes to a destination (i.e. a Kafka topic) and consumes messages.

    # File 'lib/stapfen/client/kafka.rb', line 73
    def subscribe(destination, headers={}, &block)
      connection.consume(destination, &block)
    end
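As the listing shows, `#subscribe` forwards the destination and the block to `connection.consume` and never reads `headers`. A minimal sketch of that delegation follows, using a fake consumer that replays canned strings. `CannedConsumer`, `SubscribeSketch`, the topic name, and the messages are all assumptions made for illustration; what the real consumer yields to the block is not specified on this page.

```ruby
# Hypothetical stand-in for the real consumer: replays canned messages
# for a topic to whatever block it is given.
class CannedConsumer
  def initialize(messages_by_topic)
    @messages_by_topic = messages_by_topic
  end

  def consume(topic, &block)
    @messages_by_topic.fetch(topic, []).each(&block)
  end
end

# Trimmed copy of #subscribe, with the consumer injected.
class SubscribeSketch
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  # headers is accepted for API compatibility but not used.
  def subscribe(destination, headers = {}, &block)
    connection.consume(destination, &block)
  end
end

# Subscribes to an illustrative topic and collects what the block receives.
def subscribe_demo
  consumer = CannedConsumer.new('example-topic' => ['first', 'second'])
  client = SubscribeSketch.new(consumer)
  received = []
  client.subscribe('example-topic', :ignored => true) { |message| received << message }
  received
end
```

Since the headers argument is dropped, passing options through it has no effect on consumption in this sketch.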