Class: AmplitudeAnalytics::Timeline

Inherits:
Object
  • Object
show all
Defined in:
lib/amplitude/timeline.rb

Overview

Timeline

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration = nil) ⇒ Timeline



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/amplitude/timeline.rb', line 9

# Creates an empty timeline with one mutex and one empty plugin list for each
# of the three plugin types (BEFORE, ENRICHMENT, DESTINATION).
#
# @param configuration [Object, nil] stored unchanged in @configuration;
#   defaults to nil.
def initialize(configuration = nil)
  plugin_types = [PluginType::BEFORE, PluginType::ENRICHMENT, PluginType::DESTINATION]
  # Each type gets its own Mutex and its own Array instance (never shared).
  @locks = plugin_types.each_with_object({}) { |type, table| table[type] = Mutex.new }
  @plugins = plugin_types.each_with_object({}) { |type, table| table[type] = [] }
  @configuration = configuration
end

Instance Attribute Details

#configuration ⇒ Object

Returns the value of attribute configuration.



6
7
8
# File 'lib/amplitude/timeline.rb', line 6

# Reader for the configuration attribute.
#
# @return [Object, nil] the current value of @configuration
def configuration
  @configuration
end

#plugins ⇒ Object (readonly)

Returns the value of attribute plugins.



7
8
9
# File 'lib/amplitude/timeline.rb', line 7

# Read-only accessor for the plugin registry.
#
# @return [Object] the current value of @plugins (returned directly, not copied)
def plugins
  @plugins
end

Instance Method Details

#add(plugin) ⇒ Object



32
33
34
35
36
# File 'lib/amplitude/timeline.rb', line 32

# Registers a plugin under the list matching its own plugin_type, holding
# that type's mutex while appending.
#
# @param plugin [#plugin_type] the plugin to register
# @return [Array] the plugin list the plugin was appended to
def add(plugin)
  type_lock = @locks[plugin.plugin_type]
  type_lock.synchronize { @plugins[plugin.plugin_type].push(plugin) }
end

#apply_plugins(plugin_type, event) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/amplitude/timeline.rb', line 70

# Runs every registered plugin of the given type over an event, in
# registration order, while holding that type's mutex.
#
# Plugins whose own plugin_type is DESTINATION receive a Marshal deep copy of
# the current event and their return value is ignored; any other plugin's
# return value becomes the event handed to the next plugin. Iteration stops
# as soon as the current event is nil/false. Errors raised by a plugin are
# logged and the loop moves on to the next plugin; note that the log message
# interpolates the originally supplied event, not the current one.
#
# @param plugin_type [Object] key selecting the lock and plugin list
# @param event [Object] the event to run through the plugins
# @return [Object, nil] the event as left by the last plugin that ran
def apply_plugins(plugin_type, event)
  current = event
  @locks[plugin_type].synchronize do
    @plugins[plugin_type].each do |handler|
      break unless current

      feeds_destination = handler.plugin_type == PluginType::DESTINATION
      if feeds_destination
        # Destinations work on a private deep copy so they cannot mutate
        # the event returned to the caller.
        handler.execute(Marshal.load(Marshal.dump(current)))
      else
        current = handler.execute(current)
      end
    rescue InvalidEventError
      logger.error("Invalid event body #{event}")
    rescue StandardError
      logger.error("Error for apply #{PluginType.name(plugin_type)} plugin for event #{event}")
    end
  end
  current
end

#flush ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/amplitude/timeline.rb', line 46

# Flushes every registered destination plugin while holding the destination
# mutex and collects whatever each plugin's #flush returns.
#
# A destination whose #flush raises is logged and skipped so the remaining
# destinations are still flushed.
#
# @return [Array] the return values of the destinations that flushed
#   successfully, in registration order
def flush
  destination_futures = []
  @locks[PluginType::DESTINATION].synchronize do
    @plugins[PluginType::DESTINATION].each do |destination|
      destination_futures << destination.flush
    rescue StandardError
      # Ruby's Logger has no #exception method; calling it here would raise
      # NoMethodError from inside the handler and abort the whole flush.
      logger.error('Error for flush events')
    end
  end
  destination_futures
end

#logger ⇒ Object



23
24
25
26
# File 'lib/amplitude/timeline.rb', line 23

# Returns the logger to use for timeline diagnostics.
#
# Prefers the logger exposed by the current configuration. When there is no
# configuration, or its logger is nil, falls back to a new Logger writing to
# $stdout with LOGGER_NAME as progname.
#
# @return [Object] the configured logger, or a fallback Logger
def logger
  # The `||` matters: without it the configured logger is evaluated and
  # discarded, and a fresh stdout Logger is returned on every call.
  @configuration&.logger || Logger.new($stdout, progname: LOGGER_NAME)
end

#process(event) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/amplitude/timeline.rb', line 58

# Sends an event through the plugin pipeline: BEFORE plugins first, then
# ENRICHMENT plugins, then DESTINATION plugins.
#
# When the configuration reports opt_out, nothing is applied: an info
# message is logged and the event is returned untouched.
#
# @param event [Object] the event to process
# @return [Object, nil] the event as it stood after the ENRICHMENT stage
#   (the DESTINATION stage's result is ignored), or the original event when
#   opted out
def process(event)
  if @configuration&.opt_out
    logger.info('Skipped event for opt out config')
    return event
  end

  transform_stages = [PluginType::BEFORE, PluginType::ENRICHMENT]
  enriched = transform_stages.inject(event) { |current, stage| apply_plugins(stage, current) }
  apply_plugins(PluginType::DESTINATION, enriched)
  enriched
end

#remove(plugin) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/amplitude/timeline.rb', line 38

# Unregisters a plugin from every plugin-type list, taking each type's mutex
# in turn. Every entry that compares equal (==) to the given plugin is
# dropped.
#
# @param plugin [Object] the plugin to remove
# @return [Hash] the @locks hash (the receiver of the iteration)
def remove(plugin)
  @locks.each do |plugin_type, type_lock|
    type_lock.synchronize do
      @plugins[plugin_type].delete_if { |registered| registered == plugin }
    end
  end
end

#setup(client) ⇒ Object



28
29
30
# File 'lib/amplitude/timeline.rb', line 28

# Adopts the configuration of the given client, replacing whatever
# configuration this timeline held before.
#
# @param client [#configuration] object whose configuration is copied by
#   reference into @configuration
# @return [Object] the newly assigned configuration
def setup(client)
  client_configuration = client.configuration
  @configuration = client_configuration
end

#shutdown ⇒ Object



92
93
94
95
96
# File 'lib/amplitude/timeline.rb', line 92

# Calls #shutdown on every registered destination plugin, in registration
# order, while holding the destination mutex. There is no rescue here, so an
# error raised by one destination propagates to the caller.
#
# @return [Array] the destination plugin list
def shutdown
  destination_lock = @locks[PluginType::DESTINATION]
  destination_lock.synchronize do
    @plugins[PluginType::DESTINATION].each { |destination| destination.shutdown }
  end
end