Class: OnStomp::Failover::Buffers::Written

Inherits:
Object
  • Object
show all
Defined in:
lib/onstomp/failover/buffers/written.rb

Overview

A buffer that ensures frames are at least written to a client's connection and replays the ones that were not when the failover client reconnects.

Instance Method Summary collapse

Constructor Details

#initialize(failover) ⇒ Written

Returns a new instance of Written.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/onstomp/failover/buffers/written.rb', line 8

# Sets up an empty frame buffer and transaction table, then registers this
# buffer's methods as callbacks on the given failover client.
#
# @param failover the failover client whose event hooks are bound here
def initialize(failover)
  @failover = failover
  @buffer_mutex = Mutex.new
  @buffer = []
  @txs = {}

  enqueue = method(:buffer_frame)
  drop_transaction = method(:debuffer_transaction)

  # Frames are recorded before they are handed to the connection.
  failover.before_send(&enqueue)
  failover.before_commit(&enqueue)
  failover.before_abort(&enqueue)
  failover.before_subscribe(&enqueue)
  failover.before_begin(&method(:buffer_transaction))

  # The SUBSCRIBE can be dropped before UNSUBSCRIBE is fully written:
  # if a replay happens before UNSUBSCRIBE went out, we still do not
  # want the subscription restored on reconnect.
  failover.before_unsubscribe(&method(:debuffer_subscription))

  # A transaction is only forgotten once its COMMIT or ABORT has been
  # fully written to the socket.
  failover.on_commit(&drop_transaction)
  failover.on_abort(&drop_transaction)
  failover.on_send(&method(:debuffer_non_transactional_frame))

  failover.on_failover_connected(&method(:replay))
end

Instance Method Details

#buffer_frame(f, *_) ⇒ Object

Adds a frame to a buffer so that it may be replayed if the failover client re-connects



34
35
36
37
38
39
40
# File 'lib/onstomp/failover/buffers/written.rb', line 34

# Appends +f+ to the buffer while holding the buffer mutex, unless the frame
# already carries the 'x-onstomp-failover-replay' header.
#
# @param f the frame to remember
# @return the buffer when the frame was appended, +nil+ when it was skipped
def buffer_frame(f, *_)
  @buffer_mutex.synchronize do
    @buffer << f unless f.header?(:'x-onstomp-failover-replay')
  end
end

#buffer_transaction(f, *_) ⇒ Object

Records the start of a transaction so that it may be replayed if the failover client re-connects



44
45
46
47
# File 'lib/onstomp/failover/buffers/written.rb', line 44

# Marks the frame's :transaction header value as an open transaction in
# @txs, then hands the frame to #buffer_frame.
#
# @param f the frame whose :transaction header names the transaction
# @return whatever #buffer_frame returns
def buffer_transaction(f, *_)
  tx_id = f[:transaction]
  @txs[tx_id] = true
  buffer_frame(f)
end

#debuffer_non_transactional_frame(f, *_) ⇒ Object

Removes a frame that is not part of a transaction from the buffer after it has been written to the broker socket so that it will not be replayed when the failover client re-connects



74
75
76
77
78
# File 'lib/onstomp/failover/buffers/written.rb', line 74

# Deletes +f+ from the buffer, under the buffer mutex, unless its
# :transaction header value is a key in @txs.
#
# @param f the frame to remove
# @return the result of Array#delete, or +nil+ when the frame belongs to a
#   tracked transaction and is left in place
def debuffer_non_transactional_frame(f, *_)
  return if @txs.key?(f[:transaction])

  @buffer_mutex.synchronize do
    @buffer.delete(f)
  end
end

#debuffer_subscription(f, *_) ⇒ Object

Removes the matching SUBSCRIBE frame from the buffer after the UNSUBSCRIBE has been added to the connection’s write buffer so that it will not be replayed when the failover client re-connects



65
66
67
68
69
# File 'lib/onstomp/failover/buffers/written.rb', line 65

# Drops every buffered frame whose command is 'SUBSCRIBE' and whose :id
# header equals the :id header of +f+. Runs under the buffer mutex.
#
# @param f the frame whose :id header selects the subscription to forget
# @return the result of Array#reject! (+nil+ when nothing was removed)
def debuffer_subscription(f, *_)
  @buffer_mutex.synchronize do
    @buffer.reject! do |buffered|
      next false unless buffered.command == 'SUBSCRIBE'

      buffered[:id] == f[:id]
    end
  end
end

#debuffer_transaction(f, *_) ⇒ Object

Removes the recorded transaction from the buffer after it has been written to the broker socket so that it will not be replayed when the failover client re-connects



52
53
54
55
56
57
58
59
# File 'lib/onstomp/failover/buffers/written.rb', line 52

# Forgets the transaction named by the frame's :transaction header. When
# that transaction was tracked in @txs, every buffered frame with the same
# :transaction header is removed under the buffer mutex.
#
# @param f the frame whose :transaction header names the transaction
# @return +nil+ when the transaction was not tracked, otherwise the result
#   of Array#reject!
def debuffer_transaction(f, *_)
  tx = f[:transaction]
  return unless @txs.delete(tx)

  @buffer_mutex.synchronize do
    @buffer.reject! { |buffered| buffered[:transaction] == tx }
  end
end

#replay(fail, client, *_) ⇒ Object

Called when the failover client triggers on_failover_connected to start replaying any frames in the buffer.



82
83
84
85
86
87
88
89
90
# File 'lib/onstomp/failover/buffers/written.rb', line 82

# Stamps every buffered frame with an 'x-onstomp-failover-replay' header of
# '1' and takes a snapshot of the buffer while holding the mutex, then
# transmits each snapshotted frame through +client+ after the lock is
# released.
#
# @param fail not used by this method
# @param client object that receives each frame via #transmit
# @return [Array] the snapshot of frames that were transmitted
def replay(fail, client, *_)
  pending = @buffer_mutex.synchronize do
    @buffer.each { |frame| frame[:'x-onstomp-failover-replay'] = '1' }
    @buffer.dup
  end

  pending.each { |frame| client.transmit(frame) }
end