Class: SnowplowTracker::AsyncEmitter
Defined in: lib/snowplow-tracker/emitters.rb
Overview
This Emitter subclass provides asynchronous event sending. Whenever the buffer is flushed, the AsyncEmitter places the flushed events in a work queue. The AsyncEmitter asynchronously sends events in this queue using a thread pool of a fixed size. The size of the thread pool is 1 by default, but can be configured as part of the options hash during initialization.
Instance Method Summary
- #consume ⇒ Object
  AsyncEmitters use the MonitorMixin module, which provides the `synchronize` and `broadcast` methods.
- #flush(async = true) ⇒ Object
  Flush the Emitter, forcing it to send all the events in its buffer, even if the buffer is not full.
- #initialize(endpoint:, options: {}) ⇒ AsyncEmitter (constructor)
  Create a new AsyncEmitter object.
Methods inherited from Emitter
#confirm_buffer_size, #confirm_path, #good_status_code?, #input, #process_get_event, #send_requests, #send_requests_with_get, #send_requests_with_post
Constructor Details
#initialize(endpoint:, options: {}) ⇒ AsyncEmitter
Note: If you test the AsyncEmitter by using a short script to send an event, you may find that the event fails to send. This is because the process exits before the flushing thread has finished. You can work around this either by adding `sleep(10)` to the end of your script or by using the synchronous flush.
Create a new AsyncEmitter object. The endpoint is required.
The options hash can have any of these optional parameters:
| Parameter | Description | Type |
| --- | --- | --- |
| path | Override the default path for appending to the endpoint | String |
| protocol | 'http' or 'https' | String |
| port | The port for the connection | Integer |
| method | 'get' or 'post' | String |
| buffer_size | Number of events to send at once | Integer |
| on_success | A function to call if events were sent successfully | Function |
| on_failure | A function to call if events did not send | Function |
| thread_count | Number of threads to use | Integer |
| logger | Log somewhere other than STDERR | Logger |
The `thread_count` determines the number of worker threads which will be used to send events.
If you choose to use HTTPS, we recommend using port 443.
Only 2xx and 3xx status codes are considered successes.
The `on_success` callback should accept one argument: the number of requests that were sent successfully. The `on_failure` callback should accept two arguments: the number of successfully sent events, and an array containing the unsuccessful events.
    # File 'lib/snowplow-tracker/emitters.rb', line 411
    def initialize(endpoint:, options: {})
      @queue = Queue.new
      # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
      @queue.extend(MonitorMixin)
      @all_processed_condition = @queue.new_cond
      @results_unprocessed = 0
      (options[:thread_count] || 1).times { Thread.new { consume } }
      super(endpoint: endpoint, options: options)
    end
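As a rough illustration of the constructor options described above, the following sketch builds an options hash with both callbacks. The endpoint `collector.example.com`, the option values, and the `sent_total` / `failed_events` bookkeeping are assumptions made for this example, not part of the library. The emitter is only constructed when the gem is loaded, so the snippet also runs on its own.

```ruby
# Illustrative bookkeeping (names are assumptions for this example).
sent_total = 0
failed_events = []

# on_success receives one argument, as documented above.
on_success = lambda do |sent_count|
  sent_total += sent_count
end

# on_failure receives the count of successfully sent events and
# an array of the events that could not be sent.
on_failure = lambda do |sent_count, unsent_events|
  sent_total += sent_count
  failed_events.concat(unsent_events)
end

options = {
  protocol: 'https',
  port: 443,          # recommended above when using HTTPS
  method: 'post',
  buffer_size: 10,
  thread_count: 2,    # two worker threads instead of the default 1
  on_success: on_success,
  on_failure: on_failure
}

# Only build the emitter when the snowplow-tracker gem has been required.
if defined?(SnowplowTracker::AsyncEmitter)
  emitter = SnowplowTracker::AsyncEmitter.new(
    endpoint: 'collector.example.com', # hypothetical collector host
    options: options
  )
  # ... track events through a Tracker that uses this emitter ...
  # In a short script, block until queued events are sent before exiting.
  emitter.flush(false)
end
```

Calling `flush(false)` at the end is the synchronous flush mentioned in the note above; it avoids the process exiting while worker threads are still sending.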
Instance Method Details
#consume ⇒ Object
AsyncEmitters use the MonitorMixin module, which provides the `synchronize` and `broadcast` methods.
    # File 'lib/snowplow-tracker/emitters.rb', line 424
    def consume
      loop do
        work_unit = @queue.pop
        send_requests(work_unit)
        @queue.synchronize do
          @results_unprocessed -= 1
          @all_processed_condition.broadcast
        end
      end
    end
#flush(async = true) ⇒ Object
Flush the Emitter, forcing it to send all the events in its buffer, even if the buffer is not full.
If `async` is true (the default), events are sent even if the queue is not empty. If `async` is false, it blocks until all queued events have been sent. Note that this method can be called by the public API method Tracker#flush, whose `async` argument defaults to false.
    # File 'lib/snowplow-tracker/emitters.rb', line 447
    def flush(async = true)
      loop do
        @lock.synchronize do
          @queue.synchronize { @results_unprocessed += 1 }
          @queue << @buffer
          @buffer = []
        end
        unless async
          logger.info('Starting synchronous flush')
          @queue.synchronize do
            @all_processed_condition.wait_while { @results_unprocessed > 0 }
            logger.info('Finished synchronous flush')
          end
        end
        break if @buffer.empty?
      end
    end
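The interplay between `consume` and a synchronous `flush` can be tried out without the gem. The sketch below is a simplified stand-in, not the library's implementation: `TinyAsyncSender`, its `add` method, and the `sent` array are hypothetical names, "sending" is simulated with a short sleep, and the outer retry loop of the real `flush` is omitted. It uses only the standard library (`Queue` and `MonitorMixin`).

```ruby
require 'monitor'

# Hypothetical stand-in for an async emitter: "sending" a batch
# just appends it to @sent after a short delay.
class TinyAsyncSender
  attr_reader :sent

  def initialize(thread_count: 1)
    @queue = Queue.new
    @queue.extend(MonitorMixin)               # adds synchronize / new_cond
    @all_processed_condition = @queue.new_cond
    @results_unprocessed = 0                  # batches queued but not yet handled
    @lock = Monitor.new                       # guards @buffer
    @buffer = []
    @sent = []
    thread_count.times { Thread.new { consume } }
  end

  def add(event)
    @lock.synchronize { @buffer << event }
  end

  def flush(async = true)
    @lock.synchronize do
      @queue.synchronize { @results_unprocessed += 1 }
      @queue << @buffer
      @buffer = []
    end
    return if async

    # Synchronous mode: block until every queued batch has been handled.
    @queue.synchronize do
      @all_processed_condition.wait_while { @results_unprocessed > 0 }
    end
  end

  private

  def consume
    loop do
      work_unit = @queue.pop
      sleep 0.05                              # simulate network latency
      @queue.synchronize do
        @sent.concat(work_unit)
        @results_unprocessed -= 1
        @all_processed_condition.broadcast    # wake any waiting flush
      end
    end
  end
end

sender = TinyAsyncSender.new(thread_count: 2)
5.times { |i| sender.add("event-#{i}") }
sender.flush(false)       # returns only after the batch has been "sent"
puts sender.sent.length   # prints 5
```

With `sender.flush` (async) in place of `sender.flush(false)`, the script could reach `puts` before the worker thread has finished, which is the same effect the constructor note warns about for short test scripts.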