Class: Optimizely::OdpEventManager

Inherits:
Object
  • Object
show all
Defined in:
lib/optimizely/odp/odp_event_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_manager: nil, logger: NoOpLogger.new, proxy_config: nil, request_timeout: nil, flush_interval: nil) ⇒ OdpEventManager

Returns a new instance of OdpEventManager.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/optimizely/odp/odp_event_manager.rb', line 32

def initialize(
  api_manager: nil,
  logger: NoOpLogger.new,
  proxy_config: nil,
  request_timeout: nil,
  flush_interval: nil
)
  @odp_config = nil
  @api_host = nil
  @api_key = nil

  @mutex = Mutex.new
  @event_queue = SizedQueue.new(Optimizely::Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_QUEUE_CAPACITY])
  @queue_capacity = Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_QUEUE_CAPACITY]
  # received signal should be sent after adding item to event_queue
  @received = ConditionVariable.new
  @logger = logger
  @api_manager = api_manager || OdpEventApiManager.new(logger: @logger, proxy_config: proxy_config, timeout: request_timeout)
  @flush_interval = flush_interval || Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_FLUSH_INTERVAL_SECONDS]
  @batch_size = @flush_interval&.zero? ? 1 : Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_BATCH_SIZE]
  @flush_deadline = 0
  @retry_count = Helpers::Constants::ODP_EVENT_MANAGER[:DEFAULT_RETRY_COUNT]
  # current_batch should only be accessed by processing thread
  @current_batch = []
  @thread = nil
  @thread_exception = false
end

Instance Attribute Details

#api_managerObject (readonly)

Events passed to the OdpEventManager are immediately added to an EventQueue. The OdpEventManager maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting OdpEvent is sent to Odp.



29
30
31
# File 'lib/optimizely/odp/odp_event_manager.rb', line 29

def api_manager
  @api_manager
end

#batch_sizeObject (readonly)

Events passed to the OdpEventManager are immediately added to an EventQueue. The OdpEventManager maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting OdpEvent is sent to Odp.



29
30
31
# File 'lib/optimizely/odp/odp_event_manager.rb', line 29

def batch_size
  @batch_size
end

#loggerObject (readonly)

Events passed to the OdpEventManager are immediately added to an EventQueue. The OdpEventManager maintains a single consumer thread that pulls events off of the BlockingQueue and buffers them for either a configured batch size or for a maximum duration before the resulting OdpEvent is sent to Odp.



29
30
31
# File 'lib/optimizely/odp/odp_event_manager.rb', line 29

def logger
  @logger
end

#odp_configObject

Returns the value of attribute odp_config.



30
31
32
# File 'lib/optimizely/odp/odp_event_manager.rb', line 30

def odp_config
  @odp_config
end

Instance Method Details

#dispatch(event) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/optimizely/odp/odp_event_manager.rb', line 100

def dispatch(event)
  if @thread_exception
    @logger.log(Logger::ERROR, format(Helpers::Constants::ODP_LOGS[:ODP_EVENT_FAILED], 'Queue is down'))
    return
  end

  # if the processor has been explicitly stopped. Don't accept tasks
  unless running?
    @logger.log(Logger::WARN, 'ODP event queue is shutdown, not accepting events.')
    return
  end

  begin
    @logger.log(Logger::DEBUG, 'ODP event queue: adding event.')
    @event_queue.push(event, true)
  rescue => e
    @logger.log(Logger::WARN, format(Helpers::Constants::ODP_LOGS[:ODP_EVENT_FAILED], e.message))
    return
  end

  @mutex.synchronize do
    @received.signal
  end
end

#flushObject



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/optimizely/odp/odp_event_manager.rb', line 74

def flush
  begin
    @event_queue.push(:FLUSH_SIGNAL, true)
  rescue ThreadError
    @logger.log(Logger::ERROR, 'Error flushing ODP event queue.')
    return
  end

  @mutex.synchronize do
    @received.signal
  end
end

#running?Boolean

Returns:

  • (Boolean)


165
166
167
# File 'lib/optimizely/odp/odp_event_manager.rb', line 165

def running?
  !!@thread && !!@thread.status && !@event_queue.closed?
end

#send_event(type:, action:, identifiers:, data:) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/optimizely/odp/odp_event_manager.rb', line 125

def send_event(type:, action:, identifiers:, data:)
  case @odp_config&.odp_state
  when nil
    @logger.log(Logger::DEBUG, 'ODP event queue: cannot send before config has been set.')
    return
  when OdpConfig::ODP_CONFIG_STATE[:UNDETERMINED]
    @logger.log(Logger::DEBUG, 'ODP event queue: cannot send before the datafile has loaded.')
    return
  when OdpConfig::ODP_CONFIG_STATE[:NOT_INTEGRATED]
    @logger.log(Logger::DEBUG, Helpers::Constants::ODP_LOGS[:ODP_NOT_INTEGRATED])
    return
  end

  event = Optimizely::OdpEvent.new(type: type, action: action, identifiers: identifiers, data: data)
  dispatch(event)
end

#start!(odp_config) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/optimizely/odp/odp_event_manager.rb', line 60

def start!(odp_config)
  if running?
    @logger.log(Logger::WARN, 'Service already started.')
    return
  end

  @odp_config = odp_config
  @api_host = odp_config.api_host
  @api_key = odp_config.api_key

  @thread = Thread.new { run }
  @logger.log(Logger::INFO, 'Starting scheduler.')
end

#stop!Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/optimizely/odp/odp_event_manager.rb', line 142

def stop!
  return unless running?

  begin
    @event_queue.push(:SHUTDOWN_SIGNAL, true)
  rescue ThreadError
    @logger.log(Logger::ERROR, 'Error stopping ODP event queue.')
    return
  end

  @event_queue.close

  @mutex.synchronize do
    @received.signal
  end

  @logger.log(Logger::INFO, 'Stopping ODP event queue.')

  @thread.join

  @logger.log(Logger::ERROR, format(Helpers::Constants::ODP_LOGS[:ODP_EVENT_FAILED], @current_batch.to_json)) unless @current_batch.empty?
end

#update_configObject



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/optimizely/odp/odp_event_manager.rb', line 87

def update_config
  begin
    # Adds update config signal to event_queue.
    @event_queue.push(:UPDATE_CONFIG, true)
  rescue ThreadError
    @logger.log(Logger::ERROR, 'Error updating ODP config for the event queue')
  end

  @mutex.synchronize do
    @received.signal
  end
end