Class: Telekinesis::Producer::AsyncProducer
- Inherits:
-
Object
- Object
- Telekinesis::Producer::AsyncProducer
- Defined in:
- lib/telekinesis/producer/async_producer.rb
Overview
An asynchronous producer that buffers events into a queue and uses a background thread to send them to Kinesis. Only available on JRuby.
This class is thread-safe.
Constant Summary collapse
- MAX_PUT_RECORDS_SIZE =
For convenience
Telekinesis::Aws::KINESIS_MAX_PUT_RECORDS_SIZE
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#failure_handler ⇒ Object
readonly
Returns the value of attribute failure_handler.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Class Method Summary collapse
-
.create(options = {}) ⇒ Object
Create a new producer.
Instance Method Summary collapse
-
#await(duration, unit = TimeUnit::SECONDS) ⇒ Object
Wait for this producer to shutdown.
-
#initialize(stream, client, failure_handler, options = {}) ⇒ AsyncProducer
constructor
Construct a new producer.
-
#put(key, data) ⇒ Object
Put a single key, value pair to Kinesis.
-
#put_all(items) ⇒ Object
Put all of the given key, value pairs to Kinesis.
-
#queue_size ⇒ Object
Return the number of events currently buffered by this producer.
-
#shutdown(block = false, duration = 2, unit = TimeUnit::SECONDS) ⇒ Object
Shut down this producer.
Constructor Details
#initialize(stream, client, failure_handler, options = {}) ⇒ AsyncProducer
Construct a new producer. Intended for internal use only - prefer #create unless it’s strictly necessary.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/telekinesis/producer/async_producer.rb', line 44 def initialize(stream, client, failure_handler, = {}) @stream = stream or raise ArgumentError, "stream may not be nil" @client = client or raise ArgumentError, "client may not be nil" @failure_handler = failure_handler or raise ArgumentError, "failure_handler may not be nil" @shutdown = false queue_size = .fetch(:queue_size, 1000) send_every = .fetch(:send_every_ms, 1000) worker_count = .fetch(:worker_count, 1) raise ArgumentError(":worker_count must be > 0") unless worker_count > 0 send_size = .fetch(:send_size, MAX_PUT_RECORDS_SIZE) raise ArgumentError(":send_size too large") if send_size > MAX_PUT_RECORDS_SIZE retries = .fetch(:retries, 5) raise ArgumentError(":retries must be >= 0") unless retries >= 0 retry_interval = .fetch(:retry_interval, 1.0) raise ArgumentError(":retry_interval must be > 0") unless retry_interval > 0 # NOTE: For testing. @queue = [:queue] || ArrayBlockingQueue.new(queue_size) @lock = Telekinesis::JavaUtil::ReadWriteLock.new @worker_pool = build_executor(worker_count) @workers = worker_count.times.map do AsyncProducerWorker.new(self, @queue, send_size, send_every, retries, retry_interval) end # NOTE: Start by default. For testing. start unless .fetch(:manual_start, false) end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
18 19 20 |
# File 'lib/telekinesis/producer/async_producer.rb', line 18 def client @client end |
#failure_handler ⇒ Object (readonly)
Returns the value of attribute failure_handler.
18 19 20 |
# File 'lib/telekinesis/producer/async_producer.rb', line 18 def failure_handler @failure_handler end |
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
18 19 20 |
# File 'lib/telekinesis/producer/async_producer.rb', line 18 def stream @stream end |
Class Method Details
.create(options = {}) ⇒ Object
Create a new producer.
AWS credentials may be specified by using the ‘:credentials` option and passing a hash containing your `:access_key_id` and `:secret_access_key`. If unspecified, credentials will be fetched from the environment, an ~/.aws/credentials file, or the current instance metadata.
The producer’s ‘:worker_count`, internal `:queue_size`, the `:send_size` of batches to Kinesis and how often workers send data to Kinesis, even if their batches aren’t full (‘:send_every_ms`) can be configured as well. They all have reasonable defaults.
When requests to Kinesis fail, the configured ‘:failure_handler` will be called. If you don’t specify a failure handler, a NoopFailureHandler is used.
35 36 37 38 39 40 |
# File 'lib/telekinesis/producer/async_producer.rb', line 35 def self.create( = {}) stream = [:stream] client = Telekinesis::Aws::Client.build(.fetch(:credentials, {})) failure_handler = .fetch(:failure_handler, NoopFailureHandler.new) new(stream, client, failure_handler, ) end |
Instance Method Details
#await(duration, unit = TimeUnit::SECONDS) ⇒ Object
Wait for this producer to shutdown.
130 131 132 |
# File 'lib/telekinesis/producer/async_producer.rb', line 130 def await(duration, unit = TimeUnit::SECONDS) @worker_pool.await_termination(duration, unit) end |
#put(key, data) ⇒ Object
Put a single key, value pair to Kinesis. Both key and value must be strings.
This call returns immediately and returns true iff the producer is still accepting data. Data is put to Kinesis in the background.
79 80 81 |
# File 'lib/telekinesis/producer/async_producer.rb', line 79 def put(key, data) put_all(key => data) end |
#put_all(items) ⇒ Object
Put all of the given key, value pairs to Kinesis. Both key and value must be Strings.
This call returns immediately and returns true iff the producer is still accepting data. Data is put to Kinesis in the background.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/telekinesis/producer/async_producer.rb', line 88 def put_all(items) # NOTE: The lock ensures that no new data can be added to the queue after # the shutdown flag has been set. See the note in shutdown for details. @lock.read_lock do if @shutdown false else items.each do |key, data| @queue.put([key, data]) end true end end end |
#queue_size ⇒ Object
Return the number of events currently buffered by this producer. This doesn’t include any events buffered in workers that are currently on their way to Kinesis.
137 138 139 |
# File 'lib/telekinesis/producer/async_producer.rb', line 137 def queue_size @queue.size end |
#shutdown(block = false, duration = 2, unit = TimeUnit::SECONDS) ⇒ Object
Shut down this producer. After the call completes, the producer will not accept any more data, but will finish processing any data it has buffered internally.
If block = true is passed, this call will block and wait for the producer to shut down before returning. This wait times out after duration has passed.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/telekinesis/producer/async_producer.rb', line 110 def shutdown(block = false, duration = 2, unit = TimeUnit::SECONDS) # NOTE: Since a write_lock is exclusive, this prevents any data from being # added to the queue while the SHUTDOWN tokens are being inserted. Without # the lock, data can end up in the queue behind all of the shutdown tokens # and be lost. This happens if the shutdown flag is be flipped by a thread # calling shutdown after another thread has checked the "if @shutdown" # condition in put but before it's called queue.put. @lock.write_lock do @shutdown = true @workers.size.times do @queue.put(AsyncProducerWorker::SHUTDOWN) end end # Don't interrupt workers by calling shutdown_now. @worker_pool.shutdown await(duration, unit) if block end |