Class: SplitIoClient::Engine::Synchronizer

Inherits:
Object
  • Object
show all
Includes:
Cache::Fetchers, Cache::Senders
Defined in:
lib/splitclient-rb/engine/synchronizer.rb

Constant Summary collapse

ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES =
10

Instance Method Summary collapse

Constructor Details

#initialize(repositories, config, params) ⇒ Synchronizer

Returns a new instance of Synchronizer.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 11

# Builds a Synchronizer from its storage back-ends, configuration and
# collaborators.
#
# @param repositories [#[]] collection read with the keys :splits, :segments,
#   :impressions and :events
# @param config [Object] configuration object, stored as-is in @config
# @param params [#[]] collection read with the keys :split_fetcher,
#   :segment_fetcher, :impressions_api, :imp_counter, :telemetry_synchronizer,
#   :impressions_sender_adapter and :unique_keys_tracker
def initialize(repositories, config, params)
  # Repositories, looked up one key at a time in declaration order.
  @splits_repository, @segments_repository, @impressions_repository, @events_repository =
    %i[splits segments impressions events].map { |key| repositories[key] }

  @config = config

  # Collaborators; note that the :imp_counter entry is kept as @impression_counter.
  @split_fetcher, @segment_fetcher, @impressions_api, @impression_counter,
    @telemetry_synchronizer, @impressions_sender_adapter, @unique_keys_tracker =
    %i[
      split_fetcher segment_fetcher impressions_api imp_counter
      telemetry_synchronizer impressions_sender_adapter unique_keys_tracker
    ].map { |key| params[key] }

  # One independent BackOff instance per resource kind (splits first, then segments).
  @splits_sync_backoff, @segments_sync_backoff = Array.new(2) { Engine::BackOff.new(10, 0, 60) }
end

Instance Method Details

#fetch_segment(name, target_change_number) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 104

# Refreshes one segment on demand until it reaches +target_change_number+.
#
# Does nothing when the segments repository already reports a change number
# greater than or equal to the target. Otherwise a first round of attempts is
# made with the configured retry count and delay. If that round does not
# succeed, a second round is made with +till+ set to the target change number
# (logged as "bypassing the CDN") and a budget of
# ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES attempts.
#
# Any StandardError raised along the way is handed to
# @config.log_found_exception rather than propagated.
#
# @param name [Object] segment identifier passed to the repository and to
#   attempt_segment_sync, and interpolated into log messages
# @param target_change_number [Integer] change number the segment should reach
# @return [Object, nil] not meaningful; nil on the early-return paths
def fetch_segment(name, target_change_number)
  return if target_change_number <= @segments_repository.get_change_number(name).to_i

  # First round: regular fetch, bounded by the configured retry count and delay.
  # NOTE(review): the trailing boolean presumably toggles back-off between
  # attempts; confirm against attempt_segment_sync.
  fetch_options = { cache_control_headers: true, till: nil }
  result = attempt_segment_sync(name,
                                target_change_number,
                                fetch_options,
                                @config.on_demand_fetch_max_retries,
                                @config.on_demand_fetch_retry_delay_seconds,
                                false)

  attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
  if result[:success]
    @config.logger.debug("Segment #{name} refresh completed in #{attempts} attempts.") if @config.debug_enabled

    return
  end

  # Second round: same request but with +till+ pinned to the target change number.
  fetch_options = { cache_control_headers: true, till: target_change_number }
  result = attempt_segment_sync(name,
                                target_change_number,
                                fetch_options,
                                ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
                                nil,
                                true)

  # This round's budget is ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES, so the number of
  # attempts used must be derived from that constant, not from the configured
  # retry count of the first round.
  attempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - result[:remaining_attempts]
  if result[:success]
    @config.logger.debug("Segment #{name} refresh completed bypassing the CDN in #{attempts} attempts.") if @config.debug_enabled
  else
    @config.logger.debug("No changes fetched for segment #{name} after #{attempts} attempts with CDN bypassed.") if @config.debug_enabled
  end
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end

#fetch_splits(target_change_number) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 66

# Refreshes splits on demand until the stored change number reaches
# +target_change_number+.
#
# Returns early when the splits repository is already at or past the target.
# A first round of attempts uses the configured retry count and delay; when it
# does not succeed, a second round runs with +till+ set to the target change
# number and a budget of ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES attempts. After a
# successful round, any segment names reported by it are handed to the segment
# fetcher. StandardError is routed to @config.log_found_exception.
#
# @param target_change_number [Integer] change number the splits should reach
# @return [Object, nil] not meaningful; nil on the early-return paths
def fetch_splits(target_change_number)
  stored_change_number = @splits_repository.get_change_number.to_i
  return if target_change_number <= stored_change_number

  options = { cache_control_headers: true, till: nil }

  # Round one: regular fetch with the configured budget.
  outcome = attempt_splits_sync(
    target_change_number,
    options,
    @config.on_demand_fetch_max_retries,
    @config.on_demand_fetch_retry_delay_seconds,
    false
  )
  used = @config.on_demand_fetch_max_retries - outcome[:remaining_attempts]

  if outcome[:success]
    referenced_segments = outcome[:segment_names]
    @segment_fetcher.fetch_segments_if_not_exists(referenced_segments, true) unless referenced_segments.empty?
    @config.logger.debug("Refresh completed in #{used} attempts.") if @config.debug_enabled

    return
  end

  # Round two: same options hash, now with +till+ pinned to the target.
  options[:till] = target_change_number
  outcome = attempt_splits_sync(
    target_change_number,
    options,
    ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
    nil,
    true
  )
  used = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - outcome[:remaining_attempts]

  succeeded = outcome[:success]
  if succeeded
    referenced_segments = outcome[:segment_names]
    @segment_fetcher.fetch_segments_if_not_exists(referenced_segments, true) unless referenced_segments.empty?
  end

  return unless @config.debug_enabled

  if succeeded
    @config.logger.debug("Refresh completed bypassing the CDN in #{used} attempts.")
  else
    @config.logger.debug("No changes fetched after #{used} attempts with CDN bypassed.")
  end
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end

#start_periodic_data_recording ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 45

# Starts the recurring data-recording tasks.
#
# The impressions sender, events sender and telemetry sync task are started
# only when @config.consumer? is falsy. The impressions-count sender and the
# unique-keys tracker task are started in every case.
#
# @return [Object] whatever start_unique_keys_tracker_task returns
def start_periodic_data_recording
  consumer_mode = @config.consumer?

  unless consumer_mode
    impressions_sender
    events_sender
    start_telemetry_sync_task
  end

  impressions_count_sender
  start_unique_keys_tracker_task
end

#start_periodic_fetch ⇒ Object



56
57
58
59
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 56

# Invokes +call+ on the split fetcher and then on the segment fetcher.
#
# @return [Object] the value returned by the segment fetcher's +call+
def start_periodic_fetch
  [@split_fetcher, @segment_fetcher].map(&:call).last
end

#stop_periodic_fetch ⇒ Object



61
62
63
64
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 61

# Asks the split fetcher to stop its splits thread, then asks the segment
# fetcher to stop its segments thread.
#
# @return [Object] the value returned by the segment fetcher's stop_segments_thread
def stop_periodic_fetch
  splits_worker = @split_fetcher
  segments_worker = @segment_fetcher

  splits_worker.stop_splits_thread
  segments_worker.stop_segments_thread
end

#sync_all(asynchronous = true) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 33

# Synchronizes splits and segments, inline or on a background thread.
#
# With a falsy +asynchronous+ the work runs in the calling thread and the
# result of sync_splits_and_segments is returned. Otherwise a new Thread is
# started to run it, stored under @config.threads[:sync_all_thread], and true
# is returned without waiting for the thread.
#
# @param asynchronous [Boolean] whether to run on a new thread (default true)
# @return [Object, true] the synchronous result, or true when a thread was started
def sync_all(asynchronous = true)
  return sync_splits_and_segments unless asynchronous

  @config.threads[:sync_all_thread] = Thread.new { sync_splits_and_segments }

  true
end