Class: Stapfen::Client::Kafka

Inherits:
Object
  • Object
show all
Defined in:
lib/stapfen/client/kafka.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration) ⇒ Kafka

Initialize a Kafka client object

Parameters:

  • configuration (Hash)

    a customizable set of options

Options Hash (configuration):

  • :topic (String)

    The kafka topic

  • :groupId (String)

    The kafka groupId

  • :zookeepers (String)

    Comma separated list of zookeepers

  • :consumer_opts (Hash)

    Options for Hermann consumer

Raises:



26
27
28
29
30
31
32
33
34
35
# File 'lib/stapfen/client/kafka.rb', line 26

def initialize(configuration)
  super()
  @config     = configuration
  @topic      = @config[:topic]
  @groupId    = @config[:groupId]
  @zookeepers = @config[:zookeepers]
  opts        = @config[:consumer_opts]
  raise ConfigurationError unless @groupId && @zookeepers
  @connection = Hermann::Consumer.new(@topic, @groupId, @zookeepers, opts)
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



15
16
17
# File 'lib/stapfen/client/kafka.rb', line 15

def connection
  @connection
end

#producerObject (readonly)

Returns the value of attribute producer.



15
16
17
# File 'lib/stapfen/client/kafka.rb', line 15

def producer
  @producer
end

Instance Method Details

#can_unreceive?Boolean

Cannot unreceive

Returns:

  • (Boolean)


43
44
45
# File 'lib/stapfen/client/kafka.rb', line 43

def can_unreceive?
  false
end

#closeBoolean

Closes the consumer threads created by kafka.

Returns:

  • (Boolean)

    True/false depending on whether we actually closed the connection



59
60
61
62
63
64
# File 'lib/stapfen/client/kafka.rb', line 59

def close
  return false unless @connection
  @connection.shutdown
  @connection = nil
  return true
end

#closed?Boolean

API compatibilty method, doesn’t actually indicate that the connection is closed. Will only return true if no connection currently exists

Returns:

  • (Boolean)


51
52
53
# File 'lib/stapfen/client/kafka.rb', line 51

def closed?
  return connection.nil?
end

#connect(*args) ⇒ Object

This method is not implemenented



38
39
40
# File 'lib/stapfen/client/kafka.rb', line 38

def connect(*args)
  # No-op
end

#runloopObject



77
78
79
80
81
# File 'lib/stapfen/client/kafka.rb', line 77

def runloop
  loop do
    sleep 1
  end
end

#subscribe(destination, headers = {}, &block) ⇒ Object

Subscribes to a destination (i.e. kafka topic) and consumes messages



73
74
75
# File 'lib/stapfen/client/kafka.rb', line 73

def subscribe(destination, headers={}, &block)
  connection.consume(destination, &block)
end