Class: Telekinesis::Producer::AsyncProducer

Inherits: Object

Defined in: lib/telekinesis/producer/async_producer.rb

Overview

An asynchronous producer that buffers events into a queue and uses a background thread to send them to Kinesis. Only available on JRuby.

This class is thread-safe.

Constant Summary

MAX_PUT_RECORDS_SIZE = Telekinesis::Aws::KINESIS_MAX_PUT_RECORDS_SIZE
For convenience.


Constructor Details

#initialize(stream, client, failure_handler, options = {}) ⇒ AsyncProducer

Construct a new producer. Intended for internal use only; prefer .create unless calling this constructor directly is strictly necessary.



# File 'lib/telekinesis/producer/async_producer.rb', line 44

def initialize(stream, client, failure_handler, options = {})
  @stream = stream or raise ArgumentError, "stream may not be nil"
  @client = client or raise ArgumentError, "client may not be nil"
  @failure_handler = failure_handler or raise ArgumentError, "failure_handler may not be nil"
  @shutdown = false

  queue_size     = options.fetch(:queue_size, 1000)
  send_every     = options.fetch(:send_every_ms, 1000)
  worker_count   = options.fetch(:worker_count, 1)
  raise ArgumentError, ":worker_count must be > 0" unless worker_count > 0
  send_size      = options.fetch(:send_size, MAX_PUT_RECORDS_SIZE)
  raise ArgumentError(":send_size too large") if send_size > MAX_PUT_RECORDS_SIZE
  retries        = options.fetch(:retries, 5)
  raise ArgumentError(":retries must be >= 0") unless retries >= 0
  retry_interval = options.fetch(:retry_interval, 1.0)
  raise ArgumentError(":retry_interval must be > 0") unless retry_interval > 0

  # NOTE: For testing.
  @queue = options[:queue] || ArrayBlockingQueue.new(queue_size)

  @lock = Telekinesis::JavaUtil::ReadWriteLock.new
  @worker_pool = build_executor(worker_count)
  @workers = worker_count.times.map do
    AsyncProducerWorker.new(self, @queue, send_size, send_every, retries, retry_interval)
  end

  # NOTE: Start by default. For testing.
  start unless options.fetch(:manual_start, false)
end
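The constructor's option handling follows a fetch-with-default-then-validate pattern. The sketch below reproduces it in plain Ruby with no JRuby dependencies; `validate_producer_options` is a hypothetical helper for illustration, not part of the gem:

```ruby
# Fetch each option with its documented default, then reject values that
# would misconfigure the producer. Defaults mirror those shown above.
def validate_producer_options(options = {})
  worker_count   = options.fetch(:worker_count, 1)
  raise ArgumentError, ":worker_count must be > 0" unless worker_count > 0

  retries        = options.fetch(:retries, 5)
  raise ArgumentError, ":retries must be >= 0" unless retries >= 0

  retry_interval = options.fetch(:retry_interval, 1.0)
  raise ArgumentError, ":retry_interval must be > 0" unless retry_interval > 0

  { worker_count: worker_count, retries: retries, retry_interval: retry_interval }
end
```

Note that validation happens before any threads or queues are created, so a bad option fails fast instead of leaving a half-constructed producer.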

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/telekinesis/producer/async_producer.rb', line 18

def client
  @client
end

#failure_handler ⇒ Object (readonly)

Returns the value of attribute failure_handler.



# File 'lib/telekinesis/producer/async_producer.rb', line 18

def failure_handler
  @failure_handler
end

#stream ⇒ Object (readonly)

Returns the value of attribute stream.



# File 'lib/telekinesis/producer/async_producer.rb', line 18

def stream
  @stream
end

Class Method Details

.create(options = {}) ⇒ Object

Create a new producer.

AWS credentials may be specified with the `:credentials` option, a hash containing your `:access_key_id` and `:secret_access_key`. If unspecified, credentials are fetched from the environment, the ~/.aws/credentials file, or the current instance metadata.

The producer's `:worker_count`, its internal `:queue_size`, the `:send_size` of batches sent to Kinesis, and how often workers send data even when their batches aren't full (`:send_every_ms`) can be configured as well. They all have reasonable defaults.

When requests to Kinesis fail, the configured `:failure_handler` is called. If you don't specify a failure handler, a NoopFailureHandler is used.



# File 'lib/telekinesis/producer/async_producer.rb', line 35

def self.create(options = {})
  stream = options[:stream]
  client = Telekinesis::Aws::Client.build(options.fetch(:credentials, {}))
  failure_handler = options.fetch(:failure_handler, NoopFailureHandler.new)
  new(stream, client, failure_handler, options)
end
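A hypothetical usage sketch follows. The stream name, credentials, and option values are illustrative placeholders; the `create` call itself is shown commented out because it requires JRuby and reachable AWS credentials:

```ruby
# Options assembled for AsyncProducer.create. The keys are the ones
# documented above; every value here is a placeholder.
options = {
  stream:        "my-stream",
  credentials:   { access_key_id: "AKIA...", secret_access_key: "..." },
  worker_count:  2,
  queue_size:    1000,
  send_every_ms: 1000,
  retries:       5
}

# Requires JRuby and valid AWS credentials:
# producer = Telekinesis::Producer::AsyncProducer.create(options)
```

Omitting `:credentials` entirely is also valid, in which case the client falls back to the environment, ~/.aws/credentials, or instance metadata as described above.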

Instance Method Details

#await(duration, unit = TimeUnit::SECONDS) ⇒ Object

Wait for this producer to shut down.



# File 'lib/telekinesis/producer/async_producer.rb', line 130

def await(duration, unit = TimeUnit::SECONDS)
  @worker_pool.await_termination(duration, unit)
end

#put(key, data) ⇒ Object

Put a single key/value pair to Kinesis. Both key and value must be strings.

This call returns immediately; it returns true iff the producer is still accepting data. Data is put to Kinesis in the background.



# File 'lib/telekinesis/producer/async_producer.rb', line 79

def put(key, data)
  put_all(key => data)
end

#put_all(items) ⇒ Object

Put all of the given key/value pairs to Kinesis. Both keys and values must be strings.

This call returns immediately; it returns true iff the producer is still accepting data. Data is put to Kinesis in the background.



# File 'lib/telekinesis/producer/async_producer.rb', line 88

def put_all(items)
  # NOTE: The lock ensures that no new data can be added to the queue after
  # the shutdown flag has been set. See the note in shutdown for details.
  @lock.read_lock do
    if @shutdown
      false
    else
      items.each do |key, data|
        @queue.put([key, data])
      end
      true
    end
  end
end
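The enqueue semantics can be mimicked with Ruby's built-in thread-safe Queue standing in for the JRuby ArrayBlockingQueue. This is a sketch of the behavior only; `SketchProducer` is illustrative, not the gem's class, and unlike ArrayBlockingQueue this queue is unbounded, so put never blocks on a full buffer:

```ruby
# Sketch: each key/value pair is pushed onto a shared queue as a
# two-element array; put_all returns false once shutdown has begun.
class SketchProducer
  def initialize
    @queue = Queue.new   # the real producer uses a *bounded* queue
    @shutdown = false
  end

  def put_all(items)
    return false if @shutdown
    items.each { |key, data| @queue.push([key, data]) }
    true
  end

  def put(key, data)
    put_all(key => data)   # put delegates to put_all, as above
  end

  def queue_size
    @queue.size
  end
end
```

The real implementation additionally takes a read lock around the shutdown check and the puts, so that the shutdown flag cannot flip between the check and the enqueue.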

#queue_size ⇒ Object

Return the number of events currently buffered by this producer. This doesn’t include any events buffered in workers that are currently on their way to Kinesis.



# File 'lib/telekinesis/producer/async_producer.rb', line 137

def queue_size
  @queue.size
end

#shutdown(block = false, duration = 2, unit = TimeUnit::SECONDS) ⇒ Object

Shut down this producer. After the call completes, the producer will not accept any more data, but will finish processing any data it has buffered internally.

If block = true is passed, this call will block and wait for the producer to shut down before returning. This wait times out after duration has passed.



# File 'lib/telekinesis/producer/async_producer.rb', line 110

def shutdown(block = false, duration = 2, unit = TimeUnit::SECONDS)
  # NOTE: Since a write_lock is exclusive, this prevents any data from being
  # added to the queue while the SHUTDOWN tokens are being inserted. Without
  # the lock, data can end up in the queue behind all of the shutdown tokens
and be lost. This happens if the shutdown flag is flipped by a thread
  # calling shutdown after another thread has checked the "if @shutdown"
  # condition in put but before it's called queue.put.
  @lock.write_lock do
    @shutdown = true
    @workers.size.times do
      @queue.put(AsyncProducerWorker::SHUTDOWN)
    end
  end

  # Don't interrupt workers by calling shutdown_now.
  @worker_pool.shutdown
  await(duration, unit) if block
end
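The shutdown-token mechanism (one SHUTDOWN sentinel per worker, enqueued behind any pending data) can be demonstrated with plain Ruby threads. This is a sketch of the pattern only, not the gem's worker code:

```ruby
SHUTDOWN = Object.new  # sentinel token, compared by identity

queue   = Queue.new    # stands in for the bounded ArrayBlockingQueue
drained = Queue.new    # records what the workers processed
worker_count = 2

workers = worker_count.times.map do
  Thread.new do
    loop do
      item = queue.pop
      break if item.equal?(SHUTDOWN)  # each worker exits on its own token
      drained.push(item)
    end
  end
end

# Data enqueued before shutdown is still processed, because the queue is
# FIFO and the tokens are placed behind it, one per worker.
queue.push(["key", "value"])
worker_count.times { queue.push(SHUTDOWN) }
workers.each(&:join)
```

This is why shutdown holds the write lock while inserting the tokens: without it, a concurrent put could slip data in behind the tokens, where no worker would ever drain it.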