Class: AmplitudeAnalytics::Timeline
- Inherits:
-
Object
- Object
- AmplitudeAnalytics::Timeline
- Defined in:
- lib/amplitude/timeline.rb
Overview
Timeline
Instance Attribute Summary collapse
-
#configuration ⇒ Object
Returns the value of attribute configuration.
-
#plugins ⇒ Object
readonly
Returns the value of attribute plugins.
Instance Method Summary collapse
- #add(plugin) ⇒ Object
- #apply_plugins(plugin_type, event) ⇒ Object
- #flush ⇒ Object
-
#initialize(configuration = nil) ⇒ Timeline
constructor
A new instance of Timeline.
- #logger ⇒ Object
- #process(event) ⇒ Object
- #remove(plugin) ⇒ Object
- #setup(client) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(configuration = nil) ⇒ Timeline
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/amplitude/timeline.rb', line 9
# Builds an empty timeline: one mutex and one empty plugin list for each of
# the BEFORE, ENRICHMENT and DESTINATION plugin types.
#
# @param configuration [Object, nil] stored as-is in @configuration
def initialize(configuration = nil)
  plugin_types = [PluginType::BEFORE, PluginType::ENRICHMENT, PluginType::DESTINATION]
  @locks = plugin_types.map { |type| [type, Mutex.new] }.to_h
  @plugins = plugin_types.map { |type| [type, []] }.to_h
  @configuration = configuration
end
Instance Attribute Details
#configuration ⇒ Object
Returns the value of attribute configuration.
6 7 8 |
# File 'lib/amplitude/timeline.rb', line 6
# Reader for the configuration attribute.
#
# @return [Object, nil] the current value of @configuration
def configuration
  @configuration
end
#plugins ⇒ Object (readonly)
Returns the value of attribute plugins.
7 8 9 |
# File 'lib/amplitude/timeline.rb', line 7
# Read-only accessor for the plugins attribute.
#
# @return [Object] the current value of @plugins
def plugins
  @plugins
end
Instance Method Details
#add(plugin) ⇒ Object
32 33 34 35 36 |
# File 'lib/amplitude/timeline.rb', line 32
# Appends a plugin to the list for its own plugin type while holding that
# type's lock.
#
# @param plugin [#plugin_type] the plugin to register
# @return [Array] the plugin list the plugin was appended to
def add(plugin)
  type = plugin.plugin_type
  @locks[type].synchronize { @plugins[type].push(plugin) }
end
#apply_plugins(plugin_type, event) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/amplitude/timeline.rb', line 70
# Runs the event through every plugin registered for plugin_type, in
# registration order, while holding that type's lock.
#
# Non-destination plugins replace the running event with whatever they
# return; iteration stops as soon as that value is nil or false. Destination
# plugins receive a Marshal deep copy and their return value is ignored.
# InvalidEventError and any other StandardError raised by a plugin is logged
# and the loop moves on to the next plugin.
#
# @param plugin_type [Object] key into the lock and plugin tables
# @param event [Object] the event to process
# @return [Object, nil, false] the event after the last successful transformation
def apply_plugins(plugin_type, event)
  current = event
  @locks[plugin_type].synchronize do
    @plugins[plugin_type].each do |handler|
      break unless current

      begin
        to_destination = handler.plugin_type == PluginType::DESTINATION
        # Destinations get their own copy so they cannot mutate the shared event.
        payload = to_destination ? Marshal.load(Marshal.dump(current)) : current
        returned = handler.execute(payload)
        current = returned unless to_destination
      rescue InvalidEventError
        logger.error("Invalid event body #{event}")
      rescue StandardError
        logger.error("Error for apply #{PluginType.name(plugin_type)} plugin for event #{event}")
      end
    end
  end
  current
end
#flush ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/amplitude/timeline.rb', line 46
# Calls flush on every destination plugin while holding the destination lock
# and collects what each call returns.
#
# A destination whose flush raises a StandardError is logged and skipped so
# the remaining destinations are still flushed.
#
# @return [Array] the return values of each successful destination flush
def flush
  destination_futures = []
  @locks[PluginType::DESTINATION].synchronize do
    @plugins[PluginType::DESTINATION].each do |destination|
      destination_futures << destination.flush
    rescue StandardError
      # Ruby's Logger has no #exception method; calling it here would raise
      # NoMethodError from inside the handler and abort the whole flush.
      logger.error('Error for flush events')
    end
  end
  destination_futures
end
#logger ⇒ Object
23 24 25 26 |
# File 'lib/amplitude/timeline.rb', line 23
# Returns the logger used for timeline diagnostics.
#
# Prefers the logger carried by the configuration; only when there is no
# configuration (or it has no logger) does it fall back to a new Logger that
# writes to $stdout.
#
# @return [Object] the configured logger, or a fallback Logger
def logger
  @configuration&.logger || Logger.new($stdout, progname: LOGGER_NAME)
end
#process(event) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/amplitude/timeline.rb', line 58
# Sends an event through the BEFORE, ENRICHMENT and DESTINATION plugin
# stages, in that order.
#
# When the configuration has opt_out set, no plugin runs and the event is
# returned untouched after an info log line.
#
# @param event [Object] the event to process
# @return [Object] the result of the ENRICHMENT stage, or the original event
#   when opted out
def process(event)
  unless @configuration&.opt_out
    prepared = apply_plugins(PluginType::BEFORE, event)
    enriched = apply_plugins(PluginType::ENRICHMENT, prepared)
    apply_plugins(PluginType::DESTINATION, enriched)
    return enriched
  end

  logger.info('Skipped event for opt out config')
  event
end
#remove(plugin) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/amplitude/timeline.rb', line 38
# Removes every occurrence of the plugin from all plugin lists, taking each
# type's lock in turn.
#
# @param plugin [Object] the plugin to remove (matched with ==)
# @return [Hash] the lock table that was iterated
def remove(plugin)
  @locks.each do |type, lock|
    lock.synchronize { @plugins[type].delete(plugin) }
  end
end
#setup(client) ⇒ Object
28 29 30 |
# File 'lib/amplitude/timeline.rb', line 28
# Adopts the configuration of the given client.
#
# @param client [#configuration] object whose configuration is copied into
#   @configuration
# @return [Object] the configuration that was stored
def setup(client)
  client_configuration = client.configuration
  @configuration = client_configuration
end
#shutdown ⇒ Object
92 93 94 95 96 |
# File 'lib/amplitude/timeline.rb', line 92
# Calls shutdown on each destination plugin, in registration order, while
# holding the destination lock.
#
# @return [Array] the list of destination plugins
def shutdown
  destination_type = PluginType::DESTINATION
  @locks[destination_type].synchronize do
    @plugins[destination_type].each { |destination| destination.shutdown }
  end
end