Class: Telekinesis::Producer::SyncProducer

Inherits:
Object
Defined in:
lib/telekinesis/producer/sync_producer.rb

Overview

A synchronous Kinesis producer.

This class is thread safe if and only if the underlying Telekinesis::Aws::Client is thread safe. In practice, this means a SyncProducer is thread safe on JRuby and not thread safe elsewhere.
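One way to share a producer between threads on a runtime where the client is not thread safe is to serialize every call through a single lock. The sketch below is only an illustration of that idea: `LockedProducer` and `FakeProducer` are hypothetical names that are not part of Telekinesis, and the fake producer simply records what it is given so the example runs without AWS access.

```ruby
# Hypothetical wrapper (not part of Telekinesis): every call into the wrapped
# producer goes through one Mutex, so only one thread uses it at a time.
class LockedProducer
  def initialize(producer)
    @producer = producer
    @lock = Mutex.new
  end

  def put(key, data)
    @lock.synchronize { @producer.put(key, data) }
  end

  def put_all(items)
    @lock.synchronize { @producer.put_all(items) }
  end
end

# Hypothetical stand-in for a SyncProducer; it only records what it receives.
class FakeProducer
  attr_reader :records

  def initialize
    @records = []
  end

  def put(key, data)
    @records << [key, data]
  end

  def put_all(items)
    items.each { |key, data| put(key, data) }
  end
end

fake = FakeProducer.new
locked = LockedProducer.new(fake)

# Four threads share one producer; the lock serializes their calls.
threads = 4.times.map do |t|
  Thread.new { 25.times { |i| locked.put("key-#{t}-#{i}", "data") } }
end
threads.each(&:join)
```

Serializing calls this way trades throughput for safety; whether it is sufficient depends on how the underlying client is used elsewhere in the process.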

Constructor Details

#initialize(stream, client, opts = {}) ⇒ SyncProducer

Returns a new instance of SyncProducer.



# File 'lib/telekinesis/producer/sync_producer.rb', line 26

def initialize(stream, client, opts = {})
  @stream = stream or raise ArgumentError, "stream may not be nil"
  @client = client or raise ArgumentError, "client may not be nil"
  @send_size = opts.fetch(:send_size, Telekinesis::Aws::KINESIS_MAX_PUT_RECORDS_SIZE)
end
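The constructor's nil checks and the `:send_size` default can be seen in isolation with a small stand-in. In this sketch `SketchProducer` is a hypothetical class that copies the constructor body shown above, and `DEFAULT_SEND_SIZE` is an assumed substitute for `Telekinesis::Aws::KINESIS_MAX_PUT_RECORDS_SIZE`, set to the 500-record limit mentioned in the `put_all` documentation.

```ruby
# Hypothetical stand-in that copies the constructor shown above; it is not
# the real SyncProducer.
class SketchProducer
  # Assumed value, taken from the 500-record limit described for put_all.
  DEFAULT_SEND_SIZE = 500

  attr_reader :stream, :client, :send_size

  def initialize(stream, client, opts = {})
    @stream = stream or raise ArgumentError, "stream may not be nil"
    @client = client or raise ArgumentError, "client may not be nil"
    @send_size = opts.fetch(:send_size, DEFAULT_SEND_SIZE)
  end
end

client = Object.new # placeholder; any non-nil object passes the nil check

default_producer = SketchProducer.new("example-stream", client)
small_batches = SketchProducer.new("example-stream", client, send_size: 100)

# A nil stream is rejected before anything is sent.
nil_stream_error =
  begin
    SketchProducer.new(nil, client)
    nil
  rescue ArgumentError => e
    e.message
  end
```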

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/telekinesis/producer/sync_producer.rb', line 9

def client
  @client
end

#stream ⇒ Object (readonly)

Returns the value of attribute stream.



# File 'lib/telekinesis/producer/sync_producer.rb', line 9

def stream
  @stream
end

Class Method Details

.create(options = {}) ⇒ Object

Create a new Producer.

AWS credentials may be specified by using the `:credentials` option and passing a hash containing your `:access_key_id` and `:secret_access_key`. If unspecified, credentials will be fetched from the environment, a ~/.aws/credentials file, or the current instance metadata.

`:send_size` may also be used to configure the maximum batch size used in `put_all`. See `put_all` for more info.



# File 'lib/telekinesis/producer/sync_producer.rb', line 20

def self.create(options = {})
  stream = options[:stream]
  client = Telekinesis::Aws::Client.build(options.fetch(:credentials, {}))
  new(stream, client, options)
end
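A call to `create` might look like the following sketch. The stream name, the `send_size` value, and the use of environment variables for the credentials are placeholders chosen for illustration. The call itself is guarded so the snippet still runs where the telekinesis gem is not loaded.

```ruby
# Options accepted by SyncProducer.create, per the description above.
# All values here are illustrative placeholders.
options = {
  stream: "example-stream",
  credentials: {
    access_key_id: ENV["AWS_ACCESS_KEY_ID"],
    secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"]
  },
  send_size: 250 # maximum records per put_all request
}

# Only attempt the call when the library has been loaded.
if defined?(Telekinesis::Producer::SyncProducer)
  producer = Telekinesis::Producer::SyncProducer.create(options)
end
```

Omitting `:credentials` leaves the lookup to the environment, the credentials file, or instance metadata, as described above.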

Instance Method Details

#put(key, data) ⇒ Object

Put a single key/data pair to Kinesis immediately. Both the key and the data must be strings.

Returns once the call to Kinesis is complete.



# File 'lib/telekinesis/producer/sync_producer.rb', line 36

def put(key, data)
  @client.put_record(@stream, key, data)
end

#put_all(items) ⇒ Object

Put all of the [key, data] pairs in `items` to Kinesis in as few requests as possible. Every key and every data value must be a string.

Each request sends at most `:send_size` records. By default this is the Kinesis API limit of 500 records.



# File 'lib/telekinesis/producer/sync_producer.rb', line 45

def put_all(items)
  items.each_slice(@send_size).flat_map do |batch|
    @client.put_records(@stream, batch)
  end
end
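The batching behavior can be checked without touching AWS by running the same `each_slice` logic against a stub. In this sketch `RecordingClient` and `put_in_batches` are hypothetical names introduced for illustration. The stub returns an empty array from `put_records`, which is an assumption, since the real client's return value is not described here.

```ruby
# Hypothetical stand-in for Telekinesis::Aws::Client; it only records calls.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Same call shape as the put_records(stream, batch) call in put_all.
  # Returning [] is an assumption made so flat_map has something to flatten.
  def put_records(stream, batch)
    @calls << [stream, batch]
    []
  end
end

# The slicing logic from put_all, extracted into a free-standing method.
def put_in_batches(client, stream, items, send_size)
  items.each_slice(send_size).flat_map do |batch|
    client.put_records(stream, batch)
  end
end

client = RecordingClient.new
items = (1..1200).map { |i| ["key-#{i}", "value-#{i}"] }

result = put_in_batches(client, "example-stream", items, 500)

# 1200 items with a send_size of 500 become three requests.
batch_sizes = client.calls.map { |(_stream, batch)| batch.size }
```

With these inputs `batch_sizes` is `[500, 500, 200]`: two full batches followed by the remainder.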