Class: Octo::KafkaBridge
- Inherits:
-
Object
- Object
- Octo::KafkaBridge
- Defined in:
- lib/octocore-mongo/kafka_bridge.rb
Overview
The bridge between Kafka and ruby
Constant Summary collapse
- CLIENT_ID =
These are hard wired
ENV['KAFKA_CLIENT_ID']
- TOPIC =
ENV['KAFKA_TOPIC']
- MAX_BUFFER_SIZE =
20_000
- MAX_QUEUE_SIZE =
10_000
- DELIVERY_INTERVAL =
1
- BROKERS =
Changes as per environment
ENV['KAFKA_BROKERS'].try(:split, ',')
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ KafkaBridge
constructor
A new instance of KafkaBridge.
- #push(params) ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ KafkaBridge
Returns a new instance of KafkaBridge.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/octocore-mongo/kafka_bridge.rb', line 20 def initialize(opts = {}) opts.deep_symbolize_keys! @kafka = ::Kafka.new(seed_brokers: opts.fetch(:brokers, BROKERS), client_id: opts.fetch(:client_id, CLIENT_ID) ) @producer = @kafka.async_producer( max_buffer_size: opts.fetch(:max_buffer_size, MAX_BUFFER_SIZE), max_queue_size: opts.fetch(:max_queue_size, MAX_QUEUE_SIZE), delivery_interval: opts.fetch(:delivery_interval, DELIVERY_INTERVAL), ) if opts.has_key?(:topic) @topic = opts[:topic] else @topic = TOPIC end end |
Instance Method Details
#push(params) ⇒ Object
37 38 39 |
# File 'lib/octocore-mongo/kafka_bridge.rb', line 37 def push(params) params end |
#teardown ⇒ Object
41 42 43 |
# File 'lib/octocore-mongo/kafka_bridge.rb', line 41 def teardown @producer.shutdown end |