Class: Telekinesis::Producer::SyncProducer
- Inherits:
-
Object
- Object
- Telekinesis::Producer::SyncProducer
- Defined in:
- lib/telekinesis/producer/sync_producer.rb
Overview
A synchronous Kinesis producer.
This class is thread safe if and only if the underlying Telekines::Aws::Client is threadsafe. In practice, this means this client is threadsafe on JRuby and not thread safe elsewhere.
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Class Method Summary collapse
-
.create(options = {}) ⇒ Object
Create a new Producer.
Instance Method Summary collapse
-
#initialize(stream, client, opts = {}) ⇒ SyncProducer
constructor
A new instance of SyncProducer.
-
#put(key, data) ⇒ Object
Put an individual k, v pair to Kinesis immediately.
-
#put_all(items) ⇒ Object
Put all of the [k, v] pairs to Kinesis in as few requests as possible.
Constructor Details
#initialize(stream, client, opts = {}) ⇒ SyncProducer
Returns a new instance of SyncProducer.
26 27 28 29 30 |
# File 'lib/telekinesis/producer/sync_producer.rb', line 26 def initialize(stream, client, opts = {}) @stream = stream or raise ArgumentError, "stream may not be nil" @client = client or raise ArgumentError, "client may not be nil" @send_size = opts.fetch(:send_size, Telekinesis::Aws::KINESIS_MAX_PUT_RECORDS_SIZE) end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
9 10 11 |
# File 'lib/telekinesis/producer/sync_producer.rb', line 9 def client @client end |
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
9 10 11 |
# File 'lib/telekinesis/producer/sync_producer.rb', line 9 def stream @stream end |
Class Method Details
.create(options = {}) ⇒ Object
Create a new Producer.
AWS credentials may be specified by using the ‘:credentials` option and passing a hash containing your `:access_key_id` and `:secret_access_key`. If unspecified, credentials will be fetched from the environment, an ~/.aws/credentials file, or the current instance metadata.
‘:send_size` may also be used to configure the maximum batch size used in `put_all`. See `put_all` for more info.
20 21 22 23 24 |
# File 'lib/telekinesis/producer/sync_producer.rb', line 20 def self.create( = {}) stream = [:stream] client = Telekinesis::Aws::Client.build(.fetch(:credentials, {})) new(stream, client, failure_handler, ) end |
Instance Method Details
#put(key, data) ⇒ Object
Put an individual k, v pair to Kinesis immediately. Both k and v must be strings.
Returns once the call to Kinesis is complete.
36 37 38 |
# File 'lib/telekinesis/producer/sync_producer.rb', line 36 def put(key, data) @client.put_record(@stream, key, data) end |
#put_all(items) ⇒ Object
Put all of the [k, v] pairs to Kinesis in as few requests as possible. All of the ks and vs must be strings.
Each request sends at most ‘:send_size` records. By default this is the Kinesis API limit of 500 records.
45 46 47 48 49 |
# File 'lib/telekinesis/producer/sync_producer.rb', line 45 def put_all(items) items.each_slice(@send_size).flat_map do |batch| @client.put_records(@stream, batch) end end |