Class: Rudder::Analytics::Worker
- Inherits:
-
Object
- Object
- Rudder::Analytics::Worker
- Defined in:
- lib/rudder/analytics/worker.rb
Constant Summary
Constants included from Utils
Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON
Instance Method Summary collapse
-
#initialize(queue, config) ⇒ Worker
constructor
public: Creates a new worker.
-
#is_requesting? ⇒ Boolean
public: Check whether we have outstanding requests.
-
#run ⇒ Object
public: Continuously runs the loop to check for new events.
Methods included from Logging
Methods included from Utils
#check_string, #date_in_iso8601, #datetime_in_iso8601, #formatted_offset, #isoify_dates, #isoify_dates!, #seconds_to_utc_offset, #stringify_keys, #symbolize_keys, #symbolize_keys!, #time_in_iso8601, #uid
Constructor Details
#initialize(queue, config) ⇒ Worker
public: Creates a new worker
The worker continuously takes messages off the queue and makes requests to the segment.io api
queue - Queue synchronized between client and worker write_key - String of the project’s Write key options - Hash of worker options
batch_size - Fixnum of how many items to send in a batch
on_error - Proc of what to do on an error
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rudder/analytics/worker.rb', line 26 def initialize(queue, config) @queue = queue @data_plane_url = config.data_plane_url @write_key = config.write_key @ssl = config.ssl @on_error = config.on_error @on_error_with_messages = config. @batch = MessageBatch.new(config.batch_size) @lock = Mutex.new @transport = Transport.new(config) end |
Instance Method Details
#is_requesting? ⇒ Boolean
public: Check whether we have outstanding requests.
63 64 65 |
# File 'lib/rudder/analytics/worker.rb', line 63 def is_requesting? @lock.synchronize { !@batch.empty? } end |
#run ⇒ Object
public: Continuously runs the loop to check for new events
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/rudder/analytics/worker.rb', line 40 def run until Thread.current[:should_exit] return if @queue.empty? @lock.synchronize do until @batch.full? || @queue.empty? end # res = Request.new(:data_plane_url => @data_plane_url, :ssl => @ssl).post @write_key, @batch res = @transport.send @write_key, @batch unless res.status == 200 @on_error.call(res.status, res.error) @on_error_with_messages.call(res.status, res.error, @batch.) end @lock.synchronize { @batch.clear } end ensure @transport.shutdown end |