Class: Rudder::Analytics::Worker

Inherits:
Object
  • Object
show all
Includes:
Defaults, Logging, Utils
Defined in:
lib/rudder/analytics/worker.rb

Constant Summary

Constants included from Utils

Utils::UTC_OFFSET_WITHOUT_COLON, Utils::UTC_OFFSET_WITH_COLON

Instance Method Summary collapse

Methods included from Logging

included, #logger

Methods included from Utils

#check_string, #date_in_iso8601, #datetime_in_iso8601, #formatted_offset, #isoify_dates, #isoify_dates!, #seconds_to_utc_offset, #stringify_keys, #symbolize_keys, #symbolize_keys!, #time_in_iso8601, #uid

Constructor Details

#initialize(queue, config) ⇒ Worker

public: Creates a new worker

The worker continuously takes messages off the queue and makes requests to the segment.io api

queue - Queue synchronized between client and worker write_key - String of the project’s Write key options - Hash of worker options

batch_size - Fixnum of how many items to send in a batch
on_error   - Proc of what to do on an error


26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rudder/analytics/worker.rb', line 26

def initialize(queue, config)
  @queue = queue
  @data_plane_url = config.data_plane_url
  @write_key = config.write_key
  @ssl = config.ssl
  @on_error = config.on_error
  @on_error_with_messages = config.on_error_with_messages
  @batch = MessageBatch.new(config.batch_size)
  @lock = Mutex.new
  @transport = Transport.new(config)
end

Instance Method Details

#is_requesting?Boolean

public: Check whether we have outstanding requests.

Returns:

  • (Boolean)


63
64
65
# File 'lib/rudder/analytics/worker.rb', line 63

def is_requesting?
  @lock.synchronize { !@batch.empty? }
end

#runObject

public: Continuously runs the loop to check for new events



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rudder/analytics/worker.rb', line 40

def run
  until Thread.current[:should_exit]
    return if @queue.empty?

    @lock.synchronize do
      consume_message_from_queue! until @batch.full? || @queue.empty?
    end

    # res = Request.new(:data_plane_url => @data_plane_url, :ssl => @ssl).post @write_key, @batch
    res = @transport.send @write_key, @batch
    unless res.status == 200
      @on_error.call(res.status, res.error) 
      @on_error_with_messages.call(res.status, res.error, @batch.messages)
    end

    @lock.synchronize { @batch.clear }
  end
ensure
  @transport.shutdown
end