Class: Peatio::Ranger::Router
- Inherits:
-
Object
- Object
- Peatio::Ranger::Router
- Defined in:
- lib/peatio/ranger/router.rb
Defined Under Namespace
Classes: ConnectionArray
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#connections_by_userid ⇒ Object
readonly
Returns the value of attribute connections_by_userid.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#streams_sockets ⇒ Object
readonly
Returns the value of attribute streams_sockets.
Instance Method Summary collapse
- #compute_connections_all ⇒ Object
- #compute_connections_private ⇒ Object
- #compute_stream_subscriptions(stream) ⇒ Object
- #compute_streams_kinds ⇒ Object
- #compute_streams_subscriptions ⇒ Object
- #debug ⇒ Object
- #increment?(stream) ⇒ Boolean
- #init_metrics(prometheus) ⇒ Object
-
#initialize(prometheus = nil) ⇒ Router
constructor
A new instance of Router.
- #on_connection_close(connection) ⇒ Object
- #on_connection_open(connection) ⇒ Object
-
#on_message(delivery_info, _metadata, payload) ⇒ Object
routing key format: type.id.event * ‘type` can be public or private * `id` can be user id or market id * `event` is the event identifier, ex: order_completed, trade, …
- #on_subscribe(connection, stream) ⇒ Object
- #on_unsubscribe(connection, stream) ⇒ Object
- #sanity_check_metrics_connections ⇒ Object
- #send_private_message(user_id, event, payload_decoded) ⇒ Object
- #send_public_message(stream, raw_message) ⇒ Object
- #send_snapshot_and_increments(connection, key) ⇒ Object
- #snapshot?(stream) ⇒ Boolean
- #stats ⇒ Object
- #storekey(stream) ⇒ Object
Constructor Details
#initialize(prometheus = nil) ⇒ Router
Returns a new instance of Router.
18 19 20 21 22 23 24 25 |
# File 'lib/peatio/ranger/router.rb', line 18 def initialize(prometheus=nil) @connections = {} @connections_by_userid = {} @streams_sockets = {} @logger = Peatio::Logger.logger @stores = {} init_metrics(prometheus) end |
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
5 6 7 |
# File 'lib/peatio/ranger/router.rb', line 5 def connections @connections end |
#connections_by_userid ⇒ Object (readonly)
Returns the value of attribute connections_by_userid.
6 7 8 |
# File 'lib/peatio/ranger/router.rb', line 6 def connections_by_userid @connections_by_userid end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/peatio/ranger/router.rb', line 8 def logger @logger end |
#streams_sockets ⇒ Object (readonly)
Returns the value of attribute streams_sockets.
7 8 9 |
# File 'lib/peatio/ranger/router.rb', line 7 def streams_sockets @streams_sockets end |
Instance Method Details
#compute_connections_all ⇒ Object
81 82 83 |
# File 'lib/peatio/ranger/router.rb', line 81 def compute_connections_all @connections.size end |
#compute_connections_private ⇒ Object
85 86 87 |
# File 'lib/peatio/ranger/router.rb', line 85 def compute_connections_private @connections_by_userid.each_value.map(&:size).reduce(0, :+) end |
#compute_stream_subscriptions(stream) ⇒ Object
89 90 91 |
# File 'lib/peatio/ranger/router.rb', line 89 def compute_stream_subscriptions(stream) @streams_sockets[stream]&.size || 0 end |
#compute_streams_kinds ⇒ Object
97 98 99 |
# File 'lib/peatio/ranger/router.rb', line 97 def compute_streams_kinds @streams_sockets.size end |
#compute_streams_subscriptions ⇒ Object
93 94 95 |
# File 'lib/peatio/ranger/router.rb', line 93 def compute_streams_subscriptions @streams_sockets.each_value.map(&:size).reduce(0, :+) end |
#debug ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/peatio/ranger/router.rb', line 72 def debug [ "==== Debug ====", "connections: %s" % [@connections.inspect], "connections_by_userid: %s" % [@connections_by_userid], "streams_sockets: %s" % [@streams_sockets], ].join("\n") end |
#increment?(stream) ⇒ Boolean
52 53 54 |
# File 'lib/peatio/ranger/router.rb', line 52 def increment?(stream) stream.end_with?("-inc") end |
#init_metrics(prometheus) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/peatio/ranger/router.rb', line 27 def init_metrics(prometheus) return unless prometheus @prometheus = prometheus @metric_connections_total = @prometheus.counter( :ranger_connections_total, docstring: "Total number of connections to ranger from the start", labels: [:auth] ) @metric_connections_current = @prometheus.gauge( :ranger_connections_current, docstring: "Current number of connections to ranger", labels: [:auth] ) @metric_subscriptions_current = @prometheus.gauge( :ranger_subscriptions_current, docstring: "Current number of streams subscriptions to ranger", labels: [:stream] ) end |
#on_connection_close(connection) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/peatio/ranger/router.rb', line 127 def on_connection_close(connection) @connections.delete(connection.id) connection.streams.keys.each do |stream| on_unsubscribe(connection, stream) end unless connection. @metric_connections_current&.decrement(labels: {auth: "public"}) sanity_check_metrics_connections return end @metric_connections_current&.decrement(labels: {auth: "private"}) @connections_by_userid[connection.user].delete(connection) @connections_by_userid.delete(connection.user) \ if @connections_by_userid[connection.user].empty? sanity_check_metrics_connections end |
#on_connection_open(connection) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/peatio/ranger/router.rb', line 113 def on_connection_open(connection) @connections[connection.id] = connection unless connection. @metric_connections_current&.increment(labels: {auth: "public"}) @metric_connections_total&.increment(labels: {auth: "public"}) return end @metric_connections_current&.increment(labels: {auth: "private"}) @metric_connections_total&.increment(labels: {auth: "private"}) @connections_by_userid[connection.user] ||= ConnectionArray.new @connections_by_userid[connection.user] << connection end |
#on_message(delivery_info, _metadata, payload) ⇒ Object
routing key format: type.id.event
-
‘type` can be public or private
-
‘id` can be user id or market id
-
‘event` is the event identifier, ex: order_completed, trade, …
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/peatio/ranger/router.rb', line 187 def (delivery_info, , payload) routing_key = delivery_info.routing_key if routing_key.count(".") != 2 logger.error { "invalid routing key from amqp: #{routing_key}" } return end type, id, event = routing_key.split(".") payload_decoded = JSON.parse(payload) if type == "private" (id, event, payload_decoded) return end stream = [id, event].join(".") = JSON.dump(stream => payload_decoded) if snapshot?(event) key = storekey(stream) unless @stores[key] # Send the snapshot to subscribers of -inc stream if there were no snapshot before ("#{key}-inc", ) end @stores[key] = { snapshot: , increments: [], } return end if increment?(event) key = storekey(stream) unless @stores[key] logger.warn { "Discard increment received before snapshot for store:#{key}" } return end @stores[key][:increments] << end (stream, ) end |
#on_subscribe(connection, stream) ⇒ Object
146 147 148 149 150 151 |
# File 'lib/peatio/ranger/router.rb', line 146 def on_subscribe(connection, stream) @streams_sockets[stream] ||= ConnectionArray.new @streams_sockets[stream] << connection send_snapshot_and_increments(connection, storekey(stream)) if increment?(stream) @metric_subscriptions_current&.set(compute_stream_subscriptions(stream), labels: {stream: stream}) end |
#on_unsubscribe(connection, stream) ⇒ Object
161 162 163 164 165 166 167 |
# File 'lib/peatio/ranger/router.rb', line 161 def on_unsubscribe(connection, stream) return unless @streams_sockets[stream] @streams_sockets[stream].delete(connection) @streams_sockets.delete(stream) if @streams_sockets[stream].empty? @metric_subscriptions_current&.set(compute_stream_subscriptions(stream), labels: {stream: stream}) end |
#sanity_check_metrics_connections ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/peatio/ranger/router.rb', line 101 def sanity_check_metrics_connections return unless @metric_connections_current connections_current_all = @metric_connections_current.values.values.reduce(0, :+) return if connections_current_all == compute_connections_all() logger.warn "slip detected in metric_connections_current, recalculating" connections_current_private = compute_connections_private() @metric_connections_current.set(connections_current_private, labels: {auth: "private"}) @metric_connections_current.set(compute_connections_all() - connections_current_private, labels: {auth: "public"}) end |
#send_private_message(user_id, event, payload_decoded) ⇒ Object
169 170 171 172 173 |
# File 'lib/peatio/ranger/router.rb', line 169 def (user_id, event, payload_decoded) Array(@connections_by_userid[user_id]).each do |connection| connection.send(event, payload_decoded) if connection.streams.include?(event) end end |
#send_public_message(stream, raw_message) ⇒ Object
175 176 177 178 179 |
# File 'lib/peatio/ranger/router.rb', line 175 def (stream, ) Array(@streams_sockets[stream]).each do |connection| connection.send_raw() end end |
#send_snapshot_and_increments(connection, key) ⇒ Object
153 154 155 156 157 158 159 |
# File 'lib/peatio/ranger/router.rb', line 153 def send_snapshot_and_increments(connection, key) return unless @stores[key] return unless @stores[key][:snapshot] connection.send_raw(@stores[key][:snapshot]) @stores[key][:increments]&.each {|inc| connection.send_raw(inc) } end |
#snapshot?(stream) ⇒ Boolean
48 49 50 |
# File 'lib/peatio/ranger/router.rb', line 48 def snapshot?(stream) stream.end_with?("-snap") end |
#stats ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/peatio/ranger/router.rb', line 60 def stats [ "==== Metrics ====", "ranger_connections_total{auth=\"public\"}: %d" % [@metric_connections_total.get(labels: {auth: "public"})], "ranger_connections_total{auth=\"private\"}: %d" % [@metric_connections_total.get(labels: {auth: "private"})], "ranger_connections_current{auth=\"public\"}: %d" % [@metric_connections_current.get(labels: {auth: "public"})], "ranger_connections_current{auth=\"private\"}: %d" % [@metric_connections_current.get(labels: {auth: "private"})], "ranger_subscriptions_current: %d" % [compute_streams_subscriptions()], "ranger_streams_kinds: %d" % [compute_streams_kinds()], ].join("\n") end |
#storekey(stream) ⇒ Object
56 57 58 |
# File 'lib/peatio/ranger/router.rb', line 56 def storekey(stream) stream.gsub(/-(snap|inc)$/, "") end |