41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/ldclient-rb/stream.rb', line 41
# Opens the streaming connection and wires up its event and error handlers.
#
# Only the first call does any work: when `@started.make_true` returns a falsy
# value the method returns `@ready` straight away without creating another
# connection.
#
# Errors reported by the SSE client are handled by class:
# - `SSE::Errors::HTTPStatusError`: the HTTP error is logged. If
#   `Util.http_error_recoverable?` says the status is recoverable, the update
#   sink (when present) is told the data source is INTERRUPTED; otherwise
#   `@ready` is set and `stop_with_error_info` is called.
# - content-type, proxy and read-timeout errors: reported to the sink as
#   INTERRUPTED with a NETWORK_ERROR kind.
# - anything else: reported to the sink as INTERRUPTED with an UNKNOWN kind.
#
# @return [Object] the `@ready` object
#   (NOTE(review): presumably a Concurrent::Event - confirm against the class)
def start
  return @ready unless @started.make_true

  @config.logger.info { "[LDClient] Initializing stream connection" }

  # NOTE(review): the helper name was lost in the listing this was recovered
  # from; `default_http_headers` is the presumed original - confirm.
  headers = Impl::Util.default_http_headers(@sdk_key, @config)
  opts = {
    headers: headers,
    read_timeout: READ_TIMEOUT_SECONDS,
    logger: @config.logger,
    socket_factory: @config.socket_factory,
    reconnect_time: @config.initial_reconnect_delay,
  }
  log_connection_started

  uri = Util.add_payload_filter_key(@config.stream_uri + "/all", @config)
  @es = SSE::Client.new(uri, **opts) do |conn|
    conn.on_event { |event| process_message(event) }
    conn.on_error { |err|
      log_connection_result(false)
      case err
      when SSE::Errors::HTTPStatusError
        status = err.status
        error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
          LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, status, nil, Time.now)
        message = Util.http_error_message(status, "streaming connection", "will retry")
        @config.logger.error { "[LDClient] #{message}" }

        if Util.http_error_recoverable?(status)
          @data_source_update_sink&.update_status(
            LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
            error_info
          )
        else
          # These must stay two separate statements: written on one line Ruby
          # would parse `stop_with_error_info error_info` as an argument to
          # `@ready.set`.
          @ready.set
          stop_with_error_info error_info
        end
      when SSE::Errors::HTTPContentTypeError, SSE::Errors::HTTPProxyError, SSE::Errors::ReadTimeoutError
        @data_source_update_sink&.update_status(
          LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR, 0, err.to_s, Time.now)
        )
      else
        @data_source_update_sink&.update_status(
          LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, 0, err.to_s, Time.now)
        )
      end
    }
  end

  @ready
end
|