Class: RSpec::Buildkite::Analytics::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/rspec/buildkite/analytics/session.rb

Defined Under Namespace

Classes: InitialConnectionFailure, Logger, RejectedSubscription

Constant Summary collapse

CONFIRMATION_TIMEOUT =

Picked 75 as the magic timeout number as it’s longer than the TCP timeout of 60s 🤷‍♀️

ENV.fetch("BUILDKITE_ANALYTICS_CONFIRMATION_TIMEOUT") { 75 }.to_i
MAX_RECONNECTION_ATTEMPTS =
ENV.fetch("BUILDKITE_ANALYTICS_RECONNECTION_ATTEMPTS") { 3 }.to_i
WAIT_BETWEEN_RECONNECTIONS =
ENV.fetch("BUILDKITE_ANALYTICS_RECONNECTION_WAIT") { 5 }.to_i

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, authorization_header, channel) ⇒ Session

Returns a new instance of Session.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/rspec/buildkite/analytics/session.rb', line 37

# Builds a session and tries to open the initial connection.
#
# Stores the connection parameters, creates the queues/mutexes used by the
# rest of the class, then calls +connect+. A TimeoutError or
# InitialConnectionFailure is retried up to MAX_RECONNECTION_ATTEMPTS times,
# sleeping WAIT_BETWEEN_RECONNECTIONS seconds between tries. If every attempt
# fails, a warning is printed to stderr and construction still completes
# (the write thread is started regardless).
#
# @param url [Object] stored in @url (presumably the socket URL — confirm against +connect+)
# @param authorization_header [Object] stored in @authorization_header
# @param channel [Object] stored in @channel
def initialize(url, authorization_header, channel)
  @establish_subscription_queue = Queue.new
  @channel = channel

  @unconfirmed_idents = {}
  @idents_mutex = Mutex.new
  @send_queue = Queue.new
  @empty = ConditionVariable.new
  @closing = false
  @eot_queued = false
  @eot_queued_mutex = Mutex.new
  @reconnection_mutex = Mutex.new

  @url = url
  @authorization_header = authorization_header

  @logger = Logger.new

  reconnection_count = 0

  begin
    reconnection_count += 1
    connect
  rescue TimeoutError, InitialConnectionFailure => e
    if reconnection_count > MAX_RECONNECTION_ATTEMPTS
      # Out of retries: record that we are giving up instead of announcing
      # a retry that will never happen (previously logged "retry 4 of 3").
      @logger.write("rspec-buildkite-analytics could not establish an initial connection with Buildkite due to #{e}. Giving up after #{MAX_RECONNECTION_ATTEMPTS} retries.")
      $stderr.puts "rspec-buildkite-analytics could not establish an initial connection with Buildkite due to #{e.message} after #{MAX_RECONNECTION_ATTEMPTS} attempts. You may be missing some data for this test suite, please contact support if this issue persists."
    else
      @logger.write("rspec-buildkite-analytics could not establish an initial connection with Buildkite due to #{e}. Attempting retry #{reconnection_count} of #{MAX_RECONNECTION_ATTEMPTS}...")
      sleep(WAIT_BETWEEN_RECONNECTIONS)
      @logger.write("retrying reconnection")
      retry
    end
  end
  init_write_thread
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



35
36
37
# File 'lib/rspec/buildkite/analytics/session.rb', line 35

# Read-only accessor for the session's logger (the object the constructor
# assigns to @logger).
#
# @return [Object] the value of @logger
def logger
  @logger
end

Instance Method Details

#close(examples_count) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/rspec/buildkite/analytics/session.rb', line 108

# Finishes the session: queues the end-of-transmission marker, waits for the
# server to confirm outstanding idents, then closes the connection and stops
# the write thread.
#
# @param examples_count [Object] stored in @examples_count for use elsewhere
#   in the class
# @return [Object] whatever the final @logger.write call returns
def close(examples_count)
  @closing = true
  @examples_count = examples_count
  @logger.write("closing socket connection")

  # Because the server only sends us confirmations after every 10mb of
  # data it uploads to S3, we'll never get confirmation of the
  # identifiers of the last upload part unless we send an explicit finish,
  # to which the server will respond with the last bits of data
  send_eot

  # After EOT, we wait up to CONFIRMATION_TIMEOUT seconds for the send queue
  # to be drained and for the server to confirm the last idents. If
  # everything has already been confirmed we can proceed without waiting.
  @idents_mutex.synchronize do
    if @unconfirmed_idents.any?
      puts "Waiting for Buildkite Test Analytics to send results..."
      @logger.write("waiting for last confirm")

      @empty.wait(@idents_mutex, CONFIRMATION_TIMEOUT)
    end
  end

  # Then we always disconnect, because we can't wait forever. The constructor
  # tolerates a failed initial connection, so there may be no connection to
  # close (assumes +connect+ is what assigns @connection — confirm); guard
  # against nil the same way the write thread is guarded below.
  @connection&.close
  # We kill the write thread because it runs a loop and won't finish otherwise
  @write_thread&.kill

  puts "Buildkite Test Analytics completed"
  @logger.write("socket connection closed")
end

#disconnected(connection) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/rspec/buildkite/analytics/session.rb', line 73

# Reconnects after +connection+ has dropped, then retransmits.
#
# Several threads can notice the same disconnection. Each passes in the
# connection it saw fail; the reconnection mutex lets only one of them
# proceed at a time. If @connection no longer equals the connection that was
# passed in, the method returns early without reconnecting or retransmitting.
#
# @param connection [Object] the connection the caller observed failing
# @return [Object, nil] the result of +retransmit+, or nil when the
#   connection passed in is no longer the current one
# @raise re-raises the last connection error once the failed attempts exceed
#   MAX_RECONNECTION_ATTEMPTS
def disconnected(connection)
  @reconnection_mutex.synchronize do
    return unless connection == @connection

    @logger.write("starting reconnection")
    attempt = 0

    begin
      attempt += 1
      connect
      init_write_thread
    rescue SocketConnection::HandshakeError, RejectedSubscription, TimeoutError, InitialConnectionFailure, SocketConnection::SocketError => error
      @logger.write("failed reconnection attempt #{attempt} due to #{error}")

      # Still within budget: pause, then run the begin block again.
      if attempt <= MAX_RECONNECTION_ATTEMPTS
        sleep(WAIT_BETWEEN_RECONNECTIONS)
        @logger.write("retrying reconnection")
        retry
      end

      # Budget exhausted: tell the user and propagate the failure.
      $stderr.puts "rspec-buildkite-analytics experienced a disconnection and could not reconnect to Buildkite due to #{error.message}. Please contact support."
      raise error
    end
  end

  retransmit
end

#handle(_connection, data) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/rspec/buildkite/analytics/session.rb', line 140

# Parses one JSON frame and dispatches on its "type" field.
#
# * "ping"                            - only logged
# * "welcome"/"confirm_subscription"  - pushed onto the subscription queue
#   and logged
# * "reject_subscription"             - logged, then RejectedSubscription is raised
# * anything else                     - forwarded to +process_message+
#
# @param _connection [Object] unused
# @param data [String] JSON text
# @raise [RejectedSubscription] when the frame type is "reject_subscription"
def handle(_connection, data)
  message = JSON.parse(data)
  type = message["type"]

  case type
  when "ping"
    # Nothing to act on for a ping; it is only recorded in the log.
    @logger.write("received ping")
  when "welcome", "confirm_subscription"
    # Queue the handshake frames so whoever is waiting on the queue during
    # the initializing phase can pick them up.
    @establish_subscription_queue.push(message)
    @logger.write("received #{type}")
  when "reject_subscription"
    @logger.write("received rejected_subscription")
    raise RejectedSubscription
  else
    process_message(message)
  end
end

#unconfirmed_idents_count ⇒ Object



166
167
168
169
170
# File 'lib/rspec/buildkite/analytics/session.rb', line 166

# Number of entries currently held in @unconfirmed_idents, read while holding
# the idents mutex.
#
# @return [Integer]
def unconfirmed_idents_count
  @idents_mutex.synchronize { @unconfirmed_idents.count }
end

#write_result(result) ⇒ Object



160
161
162
163
164
# File 'lib/rspec/buildkite/analytics/session.rb', line 160

# Hands a result to +queue_and_track_result+ and logs that it was queued.
#
# @param result [#id, #as_hash] the result to send; its id and its hash form
#   are passed to +queue_and_track_result+
# @return [Object] whatever @logger.write returns
def write_result(result)
  ident = result.id
  payload = result.as_hash
  queue_and_track_result(ident, payload)

  @logger.write("added #{result.id} to send queue")
end