Class: Optimizely::BatchEventProcessor

Inherits:
EventProcessor show all
Defined in:
lib/optimizely/event/batch_event_processor.rb

Constant Summary collapse

DEFAULT_BATCH_SIZE =
10
DEFAULT_BATCH_INTERVAL =

interval in milliseconds

30_000
DEFAULT_QUEUE_CAPACITY =
1000
DEFAULT_TIMEOUT_INTERVAL =

interval in seconds

5
FLUSH_SIGNAL =
'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL =
'SHUTDOWN_SIGNAL'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor

Returns a new instance of BatchEventProcessor.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/optimizely/event/batch_event_processor.rb', line 38

# Builds a batching event processor. Only state is set up here; the worker
# thread is created later by #start!.
#
# @param event_queue [SizedQueue] queue that buffers incoming events; defaults
#   to a SizedQueue holding DEFAULT_QUEUE_CAPACITY entries.
# @param event_dispatcher [Object, nil] dispatcher to use; when nil, an
#   EventDispatcher sharing +logger+ is created.
# @param batch_size [Integer] kept only when it is an Integer that
#   positive_number? accepts; otherwise DEFAULT_BATCH_SIZE is used.
# @param flush_interval [Numeric] flush interval (logged as milliseconds); kept
#   only when positive_number? accepts it, otherwise DEFAULT_BATCH_INTERVAL.
# @param logger [#log] receives (level, message) pairs.
# @param notification_center [Object, nil] stored as given.
def initialize(
  event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
  event_dispatcher: nil,
  batch_size: DEFAULT_BATCH_SIZE,
  flush_interval: DEFAULT_BATCH_INTERVAL,
  logger: NoOpLogger.new,
  notification_center: nil
)
  super()
  @event_queue = event_queue
  @logger = logger
  @event_dispatcher = event_dispatcher || EventDispatcher.new(logger: @logger)

  # Invalid sizes are not an error: note the fallback and carry on.
  usable_batch_size = batch_size.is_a?(Integer) && positive_number?(batch_size)
  @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.") unless usable_batch_size
  @batch_size = usable_batch_size ? batch_size : DEFAULT_BATCH_SIZE

  # Same policy for the interval, except that no Integer check is applied.
  usable_interval = positive_number?(flush_interval)
  @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.") unless usable_interval
  @flush_interval = usable_interval ? flush_interval : DEFAULT_BATCH_INTERVAL

  @notification_center = notification_center
  @current_batch = []
  @started = @stopped = false
end

Instance Attribute Details

#batch_size ⇒ Object (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to an EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

# Batch size in effect: the Integer passed to the constructor when it was
# accepted, otherwise DEFAULT_BATCH_SIZE.
def batch_size
  @batch_size
end

#current_batch ⇒ Object (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to an EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

# The Array the constructor initialises as empty.
# NOTE(review): presumably holds the events buffered for the next dispatch;
# the code that fills it is not visible here -- confirm.
def current_batch
  @current_batch
end

#event_dispatcher ⇒ Object (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to an EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

# The dispatcher supplied to the constructor, or the EventDispatcher the
# constructor created when none was supplied.
def event_dispatcher
  @event_dispatcher
end

#event_queue ⇒ Object (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to an EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

# The queue supplied to the constructor (by default a SizedQueue of
# DEFAULT_QUEUE_CAPACITY). #process, #flush and #stop! all push onto it.
def event_queue
  @event_queue
end

#flush_interval ⇒ Object (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to an EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

# Flush interval in effect: the value passed to the constructor when it was
# accepted, otherwise DEFAULT_BATCH_INTERVAL. The constructor logs the default
# as milliseconds.
def flush_interval
  @flush_interval
end

#started ⇒ Object (readonly)

BatchEventProcessor is a batched implementation of the Interface EventProcessor. Events passed to the BatchEventProcessor are immediately added to an EventQueue. The BatchEventProcessor maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the NotificationCenter.



28
29
30
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

# Running flag: false after construction, set to true by #start! and back to
# false by #stop!.
def started
  @started
end

Instance Method Details

#flush ⇒ Object



84
85
86
87
# File 'lib/optimizely/event/batch_event_processor.rb', line 84

# Asks the consumer to flush by pushing FLUSH_SIGNAL onto the event queue and
# then waking any thread waiting on the condition variable.
#
# Safe to call before the processor has been started: the signal is still
# queued, and the wake-up is skipped because nothing can be waiting yet.
#
# @return [Object, nil] nil when the processor has never been started; the
#   return value is otherwise not meaningful.
def flush
  @event_queue << FLUSH_SIGNAL
  # @wait_mutex and @resource are created lazily by #start!; without this
  # guard a flush issued before the first start raised NoMethodError on nil.
  return if @wait_mutex.nil?

  @wait_mutex.synchronize { @resource.signal }
end

#process(user_event) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/optimizely/event/batch_event_processor.rb', line 89

# Accepts one user event for batching.
#
# Events are refused (with a WARN log) once the processor has been stopped.
# Otherwise the processor is started on demand, the event is pushed onto the
# queue without blocking, and the condition variable is signalled.
#
# @param user_event [Object] event to enqueue; interpolated into the debug log.
# @return [nil] when the processor is stopped or the queue rejects the event;
#   the return value is otherwise not meaningful.
def process(user_event)
  @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

  # An explicit stop is final for incoming work.
  if @stopped
    @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
    return
  end

  # Lazy start: the first accepted event brings the worker up.
  @started || start!

  begin
    # Second argument requests a non-blocking push, so a full queue raises
    # instead of stalling the caller.
    @event_queue.push(user_event, true)
    @wait_mutex.synchronize do
      @resource.signal
    end
  rescue StandardError => e
    @logger.log(Logger::WARN, "Payload not accepted by the queue: #{e.message}")
    nil
  end
end

#start! ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/optimizely/event/batch_event_processor.rb', line 68

# Starts the consumer thread.
#
# Does nothing except log a warning when the processor is already running.
# Otherwise it records the next flush deadline (current timestamp plus the
# flush interval), creates the mutex and condition variable on first use,
# spawns the thread that runs run_queue, and marks the processor as started.
def start!
  if @started
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end

  now = Helpers::DateTimeUtils.create_timestamp
  @flushing_interval_deadline = now + @flush_interval
  @logger.log(Logger::INFO, 'Starting scheduler.')

  # The synchronisation primitives survive a stop/start cycle; build them once.
  @wait_mutex, @resource = Mutex.new, ConditionVariable.new if @wait_mutex.nil?

  @thread = Thread.new do
    run_queue
  end
  @started = true
  @stopped = false
end

#stop! ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/optimizely/event/batch_event_processor.rb', line 110

# Stops the consumer thread.
#
# No-op unless the processor is currently started. Otherwise SHUTDOWN_SIGNAL
# is pushed onto the event queue, the condition variable is signalled, and the
# caller waits up to DEFAULT_TIMEOUT_INTERVAL for the thread to finish before
# the processor is marked as stopped.
#
# @return [nil, true] nil when the processor was not started.
def stop!
  return unless @started

  @logger.log(Logger::INFO, 'Stopping scheduler.')

  # Queue the shutdown marker, then wake the consumer so it notices promptly.
  @event_queue << SHUTDOWN_SIGNAL
  @wait_mutex.synchronize do
    @resource.signal
  end

  # Bounded wait: the flags are flipped even if the thread has not exited.
  @thread.join(DEFAULT_TIMEOUT_INTERVAL)

  @started = false
  @stopped = true
end