Class: Synapse::DuplicationRecorder

Inherits:
Object
  • Object
show all
Defined in:
lib/synapse/common/duplication.rb

Overview

Records messages as they are sent to a bus so that duplicates can be tracked and prevented. Inspired by the de-duplication manager from Lokad.CQRS

This implementation is thread-safe

Instance Method Summary collapse

Constructor Details

#initializeDuplicationRecorder

Returns a new instance of DuplicationRecorder.



7
8
9
10
# File 'lib/synapse/common/duplication.rb', line 7

def initialize
  @recorded = Hash.new
  @lock = Mutex.new
end

Instance Method Details

#forget(message) ⇒ undefined

Forgets the given message

Parameters:

Returns:

  • (undefined)


39
40
41
42
43
# File 'lib/synapse/common/duplication.rb', line 39

def forget(message)
  @lock.synchronize do
    @recorded.delete message.id
  end
end

#forget_older_than(threshold) ⇒ undefined

Cleans up messages that are older than the given timestamp

Parameters:

  • threshold (Time)

Returns:

  • (undefined)


49
50
51
52
53
54
55
# File 'lib/synapse/common/duplication.rb', line 49

def forget_older_than(threshold)
  @lock.synchronize do
    @recorded.delete_if do |message_id, timestamp|
      timestamp <= threshold
    end
  end
end

#record(message) ⇒ undefined

Records the given message so that duplicates can be ignored

Parameters:

Returns:

  • (undefined)

Raises:



17
18
19
20
21
22
23
24
25
# File 'lib/synapse/common/duplication.rb', line 17

def record(message)
  @lock.synchronize do
    if @recorded.has_key? message.id
      raise DuplicationError
    end

    @recorded.store message.id, Time.now
  end
end

#recorded?(message) ⇒ Boolean

Returns true if the given message has been recorded

Parameters:

Returns:

  • (Boolean)


31
32
33
# File 'lib/synapse/common/duplication.rb', line 31

def recorded?(message)
  @recorded.has_key? message.id
end