Class: Promiscuous::Publisher::Operation::Atomic

Inherits:
Base
  • Object
show all
Defined in:
lib/promiscuous/publisher/operation/atomic.rb

Instance Attribute Summary collapse

Attributes inherited from Base

#operation

Instance Method Summary collapse

Methods inherited from Base

_acquire_lock, #dependencies_for, #dependency_for_op_lock, #ensure_op_still_locked, #execute, #explain_operation, #generate_payload, #get_new_op_lock, #increment_dependencies, lock_options, #on_rabbitmq_confirm, #payload_for, #publish_payload_in_rabbitmq_async, #publish_payload_in_redis, rabbitmq_staging_set_key, #record_timestamp, #recover_db_operation, recover_locks, recover_operation, recover_operation_from_lock, recover_payloads_for_rabbitmq, #recovering?, #recovery_payload, register_recovery_mechanism, #release_op_lock, run_recovery_mechanisms, #should_instrument_query?, #trace_operation, #write_dependencies

Constructor Details

#initialize(options = {}) ⇒ Atomic

Returns a new instance of Atomic.



5
6
7
8
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 5

# Builds the operation and remembers the instance it operates on.
#
# @param options [Hash] passed on to the parent initializer by the bare
#   `super`; the value under :instance is additionally stored on this object.
def initialize(options={})
  # Bare `super` forwards `options` as received.
  super
  # Whatever options[:instance] yields (nil when the key is absent in a
  # plain Hash) becomes the instance this operation works on.
  @instance = options[:instance]
end

Instance Attribute Details

#instanceObject

XXX instance can be a selector representation.



3
4
5
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 3

# Reader for the object this operation works on.
# XXX instance can be a selector representation.
#
# @return [Object, nil] the current value of @instance
def instance
  @instance
end

Instance Method Details

#acquire_op_lockObject



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 10

# Takes the operation lock through the parent implementation, then reloads
# the instance and checks that the dependency used for the lock is still the
# same. If it changed, the lock is released and the whole sequence is retried.
#
# Returns early, before calling `super`, when there is no dependency to lock
# on and reload_instance yields a falsy value.
def acquire_op_lock
  unless dependency_for_op_lock
    # Nothing to lock on yet: try to load the instance first, and give up if
    # it cannot be loaded.
    return unless reload_instance
  end

  loop do
    # Snapshot of the dependency taken before locking; compared again after
    # the reload below to detect that the instance changed under us.
    instance_dep = dependency_for_op_lock

    # Parent implementation of acquire_op_lock.
    super

    # Create operations skip the reload/compare step entirely.
    return if operation == :create

    # We need to make sure that the lock we acquired matches our selector.
    # There is a bit of room for optimization if we know that we don't have
    # any tracked attributes on the model and our selector is already an id.
    #
    # NOTE(review): this return happens after `super` has run, and nothing on
    # this path calls release_op_lock -- confirm the lock is released or
    # expired elsewhere when the reload comes back empty.
    return unless reload_instance

    # If reload_instance changed the current instance because of the
    # selector, we need to unlock the old instance, lock this new instance,
    # and retry. An unchanged dependency means the lock we hold is the right
    # one.
    return if instance_dep == dependency_for_op_lock

    # XXX What should we do if we are going in a live lock?
    # Sleep with some jitter?
    release_op_lock
  end
end

#do_database_query(query) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 38

# Prepares the selector according to the kind of operation, runs the query in
# instrumented mode, and forgets the instance when the query failed for a
# reason that is not recoverable.
#
# @param query [#call_and_remember_result, #failed?, #exception]
# @return [nil]
def do_database_query(query)
  current_op = operation

  if current_op == :update
    increment_version_in_document
    # We are now in the possession of an instance that matches the original
    # selector, so the db query must operate on it rather than on the
    # original selector. The selector is also versioned atomically: if we
    # lost the lock for a long time (the operation may have been recovered a
    # while ago), the write must not mess up the record.
    use_id_selector(:use_atomic_version_selector => true)
  elsif current_op == :destroy
    use_id_selector
  end
  # Nothing to prepare for :create -- the version is not stashed in the
  # document because there cannot be races on the same document.

  # The driver is responsible for setting instance to the appropriate value.
  query.call_and_remember_result(:instrumented)

  return unless query.failed?
  # On a network failure, keep the instance so that we can retry later.
  return if recoverable_failure?(query.exception)

  @instance = nil
end

#execute_instrumented(query) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 73

# Runs the instrumented write path for this operation, in this order:
# acquire the lock (unless recovering), increment_dependencies, the database
# query, record_timestamp, ensure_op_still_locked, generate_payload,
# publish_payload_in_redis, release_op_lock, and finally
# publish_payload_in_rabbitmq_async.
#
# When not recovering and no instance is available after locking, the query
# is run in non-instrumented mode and the method returns without publishing.
#
# @param query [#call_and_remember_result] the database query to execute
def execute_instrumented(query)
  if recovering?
    # The DB died or something. We cannot find our instance any more :(
    # This is a problem, but we still need to publish.
    yell_about_missing_instance if @instance.nil?
  else
    acquire_op_lock

    if @instance.nil?
      # The selector missed the instance, bailing out.
      # NOTE(review): release_op_lock is not called on this path -- confirm
      # that is fine when acquire_op_lock took the lock before the reload
      # came back empty.
      query.call_and_remember_result(:non_instrumented)
      return
    end
  end

  # All the versions are updated and are marked as pending for publish in
  # Redis atomically, in case we die before we could write the versions in
  # the database. Once incremented, concurrent queries that are reading our
  # instance will be serialized after our write, even though they may read
  # our old instance. This is a race that we tolerate.
  # XXX We also stash the document for create operations, so the recovery can
  # redo the create to avoid races when instances are getting partitioned.
  increment_dependencies

  # From this point, if we die, the one expiring our write locks must finish
  # the publish, either by sending a dummy, or by sending the real instance.
  # We could have died before or after the database query.

  # We save the versions in the database, as it is our source of truth.
  # This allows a reconstruction of redis in the face of failures.
  # We would also need to send a special message to the subscribers to reset
  # their read counters to the last write version since we would not be able
  # to restore the read counters (and we don't want to store them because
  # this would dramatically augment our footprint on the db).
  #
  # If we are doing a destroy operation, and redis dies right after, and
  # we happen to lose contact with rabbitmq, recovery is going to be complex:
  # we would need to do a diff from the dummy subscriber to see what
  # documents are missing on our side to be able to resend the destroy
  # message.

  # Skipped when recovering without an instance (see the first branch above).
  do_database_query(query) unless @instance.nil?
  # We take a timestamp right after the write is performed because latency
  # measurements are performed on the subscriber.
  record_timestamp

  # This makes sure that if the db operation failed because of a network
  # issue and we got recovered, we don't send anything, as we could send a
  # different message than the recovery mechanism.
  ensure_op_still_locked

  generate_payload

  # As soon as we unlock the locks, the rescuer will not be able to assume
  # that the database instance is still pristine, and so we need to stash the
  # payload in redis. If redis dies, we don't care because it can be
  # reconstructed. Subscribers can see "compressed" updates.
  publish_payload_in_redis

  # TODO Performance: merge these 3 redis operations to speed things up.
  release_op_lock

  # If we die from this point on, a recovery worker can republish our payload
  # since we queued it in Redis.

  # We don't care if we lost the lock and got recovered, subscribers are
  # immune to duplicate messages.
  publish_payload_in_rabbitmq_async
end

#fetch_instanceObject



151
152
153
154
155
156
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 151

# Default lookup of the instance: refreshes the one we already hold, when it
# knows how to reload itself, and hands it back.
#
# This method is overridden to use the original query selector; an override
# should return nil if the instance is not found.
#
# @return [Object, nil] the value of @instance
def fetch_instance
  held = @instance
  held.reload if held.respond_to?(:reload)
  @instance
end

#increment_version_in_documentObject



162
163
164
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 162

# Hook with no default behavior (returns nil).
# Overridden to increment the version field in the query.
def increment_version_in_document
  # Overridden to increment version field in the query
end

#operation_payloadsObject



143
144
145
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 143

# Payloads to publish for this operation: a single payload built from the
# instance, or nothing when there is no instance.
#
# @return [Array] empty, or a one-element array holding payload_for(@instance)
def operation_payloads
  return [] if @instance.nil?

  [payload_for(@instance)]
end

#query_dependenciesObject



147
148
149
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 147

# Dependencies of this operation, derived from the instance it works on.
#
# @return [Object] whatever dependencies_for returns for @instance
def query_dependencies
  target = @instance
  dependencies_for(target)
end

#recoverable_failure?(exception) ⇒ Boolean

Returns:

  • (Boolean)


172
173
174
175
176
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 172

# Tells whether a database exception is spurious (e.g. a network failure),
# in which case the operation can be retried later.
#
# This default implementation only signals that the method has not been
# overridden. It raises an explicit RuntimeError with a message instead of
# using a bare `raise`: a bare `raise` re-raises `$!` when an exception is
# already being handled, which would hide the missing override behind an
# unrelated error, and otherwise produces the opaque "unhandled exception".
#
# @param exception [Exception] the exception reported by the failed query
# @return [Boolean] in overriding implementations
# @raise [RuntimeError] always, in this default implementation
def recoverable_failure?(exception)
  raise "#{self.class}#recoverable_failure? must be overridden to classify database exceptions"
end

#reload_instanceObject



158
159
160
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 158

# Replaces the held instance with the result of fetch_instance.
#
# @return [Object, nil] the freshly fetched instance (also stored in
#   @instance); falsy when fetch_instance found nothing
def reload_instance
  refreshed = fetch_instance
  @instance = refreshed
end

#use_id_selector(options = {}) ⇒ Object



166
167
168
169
170
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 166

# Hook with no default behavior (returns nil).
#
# @param options [Hash] may carry :use_atomic_version_selector, which
#   do_database_query sets to true for update operations
def use_id_selector(options={})
  # Overridden to use the {:id => @instance.id} selector.
  # If the option use_atomic_version_selector is passed, the driver must add
  # the version_field selector.
end

#yell_about_missing_instanceObject



66
67
68
69
70
71
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 66

# Reports that the document could not be found: logs a warning and hands a
# Promiscuous::Error::Recovery to the configured error notifier. Nothing is
# raised here; the message includes @recovery_data.
#
# @return [Object] whatever the error notifier returns
def yell_about_missing_instance
  message = "Cannot find document. Database had a dataloss?. Proceeding anyways. #{@recovery_data}"
  recovery_error = Promiscuous::Error::Recovery.new(message)
  Promiscuous.warn "[recovery] #{recovery_error}"
  Promiscuous::Config.error_notifier.call(recovery_error)
end