Class: SnowplowTracker::AsyncEmitter

Inherits:
Emitter
  • Object
Defined in:
lib/snowplow-tracker/emitters.rb

Overview

This Emitter subclass sends events asynchronously. Whenever the buffer is flushed, the AsyncEmitter places the flushed events in a work queue, and a fixed-size pool of worker threads sends the events in this queue in the background. The pool has one thread by default; its size can be set with the `thread_count` option when the emitter is initialized.
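The design described above (a buffer handed to a work queue that a fixed number of threads drain) can be sketched with Ruby's core `Queue` and `Thread`. This is a minimal illustration under stated assumptions, not the gem's code: `MiniAsyncEmitter`, its `STOP` sentinel and its `shutdown` method are hypothetical, and the "send" only records each batch instead of making an HTTP request.

```ruby
# Hypothetical sketch of buffer -> work queue -> fixed-size thread pool.
# MiniAsyncEmitter is NOT the gem's class; names are for illustration only.
class MiniAsyncEmitter
  STOP = Object.new # sentinel that tells a worker thread to exit

  attr_reader :sent

  def initialize(buffer_size: 3, thread_count: 1)
    @buffer_size = buffer_size
    @buffer = []
    @queue = Queue.new # thread-safe work queue from Ruby core
    @sent = Queue.new  # collects the batches the workers "sent"
    @workers = Array.new(thread_count) do
      Thread.new do
        while (batch = @queue.pop) != STOP
          @sent << batch # stand-in for an HTTP request
        end
      end
    end
  end

  # Buffer an event; hand the buffer to the work queue once it is full.
  # Assumes a single producer thread, so the buffer is not locked here.
  def input(event)
    @buffer << event
    flush if @buffer.size >= @buffer_size
  end

  # Move the current buffer onto the work queue and start a new one.
  def flush
    @queue << @buffer unless @buffer.empty?
    @buffer = []
  end

  # Flush what is left, stop every worker and wait for each to finish.
  def shutdown
    flush
    @workers.size.times { @queue << STOP }
    @workers.each(&:join)
  end
end

emitter = MiniAsyncEmitter.new(buffer_size: 3, thread_count: 2)
7.times { |i| emitter.input("event-#{i}") }
emitter.shutdown
batches = Array.new(emitter.sent.size) { emitter.sent.pop }
batches.map(&:size).sort # => [1, 3, 3]
```

With `buffer_size: 3`, seven events produce two full batches plus one partial batch from the final flush. Because `input` is called from a single thread in this sketch, the buffer is not locked; the `flush` method quoted further down guards it with `@lock`.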

Constant Summary

Constants inherited from Emitter

Emitter::DEFAULT_CONFIG

Instance Attribute Summary

Attributes inherited from Emitter

#logger

Instance Method Summary

Methods inherited from Emitter

#confirm_buffer_size, #confirm_path, #good_status_code?, #input, #process_get_event, #send_requests, #send_requests_with_get, #send_requests_with_post

Constructor Details

#initialize(endpoint:, options: {}) ⇒ AsyncEmitter

Note:

If you test the AsyncEmitter with a short script that sends an event, you may find that the event fails to send. This is because the process exits before the flushing thread has finished. You can work around this either by adding `sleep(10)` to the end of your script or by using a synchronous flush (`flush(false)`).

Create a new AsyncEmitter object. The endpoint is required.

The options hash can have any of these optional parameters:

| Parameter | Description | Type |
| --- | --- | --- |
| path | Override the default path appended to the endpoint | String |
| protocol | 'http' or 'https' | String |
| port | The port for the connection | Integer |
| method | 'get' or 'post' | String |
| buffer_size | Number of events to send at once | Integer |
| on_success | Callback invoked when events are sent successfully | Proc |
| on_failure | Callback invoked when some events fail to send | Proc |
| thread_count | Number of worker threads to use | Integer |
| logger | Log somewhere other than STDERR | Logger |

The `thread_count` option sets the number of worker threads used to send events. It defaults to 1.

If you choose to use HTTPS, we recommend using port 443.

Only 2xx and 3xx status codes are considered successes.
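The success rule can be written as a one-line predicate. The sketch below is a hypothetical re-implementation for illustration, not the gem's own `good_status_code?`; it assumes the status may arrive as a string, which is what `Net::HTTPResponse#code` returns.

```ruby
# Hypothetical predicate: any 2xx or 3xx status counts as a success.
# to_i accepts both "204"-style strings and plain integers.
def good_status_code?(status_code)
  (200...400).cover?(status_code.to_i)
end

good_status_code?('204') # => true
good_status_code?(302)   # => true
good_status_code?('404') # => false
```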

The `on_success` callback should accept one argument: the number of events sent successfully. The `on_failure` callback should accept two arguments: the number of successfully sent events, and an array containing the events that failed to send.

Examples:

Initializing an AsyncEmitter with every available configuration option.

require 'logger'
require 'snowplow-tracker'

success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
failure_callback = ->(success_count, failures) do
  puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
end

SnowplowTracker::AsyncEmitter.new(endpoint: 'collector.example.com',
                                  options: { path: '/my-pipeline/1',
                                             protocol: 'https',
                                             port: 443,
                                             method: 'post',
                                             buffer_size: 5,
                                             on_success: success_callback,
                                             on_failure: failure_callback,
                                             logger: Logger.new(STDOUT),
                                             thread_count: 5 })

Parameters:

  • endpoint (String)

    the endpoint to send the events to

  • options (Hash) (defaults to: {})

    allowed configuration options

# File 'lib/snowplow-tracker/emitters.rb', line 411

def initialize(endpoint:, options: {})
  @queue = Queue.new
  # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
  @queue.extend(MonitorMixin)
  @all_processed_condition = @queue.new_cond
  @results_unprocessed = 0
  (options[:thread_count] || 1).times { Thread.new { consume } }
  super(endpoint: endpoint, options: options)
end
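The constructor's comment refers to Python's `Queue.task_done()`. The same bookkeeping (a counter guarded by the queue's monitor, plus a condition variable that waiters block on) can be isolated as below. `TrackedQueue` and its method names are hypothetical, chosen to mirror Python's `put`/`task_done`/`join`; only the `Queue`/`MonitorMixin` combination is taken from the constructor.

```ruby
require 'monitor'

# Hypothetical TrackedQueue: emulates Python's task_done()/join() with a
# Queue extended by MonitorMixin, as the AsyncEmitter constructor does.
class TrackedQueue
  def initialize
    @queue = Queue.new
    @queue.extend(MonitorMixin)      # adds synchronize and new_cond
    @all_processed = @queue.new_cond # MonitorMixin::ConditionVariable
    @unprocessed = 0
  end

  # Producer side: count the item before workers can see it.
  def put(item)
    @queue.synchronize { @unprocessed += 1 }
    @queue << item
  end

  def pop
    @queue.pop
  end

  # Worker side: call once per item after it has been handled.
  def task_done
    @queue.synchronize do
      @unprocessed -= 1
      @all_processed.broadcast # wake every thread blocked in join
    end
  end

  # Block until every item put so far has been marked done.
  def join
    @queue.synchronize { @all_processed.wait_while { @unprocessed > 0 } }
  end
end

queue = TrackedQueue.new
results = Queue.new
worker = Thread.new do
  loop do
    item = queue.pop
    results << item * 2 # stand-in for sending a batch
    queue.task_done
  end
end
5.times { |i| queue.put(i) }
queue.join   # returns only after all five items are handled
results.size # => 5
```

Incrementing the counter before pushing the item matters: if the push came first, a fast worker could decrement before the increment and let `join` return early.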

Instance Method Details

#consume ⇒ Object

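Each worker thread runs this loop: it takes a batch of events from the work queue, sends it, and then decrements the count of unprocessed batches. The `synchronize` method comes from MonitorMixin, which is mixed into the queue, and `broadcast` comes from the condition variable created with `new_cond`; broadcasting wakes any thread blocked in a synchronous flush.
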
# File 'lib/snowplow-tracker/emitters.rb', line 424

def consume
  loop do
    work_unit = @queue.pop
    send_requests(work_unit)
    @queue.synchronize do
      @results_unprocessed -= 1
      @all_processed_condition.broadcast
    end
  end
end
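In the loop above, `@results_unprocessed` is decremented only after `send_requests` returns. If a send ever raised, the exception would end that worker thread before the decrement, and a later synchronous flush would wait for a batch that can never be marked as processed. Whether `send_requests` can actually raise depends on Emitter internals not shown on this page. Under that assumption, one defensive variant is sketched below: `rescue` keeps the worker alive and `ensure` always updates the counter. `SafeWorker` and its methods are hypothetical.

```ruby
require 'monitor'

# Hypothetical defensive worker: the counter is updated in an ensure
# clause, so a batch that raises still counts as processed and a
# waiting synchronous flush is always woken up.
class SafeWorker
  attr_reader :errors

  def initialize(&sender)
    @sender = sender # stand-in for send_requests
    @queue = Queue.new
    @queue.extend(MonitorMixin)
    @all_processed = @queue.new_cond
    @unprocessed = 0
    @errors = Queue.new
    Thread.new { consume }
  end

  def enqueue(batch)
    @queue.synchronize { @unprocessed += 1 }
    @queue << batch
  end

  def wait_until_processed
    @queue.synchronize { @all_processed.wait_while { @unprocessed > 0 } }
  end

  private

  def consume
    loop do
      batch = @queue.pop
      begin
        @sender.call(batch)
      rescue StandardError => e
        @errors << e # record the failure and keep the worker alive
      ensure
        @queue.synchronize do
          @unprocessed -= 1
          @all_processed.broadcast
        end
      end
    end
  end
end

worker = SafeWorker.new { |batch| raise ArgumentError, 'bad batch' if batch.empty? }
worker.enqueue([])          # raises inside the worker thread
worker.enqueue([:event])
worker.wait_until_processed # returns instead of hanging
worker.errors.size # => 1
```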

#flush(async = true) ⇒ Object

Flush the Emitter, forcing it to send all the events in its buffer, even if the buffer is not full.

If `async` is true (the default), the buffered events are placed on the work queue and the method returns without waiting for them to be sent. If `async` is false, it blocks until every queued batch, including any queued by earlier flushes, has been processed by the worker threads. This method is also called by the public Tracker#flush method, whose `async` argument defaults to false.

Parameters:

  • async (Bool) (defaults to: true)

    whether to flush asynchronously or not

# File 'lib/snowplow-tracker/emitters.rb', line 447

def flush(async = true)
  loop do
    @lock.synchronize do
      @queue.synchronize { @results_unprocessed += 1 }
      @queue << @buffer
      @buffer = []
    end
    unless async
      logger.info('Starting synchronous flush')
      @queue.synchronize do
        @all_processed_condition.wait_while { @results_unprocessed > 0 }
        logger.info('Finished synchronous flush')
      end
    end
    break if @buffer.empty?
  end
end
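The difference between the two flush modes can be observed with a sender that blocks until it is released. `FlushDemo` below is a hypothetical, simplified model of the `flush` method above: it uses the same counter and condition-variable bookkeeping, but it skips empty buffers and does not loop.

```ruby
require 'monitor'

# Hypothetical FlushDemo: a buffer, a work queue and one worker thread,
# with the counter/condition bookkeeping used by AsyncEmitter#flush.
class FlushDemo
  attr_reader :sent

  def initialize(&sender)
    @sender = sender # stand-in for send_requests
    @lock = Monitor.new
    @buffer = []
    @sent = []
    @queue = Queue.new
    @queue.extend(MonitorMixin)
    @all_processed = @queue.new_cond
    @unprocessed = 0
    Thread.new { consume }
  end

  def input(event)
    @lock.synchronize { @buffer << event }
  end

  # async = true: enqueue the buffer and return at once.
  # async = false: also block until every queued batch has been handled.
  def flush(async = true)
    @lock.synchronize do
      unless @buffer.empty?
        @queue.synchronize { @unprocessed += 1 }
        @queue << @buffer
        @buffer = []
      end
    end
    return if async

    @queue.synchronize { @all_processed.wait_while { @unprocessed > 0 } }
  end

  private

  def consume
    loop do
      batch = @queue.pop
      @sender.call(batch)
      @queue.synchronize do
        @sent.concat(batch)
        @unprocessed -= 1
        @all_processed.broadcast
      end
    end
  end
end

gate = Queue.new
demo = FlushDemo.new { |_batch| gate.pop } # each "send" blocks until released
demo.input(:page_view)
demo.flush        # asynchronous: returns while the send is still blocked
demo.sent.empty? # => true
gate << :go       # let the pending send complete
demo.flush(false) # synchronous: waits for the queued batch
demo.sent # => [:page_view]
```

The asynchronous call returns while the only send is still blocked, whereas `flush(false)` returns only once the worker has recorded the batch, which is why a synchronous flush is the safer way to end a short script.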