Class: OnStomp::Failover::Buffers::Written
- Inherits: Object
  - Object
  - OnStomp::Failover::Buffers::Written
- Defined in: lib/onstomp/failover/buffers/written.rb
Overview
A buffer that ensures frames are at least written to a client's connection, and replays those that were not when the failover client reconnects.
Instance Method Summary
- #buffer_frame(f, *_) ⇒ Object
  Adds a frame to a buffer so that it may be replayed if the failover client reconnects.
- #buffer_transaction(f, *_) ⇒ Object
  Records the start of a transaction so that it may be replayed if the failover client reconnects.
- #debuffer_non_transactional_frame(f, *_) ⇒ Object
  Removes a frame that is not part of a transaction from the buffer after it has been written to the broker socket, so that it will not be replayed when the failover client reconnects.
- #debuffer_subscription(f, *_) ⇒ Object
  Removes the matching SUBSCRIBE frame from the buffer after the UNSUBSCRIBE has been added to the connection's write buffer, so that it will not be replayed when the failover client reconnects.
- #debuffer_transaction(f, *_) ⇒ Object
  Removes the recorded transaction from the buffer after it has been written to the broker socket, so that it will not be replayed when the failover client reconnects.
- #initialize(failover) ⇒ Written (constructor)
  A new instance of Written.
- #replay(fail, client, *_) ⇒ Object
  Called when the failover client triggers on_failover_connected to start replaying any frames in the buffer.
Constructor Details
#initialize(failover) ⇒ Written
Returns a new instance of Written.
# File 'lib/onstomp/failover/buffers/written.rb', line 8

def initialize failover
  @failover = failover
  @buffer_mutex = Mutex.new
  @buffer = []
  @txs = {}

  failover.before_send &method(:buffer_frame)
  failover.before_commit &method(:buffer_frame)
  failover.before_abort &method(:buffer_frame)
  failover.before_subscribe &method(:buffer_frame)
  failover.before_begin &method(:buffer_transaction)
  # We can scrub the subscription before UNSUBSCRIBE is fully written
  # because if we replay before UNSUBSCRIBE was sent, we still don't
  # want to be subscribed when we reconnect.
  failover.before_unsubscribe &method(:debuffer_subscription)
  # We only want to scrub the transactions if ABORT or COMMIT was
  # at least written fully to the socket.
  failover.on_commit &method(:debuffer_transaction)
  failover.on_abort &method(:debuffer_transaction)
  failover.on_send &method(:debuffer_non_transactional_frame)
  failover.on_failover_connected &method(:replay)
end
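The constructor registers each of its own instance methods as an event callback using the `&method(:name)` idiom. The following is a minimal, self-contained sketch of that idiom only. `ToyEvents` and `ToyListener` are hypothetical stand-ins invented for illustration and are not part of OnStomp's API.

```ruby
# Hypothetical stand-in for an object that exposes event hooks.
class ToyEvents
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a block under an event name.
  def bind(event, &block)
    @callbacks[event] << block
  end

  # Invoke every block registered for the event with the given arguments.
  def trigger(event, *args)
    @callbacks[event].each { |callback| callback.call(*args) }
  end
end

# Hypothetical listener that wires one of its methods to an event.
class ToyListener
  attr_reader :seen

  def initialize(events)
    @seen = []
    # method(:record) returns a bound Method object; the & converts it
    # into a block, so the hook later calls record on this instance.
    events.bind(:before_send, &method(:record))
  end

  # Trailing arguments are swallowed by *_, mirroring the callback
  # signatures used by the Written buffer.
  def record(frame, *_)
    @seen << frame
  end
end

events = ToyEvents.new
listener = ToyListener.new(events)
events.trigger(:before_send, 'SEND frame', :client, :extra)
```

Because each callback accepts `*_`, a hook may pass any number of additional arguments without the handler needing to know about them.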
Instance Method Details
#buffer_frame(f, *_) ⇒ Object
Adds a frame to a buffer so that it may be replayed if the failover client reconnects.
# File 'lib/onstomp/failover/buffers/written.rb', line 34

def buffer_frame f, *_
  @buffer_mutex.synchronize do
    unless f.header? :'x-onstomp-failover-replay'
      @buffer << f
    end
  end
end
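The guard on the `x-onstomp-failover-replay` header means a frame that is already being replayed is not buffered a second time. A small sketch of that behaviour, assuming a hypothetical `ReplayableFrame` struct in place of a real OnStomp frame:

```ruby
# Hypothetical frame type; real OnStomp frames have a richer interface.
ReplayableFrame = Struct.new(:command, :headers) do
  # True when the named header is present on the frame.
  def header?(name)
    headers.key?(name)
  end
end

replay_header = :'x-onstomp-failover-replay'
buffer = []
mutex = Mutex.new

# Same shape as buffer_frame: append under the lock unless the frame
# carries the replay marker.
buffer_frame = lambda do |frame, *_|
  mutex.synchronize do
    buffer << frame unless frame.header?(replay_header)
  end
end

fresh = ReplayableFrame.new('SEND', { destination: '/queue/a' })
replayed = ReplayableFrame.new('SEND', { destination: '/queue/a', replay_header => '1' })

buffer_frame.call(fresh)
buffer_frame.call(replayed)
```

After both calls only `fresh` is held in `buffer`; the marked frame is ignored.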
#buffer_transaction(f, *_) ⇒ Object
Records the start of a transaction so that it may be replayed if the failover client reconnects.
# File 'lib/onstomp/failover/buffers/written.rb', line 44

def buffer_transaction f, *_
  @txs[f[:transaction]] = true
  buffer_frame f
end
#debuffer_non_transactional_frame(f, *_) ⇒ Object
Removes a frame that is not part of a transaction from the buffer after it has been written to the broker socket, so that it will not be replayed when the failover client reconnects.
# File 'lib/onstomp/failover/buffers/written.rb', line 74

def debuffer_non_transactional_frame f, *_
  unless @txs.key?(f[:transaction])
    @buffer_mutex.synchronize { @buffer.delete f }
  end
end
#debuffer_subscription(f, *_) ⇒ Object
Removes the matching SUBSCRIBE frame from the buffer after the UNSUBSCRIBE has been added to the connection's write buffer, so that it will not be replayed when the failover client reconnects.
# File 'lib/onstomp/failover/buffers/written.rb', line 65

def debuffer_subscription f, *_
  @buffer_mutex.synchronize do
    @buffer.reject! { |bf| bf.command == 'SUBSCRIBE' && bf[:id] == f[:id] }
  end
end
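As an illustration of the matching rule, the sketch below removes only the SUBSCRIBE frame whose `id` equals that of the UNSUBSCRIBE frame. `SubFrame` is a hypothetical struct used in place of a real frame, and locking is omitted for brevity.

```ruby
# Hypothetical frame type; Struct#[] allows bf[:id] style access.
SubFrame = Struct.new(:command, :id)

buffer = [
  SubFrame.new('SUBSCRIBE', 'sub-1'),
  SubFrame.new('SUBSCRIBE', 'sub-2'),
  SubFrame.new('SEND', nil)
]
unsubscribe = SubFrame.new('UNSUBSCRIBE', 'sub-1')

# Drop the SUBSCRIBE with the same id; other frames are left untouched.
buffer.reject! { |bf| bf.command == 'SUBSCRIBE' && bf[:id] == unsubscribe[:id] }

remaining_ids = buffer.map(&:id)
```

The `command` check matters: a non-SUBSCRIBE frame that happens to carry the same `id` header would otherwise be discarded as well.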
#debuffer_transaction(f, *_) ⇒ Object
Removes the recorded transaction from the buffer after it has been written to the broker socket, so that it will not be replayed when the failover client reconnects.
# File 'lib/onstomp/failover/buffers/written.rb', line 52

def debuffer_transaction f, *_
  tx = f[:transaction]
  if @txs.delete tx
    @buffer_mutex.synchronize do
      @buffer.reject! { |bf| bf[:transaction] == tx }
    end
  end
end
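Taken together with debuffer_non_transactional_frame, this means frames inside a transaction stay buffered until the COMMIT or ABORT has been written, while frames outside a transaction are dropped as soon as they are written. A simplified sketch of that lifecycle, assuming a hypothetical `TxFrame` class and omitting the mutex:

```ruby
# Hypothetical frame type exposing header lookup via [].
class TxFrame
  attr_reader :command

  def initialize(command, headers = {})
    @command = command
    @headers = headers
  end

  def [](name)
    @headers[name]
  end
end

buffer = []
txs = {}

begin_tx = TxFrame.new('BEGIN', { transaction: 't-1' })
in_tx    = TxFrame.new('SEND', { transaction: 't-1', destination: '/queue/a' })
plain    = TxFrame.new('SEND', { destination: '/queue/a' })
commit   = TxFrame.new('COMMIT', { transaction: 't-1' })

# before_begin: record the transaction, then buffer the BEGIN frame.
txs[begin_tx[:transaction]] = true
buffer << begin_tx
# before_send: both SEND frames are buffered.
buffer << in_tx << plain

# on_send: only frames outside a recorded transaction are dropped.
on_send = lambda do |frame|
  buffer.delete(frame) unless txs.key?(frame[:transaction])
end
on_send.call(in_tx)
on_send.call(plain)
after_sends = buffer.map(&:command)

# before_commit buffers COMMIT; on_commit then scrubs the whole transaction.
buffer << commit
on_commit = lambda do |frame|
  tx = frame[:transaction]
  buffer.reject! { |bf| bf[:transaction] == tx } if txs.delete(tx)
end
on_commit.call(commit)
```

If the connection failed between the two steps, the BEGIN and the transactional SEND would still be in the buffer and available for replay.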
#replay(fail, client, *_) ⇒ Object
Called when the failover client triggers on_failover_connected to start replaying any frames in the buffer.
# File 'lib/onstomp/failover/buffers/written.rb', line 82

def replay fail, client, *_
  replay_frames = @buffer_mutex.synchronize do
    @buffer.select { |f| f[:'x-onstomp-failover-replay'] = '1'; true }
  end

  replay_frames.each do |f|
    client.transmit f
  end
end
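The replay step tags every buffered frame with the replay header, takes a snapshot under the lock, and transmits outside the lock. The sketch below models that flow with a hypothetical `HeaderFrame` class and a toy `transmit` lambda that re-enters the buffering path, as a real client's before-send hook presumably would. It uses `each` plus `dup` where the source uses `select` with an always-true block; the resulting snapshot is the same.

```ruby
# Hypothetical frame type with readable and writable headers.
class HeaderFrame
  attr_reader :command

  def initialize(command, headers = {})
    @command = command
    @headers = headers
  end

  def []=(name, value)
    @headers[name] = value
  end

  def header?(name)
    @headers.key?(name)
  end
end

replay_header = :'x-onstomp-failover-replay'
mutex = Mutex.new
buffer = [
  HeaderFrame.new('SUBSCRIBE', { id: 'sub-1' }),
  HeaderFrame.new('SEND', { destination: '/queue/a' })
]
transmitted = []

# Toy transmit: runs the buffering guard first, then "sends" the frame.
transmit = lambda do |frame|
  mutex.synchronize { buffer << frame unless frame.header?(replay_header) }
  transmitted << frame.command
end

# Tag and snapshot while holding the lock.
replay_frames = mutex.synchronize do
  buffer.each { |frame| frame[replay_header] = '1' }
  buffer.dup
end

# Transmit after releasing the lock: Ruby's Mutex is not reentrant, so
# calling transmit inside synchronize would raise ThreadError here.
replay_frames.each { |frame| transmit.call(frame) }
```

The buffer still holds two frames afterwards: the replay header keeps the replayed frames from being buffered again, and nothing in this step removes them.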