Class: Optimizely::BatchEventProcessor
- Inherits: EventProcessor
  - Object
  - EventProcessor
  - Optimizely::BatchEventProcessor
- Defined in: lib/optimizely/event/batch_event_processor.rb
Constant Summary
- DEFAULT_BATCH_SIZE = 10
- DEFAULT_BATCH_INTERVAL = 30_000 (interval in milliseconds)
- DEFAULT_QUEUE_CAPACITY = 1000
- DEFAULT_TIMEOUT_INTERVAL = 5 (interval in seconds)
- FLUSH_SIGNAL = 'FLUSH_SIGNAL'
- SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
Instance Attribute Summary
BatchEventProcessor is a batched implementation of the EventProcessor interface.
- #batch_size ⇒ Object (readonly)
- #current_batch ⇒ Object (readonly)
- #event_dispatcher ⇒ Object (readonly)
- #event_queue ⇒ Object (readonly)
- #flush_interval ⇒ Object (readonly)
- #started ⇒ Object (readonly)
Instance Method Summary
- #flush ⇒ Object
- #initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor (constructor): A new instance of BatchEventProcessor.
- #process(user_event) ⇒ Object
- #start! ⇒ Object
- #stop! ⇒ Object
Constructor Details
#initialize(event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY), event_dispatcher: nil, batch_size: DEFAULT_BATCH_SIZE, flush_interval: DEFAULT_BATCH_INTERVAL, logger: NoOpLogger.new, notification_center: nil) ⇒ BatchEventProcessor
Returns a new instance of BatchEventProcessor.
# File 'lib/optimizely/event/batch_event_processor.rb', line 38

def initialize(
  event_queue: SizedQueue.new(DEFAULT_QUEUE_CAPACITY),
  event_dispatcher: nil,
  batch_size: DEFAULT_BATCH_SIZE,
  flush_interval: DEFAULT_BATCH_INTERVAL,
  logger: NoOpLogger.new,
  notification_center: nil
)
  super()
  @event_queue = event_queue
  @logger = logger
  @event_dispatcher = event_dispatcher || EventDispatcher.new(logger: @logger)
  @batch_size = if (batch_size.is_a? Integer) && positive_number?(batch_size)
                  batch_size
                else
                  @logger.log(Logger::DEBUG, "Setting to default batch_size: #{DEFAULT_BATCH_SIZE}.")
                  DEFAULT_BATCH_SIZE
                end
  @flush_interval = if positive_number?(flush_interval)
                      flush_interval
                    else
                      @logger.log(Logger::DEBUG, "Setting to default flush_interval: #{DEFAULT_BATCH_INTERVAL} ms.")
                      DEFAULT_BATCH_INTERVAL
                    end
  @notification_center = notification_center
  @current_batch = []
  @started = false
  @stopped = false
end
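The constructor falls back to the defaults whenever `batch_size` or `flush_interval` fails validation. The following is a minimal standalone sketch of that fallback, not the library's code: `resolve_batch_size` and `resolve_flush_interval` are hypothetical names, and the body of `positive_number?` is an assumption, since the listing above calls the helper without showing it.

```ruby
# Assumed behavior of the positive_number? helper (not shown in the listing).
def positive_number?(value)
  value.is_a?(Numeric) && value.positive?
end

# Hypothetical helper mirroring the batch_size branch: only positive Integers pass.
def resolve_batch_size(batch_size, default: 10)
  batch_size.is_a?(Integer) && positive_number?(batch_size) ? batch_size : default
end

# Hypothetical helper mirroring the flush_interval branch: any positive number passes.
def resolve_flush_interval(flush_interval, default: 30_000)
  positive_number?(flush_interval) ? flush_interval : default
end

p resolve_batch_size(25)        # => 25
p resolve_batch_size(2.5)       # => 10 (not an Integer)
p resolve_batch_size(-1)        # => 10 (not positive)
p resolve_flush_interval(500.0) # => 500.0
p resolve_flush_interval(nil)   # => 30000
```

Note the asymmetry visible in the constructor: `batch_size` must be an Integer, while `flush_interval` only has to be a positive number.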
Instance Attribute Details
#batch_size ⇒ Object (readonly)
BatchEventProcessor is a batched implementation of the EventProcessor interface. Events passed to the BatchEventProcessor are immediately added to an event queue (a SizedQueue by default). The BatchEventProcessor maintains a single consumer thread that pulls events off that queue and buffers them until either the configured batch size is reached or a maximum duration has elapsed, at which point the resulting LogEvent is sent to the NotificationCenter.
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def batch_size
  @batch_size
end
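The class description above (one consumer thread, flush on batch size or elapsed time) can be illustrated with a small standalone sketch. This is not the library's `run_queue`, which this page does not show: the `consume` method, the `:shutdown` sentinel, and the polling with a short `sleep` are all assumptions made for the example.

```ruby
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical consumer: yields batches of at most batch_size items, and
# yields a partial batch once flush_interval seconds have passed.
def consume(queue, batch_size:, flush_interval:)
  batch = []
  deadline = monotonic_now + flush_interval
  loop do
    if monotonic_now >= deadline
      yield batch unless batch.empty?
      batch = []
      deadline = monotonic_now + flush_interval
    end

    begin
      item = queue.pop(true) # non-blocking pop; raises ThreadError when empty
    rescue ThreadError
      sleep 0.005            # simple polling stands in for a condition variable
      next
    end

    if item == :shutdown     # sentinel playing the role of SHUTDOWN_SIGNAL
      yield batch unless batch.empty?
      break
    end

    batch << item
    next if batch.size < batch_size

    yield batch
    batch = []
    deadline = monotonic_now + flush_interval
  end
end

queue = Queue.new
batches = []
worker = Thread.new do
  consume(queue, batch_size: 3, flush_interval: 60) { |b| batches << b }
end
7.times { |i| queue << i }
queue << :shutdown
worker.join
p batches # => [[0, 1, 2], [3, 4, 5], [6]]
```

With a 60-second interval the time-based flush never fires here, so the output shows only size-based batches plus the remainder flushed on shutdown.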
#current_batch ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def current_batch
  @current_batch
end
#event_dispatcher ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def event_dispatcher
  @event_dispatcher
end
#event_queue ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def event_queue
  @event_queue
end
#flush_interval ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def flush_interval
  @flush_interval
end
#started ⇒ Object (readonly)
# File 'lib/optimizely/event/batch_event_processor.rb', line 28

def started
  @started
end
Instance Method Details
#flush ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 84

def flush
  @event_queue << FLUSH_SIGNAL
  @wait_mutex.synchronize { @resource.signal }
end
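`#flush` enqueues a signal and then wakes the consumer through the condition variable. The sketch below shows that wake-up pattern in isolation. It assumes the consumer waits on the condition variable with a timeout while the queue is empty; the consumer loop is not part of this listing, and the variable names and the `:flush` sentinel are hypothetical.

```ruby
mutex = Mutex.new
resource = ConditionVariable.new
queue = Queue.new
flush_signal = :flush # hypothetical stand-in for FLUSH_SIGNAL
received = nil
elapsed = nil

worker = Thread.new do
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Sleep for up to 5 seconds, but only if nothing is queued yet.
  # Checking the queue while holding the mutex avoids a lost wake-up.
  mutex.synchronize { resource.wait(mutex, 5) if queue.empty? }
  received = queue.pop
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
end

sleep 0.1                              # let the worker reach its wait
queue << flush_signal                  # enqueue first ...
mutex.synchronize { resource.signal }  # ... then wake the sleeping worker
worker.join

puts "received #{received.inspect} after #{elapsed.round(2)}s"
```

Because the producer enqueues before signaling and the consumer checks the queue under the same mutex, the worker returns well before the 5-second timeout instead of sleeping through it.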
#process(user_event) ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 89

def process(user_event)
  @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

  # If the processor has been explicitly stopped, don't accept tasks.
  if @stopped
    @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
    return
  end

  # Start the processor if it hasn't been started yet.
  start! unless @started

  begin
    # Non-blocking push: raises instead of waiting when the queue is full.
    @event_queue.push(user_event, true)
    @wait_mutex.synchronize { @resource.signal }
  rescue => e
    @logger.log(Logger::WARN, "Payload not accepted by the queue: #{e.message}")
    nil
  end
end
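`#process` passes `true` as the second argument to `push`, which makes the push non-blocking. The short example below uses only Ruby's standard `SizedQueue` to show what that means when the queue is at capacity; the capacity of 2 and the variable names are chosen for illustration.

```ruby
queue = SizedQueue.new(2) # tiny capacity so the overflow is easy to see
accepted = []
rejected = []

4.times do |i|
  begin
    queue.push(i, true)   # non_block = true: raise rather than wait
    accepted << i
  rescue ThreadError
    rejected << i         # raised because the queue is full
  end
end

p accepted # => [0, 1]
p rejected # => [2, 3]
```

This is why a full queue leads to a logged warning and a dropped event in `#process` rather than a blocked caller.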
#start! ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 68

def start!
  if @started == true
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end
  @flushing_interval_deadline = Helpers::DateTimeUtils. + @flush_interval
  @logger.log(Logger::INFO, 'Starting scheduler.')
  if @wait_mutex.nil?
    @wait_mutex = Mutex.new
    @resource = ConditionVariable.new
  end
  @thread = Thread.new { run_queue }
  @started = true
  @stopped = false
end
#stop! ⇒ Object
# File 'lib/optimizely/event/batch_event_processor.rb', line 110

def stop!
  return unless @started

  @logger.log(Logger::INFO, 'Stopping scheduler.')
  @event_queue << SHUTDOWN_SIGNAL
  @wait_mutex.synchronize { @resource.signal }
  @thread.join(DEFAULT_TIMEOUT_INTERVAL)
  @started = false
  @stopped = true
end
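`#stop!` bounds its wait with `@thread.join(DEFAULT_TIMEOUT_INTERVAL)`. The following standalone example shows how `Thread#join` behaves with a timeout, using two throwaway threads; the thread names and durations are illustrative only.

```ruby
fast = Thread.new { :done }    # finishes almost immediately
slow = Thread.new { sleep 2 }  # outlives the short timeout below

fast_result = fast.join(1)     # returns the thread once it has finished
slow_result = slow.join(0.1)   # returns nil when the timeout expires first

p fast_result.equal?(fast) # => true
p slow_result              # => nil

slow.kill # cleanup for this example only; stop! in the listing does not do this
slow.join
```

A `nil` result means the thread may still be running. In the listing above, `stop!` ignores the return value and marks the processor as stopped either way.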