Class: Promiscuous::Publisher::Operation::Atomic
- Defined in: lib/promiscuous/publisher/operation/atomic.rb
Direct Known Subclasses
Moped::PromiscuousCollectionWrapper::PromiscuousCollectionOperation, Moped::PromiscuousQueryWrapper::PromiscuousWriteOperation, Ephemeral
Instance Attribute Summary
- #instance ⇒ Object
  XXX instance can be a selector representation.
Attributes inherited from Base
Instance Method Summary
- #acquire_op_lock ⇒ Object
- #do_database_query(query) ⇒ Object
- #execute_instrumented(query) ⇒ Object
- #fetch_instance ⇒ Object
- #increment_version_in_document ⇒ Object
- #initialize(options = {}) ⇒ Atomic (constructor)
  A new instance of Atomic.
- #operation_payloads ⇒ Object
- #query_dependencies ⇒ Object
- #recoverable_failure?(exception) ⇒ Boolean
- #reload_instance ⇒ Object
- #use_id_selector(options = {}) ⇒ Object
- #yell_about_missing_instance ⇒ Object
Methods inherited from Base
_acquire_lock, #dependencies_for, #dependency_for_op_lock, #ensure_op_still_locked, #execute, #explain_operation, #generate_payload, #get_new_op_lock, #increment_dependencies, lock_options, #on_rabbitmq_confirm, #payload_for, #publish_payload_in_rabbitmq_async, #publish_payload_in_redis, rabbitmq_staging_set_key, #record_timestamp, #recover_db_operation, recover_locks, recover_operation, recover_operation_from_lock, recover_payloads_for_rabbitmq, #recovering?, #recovery_payload, register_recovery_mechanism, #release_op_lock, run_recovery_mechanisms, #should_instrument_query?, #trace_operation, #write_dependencies
Constructor Details
#initialize(options = {}) ⇒ Atomic
Returns a new instance of Atomic.
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 5

def initialize(options={})
  super
  @instance = options[:instance]
end
Instance Attribute Details
#instance ⇒ Object
XXX instance can be a selector representation.
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 3

def instance
  @instance
end
Instance Method Details
#acquire_op_lock ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 10

def acquire_op_lock
  unless dependency_for_op_lock
    return unless reload_instance
  end

  loop do
    instance_dep = dependency_for_op_lock

    super

    return if operation == :create

    # We need to make sure that the lock we acquired matches our selector.
    # There is a bit of room for optimization if we know that we don't have
    # any tracked attributes on the model and our selector is already an id.
    return unless reload_instance

    # If reload_instance changed the current instance because the selector,
    # we need to unlock the old instance, lock this new instance, and
    # retry.
    return if instance_dep == dependency_for_op_lock

    # XXX What should we do if we are going in a live lock?
    # Sleep with some jitter?
    release_op_lock
  end
end
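The "lock, reload, compare, retry" loop above can be tried outside Promiscuous with a toy model. This is only a sketch and not the library's implementation: `ToyStore`, `ToyLockOperation`, and the `max_attempts` guard are hypothetical names introduced here, and the jittered sleep is just one possible answer to the live-lock question raised in the XXX comment.

```ruby
# Toy model of "lock, reload, verify, retry". All names are hypothetical
# stand-ins; nothing here is Promiscuous API.
class ToyStore
  def initialize(docs)
    @docs = docs            # array of hashes, e.g. {:id => 1, :name => "a"}
    @locks = {}
  end

  # Returns the first document matching every selector field, or nil.
  def find(selector)
    @docs.find { |doc| selector.all? { |k, v| doc[k] == v } }
  end

  def lock(id)
    @locks[id] = true
  end

  def unlock(id)
    @locks.delete(id)
  end

  def locked?(id)
    @locks.key?(id)
  end
end

class ToyLockOperation
  attr_reader :instance, :attempts

  def initialize(store, selector, max_attempts = 5)
    @store, @selector, @max_attempts = store, selector, max_attempts
    @attempts = 0
  end

  # Lock the document the selector currently resolves to, then re-run the
  # selector. If it now resolves to a different document, unlock and retry.
  def acquire_lock
    @instance = @store.find(@selector)
    return if @instance.nil?

    loop do
      @attempts += 1
      locked_id = @instance[:id]
      @store.lock(locked_id)

      @instance = @store.find(@selector)
      if @instance.nil?
        @store.unlock(locked_id)   # toy choice: selector matches nothing now
        return
      end
      return if @instance[:id] == locked_id

      @store.unlock(locked_id)
      raise "giving up after #{@attempts} attempts" if @attempts >= @max_attempts
      sleep(rand * 0.01)           # assumption: small random backoff
    end
  end
end

store = ToyStore.new([{:id => 1, :name => "a"}, {:id => 2, :name => "b"}])
op = ToyLockOperation.new(store, {:name => "b"})
op.acquire_lock
```

In this run the selector resolves to the same document before and after locking, so the loop exits on its first pass with document 2 locked.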
#do_database_query(query) ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 38

def do_database_query(query)
  case operation
  when :create
    # We don't stash the version in the document as we can't have races
    # on the same document.
  when :update
    increment_version_in_document
    # We are now in the possession of an instance that matches the original
    # selector. We need to make sure the db query will operate on it,
    # instead of the original selector.
    use_id_selector(:use_atomic_version_selector => true)
    # We need to use an atomic versioned selector to make sure that
    # if we lose the lock for a long period of time, we don't mess up
    # the record. Perhaps the operation has been recovered a while ago.
  when :destroy
    use_id_selector
  end

  # The driver is responsible to set instance to the appropriate value.
  query.call_and_remember_result(:instrumented)

  if query.failed?
    # If we get an network failure, we should retry later.
    return if recoverable_failure?(query.exception)
    @instance = nil
  end
end
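The reason for the versioned selector in the `:update` branch can be shown with an in-memory stand-in. The sketch below assumes the driver selects on the id plus the version it last read; `ToyCollection` and its `update` method are hypothetical and only mimic a conditional write. How a real driver builds this query is not shown in the listing above.

```ruby
# Toy illustration of an "id plus version" selector. ToyCollection and its
# methods are hypothetical; a real driver would build the equivalent query.
class ToyCollection
  def initialize
    @docs = {}
  end

  def insert(doc)
    @docs[doc[:id]] = doc.dup
  end

  def get(id)
    @docs[id]
  end

  # Applies changes only if every selector field matches the stored document.
  # Returns the number of documents modified (0 or 1).
  def update(selector, changes)
    doc = @docs[selector[:id]]
    return 0 if doc.nil?
    return 0 unless selector.all? { |k, v| doc[k] == v }
    doc.merge!(changes)
    1
  end
end

coll = ToyCollection.new
coll.insert({:id => 1, :name => "old", :version => 3})

# Writer A read version 3 and bumps it to 4 in the same write.
selector_a = {:id => 1, :version => 3}
modified_a = coll.update(selector_a, {:name => "from A", :version => 4})

# Writer B also read version 3 but lost its lock for a while. Its stale
# selector no longer matches, so the write touches nothing.
selector_b = {:id => 1, :version => 3}
modified_b = coll.update(selector_b, {:name => "from B", :version => 4})
```

With a plain `{:id => 1}` selector, writer B would have overwritten writer A's change; adding the version turns the late write into a no-op.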
#execute_instrumented(query) ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 73

def execute_instrumented(query)
  if recovering?
    # The DB died or something. We cannot find our instance any more :(
    # this is a problem, but we need to publish.
    yell_about_missing_instance if @instance.nil?
  else
    acquire_op_lock

    if @instance.nil?
      # The selector missed the instance, bailing out.
      query.call_and_remember_result(:non_instrumented)
      return
    end
  end

  # All the versions are updated and a marked as pending for publish in Redis
  # atomically in case we die before we could write the versions in the
  # database. Once incremented, concurrent queries that are reading our
  # instance will be serialized after our write, even through it may read our
  # old instance. This is a race that we tolerate.
  # XXX We also stash the document for create operations, so the recovery can
  # redo the create to avoid races when instances are getting partitioned.
  increment_dependencies

  # From this point, if we die, the one expiring our write locks must finish
  # the publish, either by sending a dummy, or by sending the real instance.
  # We could have die before or after the database query.

  # We save the versions in the database, as it is our source of truth.
  # This allow a reconstruction of redis in the face of failures.
  # We would also need to send a special message to the subscribers to reset
  # their read counters to the last write version since we would not be able
  # to restore the read counters (and we don't want to store them because
  # this would dramatically augment our footprint on the db).
  #
  # If we are doing a destroy operation, and redis dies right after, and
  # we happen to lost contact with rabbitmq, recovery is going to be complex:
  # we would need to do a diff from the dummy subscriber to see what
  # documents are missing on our side to be able to resend the destroy
  # message.

  do_database_query(query) unless @instance.nil?
  # We take a timestamp right after the write is performed because latency
  # measurements are performed on the subscriber.

  # This make sure that if the db operation failed because of a network issue
  # and we got recovered, we don't send anything as we could send a different
  # message than the recovery mechanism.
  ensure_op_still_locked

  generate_payload

  # As soon as we unlock the locks, the rescuer will not be able to assume
  # that the database instance is still pristine, and so we need to stash the
  # payload in redis. If redis dies, we don't care because it can be
  # reconstructed. Subscribers can see "compressed" updates.
  publish_payload_in_redis

  # TODO Performance: merge these 3 redis operations to speed things up.
  release_op_lock

  # If we die from this point on, a recovery worker can republish our payload
  # since we queued it in Redis.

  # We don't care if we lost the lock and got recovered, subscribers are
  # immune to duplicate messages.
  publish_payload_in_rabbitmq_async
end
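The comments in this method describe what a recovery worker has to do depending on where the publisher died. The sketch below restates that reasoning as a small state machine. `ToyPublish`, `run`, `recovery_action`, and the returned symbols are hypothetical names; the steps are stubs that only record their order, and the mapping from crash point to recovery action is a reading of the comments, not code taken from the library.

```ruby
# Hypothetical stand-in that records which steps completed before a crash.
# Nothing here talks to Redis, RabbitMQ or a database.
class ToyPublish
  STEPS = [:acquire_op_lock, :increment_dependencies, :do_database_query,
           :ensure_op_still_locked, :generate_payload,
           :publish_payload_in_redis, :release_op_lock,
           :publish_payload_in_rabbitmq_async].freeze

  attr_reader :done

  def initialize
    @done = []
  end

  # Runs the steps in order and stops right after `crash_after`, if given.
  def run(crash_after = nil)
    STEPS.each do |step|
      @done << step
      break if step == crash_after
    end
    self
  end

  # What a recovery worker would have to do, following the source comments.
  def recovery_action
    return :nothing_to_publish if @done.last == STEPS.last
    # Payload already stashed: it can be republished as is.
    return :republish_stashed_payload if @done.include?(:publish_payload_in_redis)
    # Versions were bumped but no payload exists yet: the publish must be
    # finished with the real instance or with a dummy.
    return :finish_publish_with_instance_or_dummy if @done.include?(:increment_dependencies)
    :nothing_to_publish
  end
end

completed   = ToyPublish.new.run.recovery_action
mid_write   = ToyPublish.new.run(:do_database_query).recovery_action
after_stash = ToyPublish.new.run(:release_op_lock).recovery_action
```

The ordering matters: because the payload is stashed before the lock is released, a crash at any later step leaves enough state behind for a republish.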
#fetch_instance ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 151

def fetch_instance
  # This method is overridden to use the original query selector.
  # Should return nil if the instance is not found.
  @instance.reload if @instance.respond_to?(:reload)
  @instance
end
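The comment states the contract for overrides: re-run the original selector and return nil when nothing matches. A minimal sketch of such an override follows. `ToyAtomic` mirrors only the methods shown on this page, while `ToySelectorOperation` and its `:docs` and `:selector` options are hypothetical and operate on a plain array instead of a database.

```ruby
# Hypothetical base class mirroring only the methods discussed here.
class ToyAtomic
  attr_reader :instance

  def initialize(options = {})
    @instance = options[:instance]
  end

  def fetch_instance
    @instance.reload if @instance.respond_to?(:reload)
    @instance
  end

  def reload_instance
    @instance = fetch_instance
  end
end

# Hypothetical driver-side subclass: re-runs the original selector against an
# in-memory array and returns nil when nothing matches.
class ToySelectorOperation < ToyAtomic
  def initialize(options = {})
    super
    @docs = options[:docs]
    @selector = options[:selector]
  end

  def fetch_instance
    @docs.find { |doc| @selector.all? { |k, v| doc[k] == v } }
  end
end

docs = [{:id => 1, :state => "open"}]
op = ToySelectorOperation.new(:docs => docs, :selector => {:state => "open"})
found = op.reload_instance

docs.first[:state] = "closed"          # the document stops matching
missing = op.reload_instance
```

Because `reload_instance` assigns whatever `fetch_instance` returns, a nil result clears the instance, which is what lets callers write `return unless reload_instance`.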
#increment_version_in_document ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 162

def increment_version_in_document
  # Overridden to increment version field in the query
end
#operation_payloads ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 143

def operation_payloads
  @instance.nil? ? [] : [payload_for(@instance)]
end
#query_dependencies ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 147

def query_dependencies
  dependencies_for(@instance)
end
#recoverable_failure?(exception) ⇒ Boolean
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 172

def recoverable_failure?(exception)
  # Overridden to tell if the db exception is spurious, like a network
  # failure.
  raise
end
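Since the base method only raises, each driver has to supply its own classification. One possible shape for such an override is sketched below. `ToyDriverOperation` is hypothetical, and the list of exception classes is an assumption made for illustration; a real driver would match the exceptions its own database client raises.

```ruby
require 'timeout'

# Hypothetical override: treat transport-level errors as recoverable and
# everything else as a definitive failure. The list of classes is an
# assumption; a real driver would match its own exception hierarchy.
class ToyDriverOperation
  NETWORK_ERRORS = [Errno::ECONNRESET, Errno::ECONNREFUSED,
                    Errno::ETIMEDOUT, Timeout::Error].freeze

  def recoverable_failure?(exception)
    NETWORK_ERRORS.any? { |klass| exception.is_a?(klass) }
  end
end

op = ToyDriverOperation.new
network_issue = op.recoverable_failure?(Errno::ECONNRESET.new)
bad_query     = op.recoverable_failure?(ArgumentError.new("invalid selector"))
```

In `do_database_query`, a true result leaves `@instance` in place so the operation can be retried later, while a false result sets it to nil.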
#reload_instance ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 158

def reload_instance
  @instance = fetch_instance
end
#use_id_selector(options = {}) ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 166

def use_id_selector(options={})
  # Overridden to use the {:id => @instance.id} selector.
  # if the option use_atomic_version_selector is passed, the driver must add
  # the version_field selector.
end
#yell_about_missing_instance ⇒ Object
# File 'lib/promiscuous/publisher/operation/atomic.rb', line 66

def yell_about_missing_instance
  err = "Cannot find document. Database had a dataloss?. Proceeding anyways. #{@recovery_data}"
  e = Promiscuous::Error::Recovery.new(err)
  Promiscuous.warn "[recovery] #{e}"
  Promiscuous::Config.error_notifier.call(e)
end
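The method warns, hands the error to a callable notifier, and lets the caller carry on. That shape can be sketched without Promiscuous as follows. `ToyRecoveryError`, the `notifier` lambda, and `report_missing_instance` are hypothetical stand-ins; how a notifier is actually registered in `Promiscuous::Config` is not covered by this page and is not assumed here.

```ruby
# Hypothetical stand-ins: ToyRecoveryError and the lambdas below are not part
# of Promiscuous; they only mimic the "warn, notify, keep going" shape.
class ToyRecoveryError < StandardError; end

reported = []
notifier = lambda { |error| reported << error }   # any object with #call works

report_missing_instance = lambda do |recovery_data|
  error = ToyRecoveryError.new("Cannot find document: #{recovery_data}")
  warn "[recovery] #{error}"     # Kernel#warn writes to $stderr
  notifier.call(error)
  :proceed                       # the caller continues with the publish
end

outcome = report_missing_instance.call("op=update id=1")
```

The error is built but never raised, so the recovery path keeps going after the notifier returns.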