Class: Roby::Distributed::Transaction
- Inherits:
-
Transaction
- Object
- BasicObject
- Plan
- Transaction
- Roby::Distributed::Transaction
- Includes:
- DistributedObject
- Defined in:
- lib/roby/distributed/transaction.rb
Overview
An implementation of a transaction distributed over multiple plan managers. The transaction modification protocol is based on an edition token, which is passed through all the transaction owners by #edit and #release.
Most operations on this distributed transaction must be done outside the control thread, as they are blocking.
See DistributedObject for a list of operations valid on distributed objects.
Defined Under Namespace
Classes: DRoby
Constant Summary
Constants included from Log::TransactionHooks
Constants included from Log::PlanHooks
Constants included from Log::BasicObjectHooks
Instance Attribute Summary collapse
-
#edition_reloop ⇒ Object
readonly
True if one of the editors request that the token is passed to them once more.
-
#owners ⇒ Object
readonly
Returns the value of attribute owners.
-
#token_lock ⇒ Object
readonly
Returns the value of attribute token_lock.
-
#token_lock_signal ⇒ Object
readonly
Returns the value of attribute token_lock_signal.
Attributes inherited from Transaction
#auto_tasks, #conflict_solver, #discarded_tasks, #options, #plan, #proxy_objects, #removed_objects
Attributes inherited from Plan
#force_gc, #free_events, #gc_quarantine, #keepalive, #known_tasks, #missions, #repairs, #task_events, #task_index, #transactions
Attributes inherited from BasicObject
Instance Method Summary collapse
-
#abandoned_commit(error) ⇒ Object
Hook called when the transaction commit has been abandoned because a owner refused it.
-
#commit_transaction(synchro = true) ⇒ Object
Commits the transaction.
-
#copy_object_relations(object, proxy) ⇒ Object
:nodoc:.
-
#discard_transaction(synchro = true) ⇒ Object
Discards the transaction.
-
#discover(objects) ⇒ Object
:nodoc:.
-
#do_wrap(base_object, create) ⇒ Object
:nodoc:.
-
#droby_dump(dest) ⇒ Object
Returns a representation of
selfwhich can be used to reference it in our communication withdest. -
#edit(reloop = false) ⇒ Object
Waits for the edition token.
- #edit!(reloop) ⇒ Object
-
#first_editor? ⇒ Boolean
True if this plan manager is the first editor, i.e.
-
#initialize(plan, options = {}) ⇒ Transaction
constructor
Create a new distributed transaction based on the given plan.
-
#next_editor ⇒ Object
Returns the peer which is after this plan manager in the edition order.
-
#prepare_remove_owner(peer) ⇒ Object
Checks that
peercan be removed from the list of owners. -
#propose(peer = nil, &block) ⇒ Object
Announces the transaction on
peeror, ifpeeris nil, to all owners who don’t know about it yet. -
#release(give_back = false) ⇒ Object
Releases the edition token, giving it to the next owner.
Methods included from DistributedObject
#add_owner, #added_owner, #call_owners, #call_siblings, #owner=, #remove_owner, #removed_owner, #self_owned, #self_owned=
Methods inherited from Transaction
#adding_plan_relation, #auto, #check_valid_transaction, #clear, #committed_transaction, #disable_proxying, #discard, #discard_modifications, #discarded_transaction, #discover_neighborhood, #enable_proxying, #executable?, #finalized?, #finalized_plan_event, #finalized_plan_task, #freezed!, #freezed?, #insert, #invalid=, #invalid?, #invalidate, #may_unwrap, #may_wrap, #merged_generated_subgraphs, #permanent, #proxying?, #query_each, #query_result_set, #query_roots, #remove_object, #remove_plan_object, #removing_plan_relation, #replace, #restore_relation, #valid_transaction?, #wrap
Methods included from Log::TransactionHooks
#committed_transaction, #discarded_transaction
Methods inherited from Plan
#[], #add_repair, #added_transaction, #auto, can_gc?, #clear, #discard, #discarded, #discover_event_set, #discover_task_set, #discovered, #discovered_events, #discovered_tasks, #each_task, #empty?, #executable?, #finalized, #finalized_event, #finalized_task, #find_tasks, #garbage, #garbage_collect, #handle_replace, #include?, #insert, #inserted, #inspect, #local_tasks, #locally_useful_tasks, #mission?, #owns?, #partition_event_task, #permanent, #permanent?, #query_each, #query_result_set, #query_roots, #real_plan, #remote_tasks, #remove_object, #remove_repair, #remove_task, #remove_transaction, #removed_transaction, #repairs_for, #replace, #replace_task, #replaced, #respawn, #sibling_on?, #size, #unneeded_events, #unneeded_tasks, #useful_event_component, #useful_events, #useful_task?, #useful_task_component
Methods included from TaskStructure::ExecutionAgentSpawn
Methods included from EventNotifications::PlanCacheCleanup
Methods included from PlanModificationHooks
#discarded, #discovered_events, discovered_objects, #discovered_tasks, #finalized_event, finalized_object, #finalized_task, #inserted, #replaced
Methods included from Transactions::PlanUpdates
#finalized_event, finalized_object, #finalized_task
Methods included from Propagation::RemoveDelayedOnFinalized
Methods included from Log::PlanHooks
#added_transaction, #discarded, #discovered_events, #discovered_tasks, #finalized_event, #finalized_task, #garbage, #inserted, #removed_transaction, #replaced
Methods included from EventGenerator::FinalizedEventHook
Methods inherited from BasicObject
#add_sibling_for, #distribute?, distribute?, #finalized?, #forget_peer, #has_sibling_on?, #initialize_copy, local_only, #read_write?, #remotely_useful?, #remove_sibling_for, #self_owned?, #sibling_of, #sibling_on, #subscribe, #subscribed?, #update_on?, #updated?, #updated_by?, #updated_peers
Methods included from Log::BasicObjectHooks
Constructor Details
#initialize(plan, options = {}) ⇒ Transaction
Create a new distributed transaction based on the given plan. The transaction sole owner is the local plan manager, which is also the owner of the edition token.
43 44 45 46 47 48 49 50 51 |
# File 'lib/roby/distributed/transaction.rb', line 43 def initialize(plan, = {}) @owners = [Distributed] @editor = true @token_lock = Mutex.new @token_lock_signal = ConditionVariable.new super end |
Instance Attribute Details
#edition_reloop ⇒ Object (readonly)
True if one of the editors request that the token is passed to them once more. The transaction can be committed only when all peers did not request that.
See #release
239 240 241 |
# File 'lib/roby/distributed/transaction.rb', line 239 def edition_reloop @edition_reloop end |
#owners ⇒ Object (readonly)
Returns the value of attribute owners.
36 37 38 |
# File 'lib/roby/distributed/transaction.rb', line 36 def owners @owners end |
#token_lock ⇒ Object (readonly)
Returns the value of attribute token_lock.
37 38 39 |
# File 'lib/roby/distributed/transaction.rb', line 37 def token_lock @token_lock end |
#token_lock_signal ⇒ Object (readonly)
Returns the value of attribute token_lock_signal.
37 38 39 |
# File 'lib/roby/distributed/transaction.rb', line 37 def token_lock_signal @token_lock_signal end |
Instance Method Details
#abandoned_commit(error) ⇒ Object
Hook called when the transaction commit has been abandoned because a owner refused it. reason is the value returned by this peer.
210 211 212 213 |
# File 'lib/roby/distributed/transaction.rb', line 210 def abandoned_commit(error) Distributed.debug { "abandoned commit of #{self} because of #{error}" } super if defined? super end |
#commit_transaction(synchro = true) ⇒ Object
Commits the transaction. This method can only be called by the first editor of the transaction, once all owners have requested no additional modifications.
Distributed commits are done in two steps, to make sure that all owners agree to actually perform it. First, the PeerServer#transaction_prepare_commit message is sent, which can return either nil or an error object.
If all peers return nil, the actual commit is performed by sending the PeerServer#transaction_commit message. Otherwise, the commit is abandonned by sending the PeerServer#transaction_abandon_commit message to the transaction owners.
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/roby/distributed/transaction.rb', line 171 def commit_transaction(synchro = true) if !self_owned? raise OwnershipError, "cannot commit a transaction which is not owned locally. #{self} is owned by #{owners.to_a}" elsif synchro if !editor? raise NotEditor, "not editor of this transaction" elsif !first_editor? raise NotEditor, "transactions are committed by their first editor" elsif edition_reloop raise NotReady, "transaction still needs editing" end end if synchro result = call_owners(:transaction_prepare_commit, self) error = result.find_all { |_, returned| returned } if !error.empty? call_owners(:transaction_abandon_commit, self, error) return false else call_owners(:transaction_commit, self) return true end else all_objects = known_tasks.dup proxy_objects.each_key { |o| all_objects << o } Distributed.update(self) do Distributed.update_all(all_objects) do super() { yield if block_given? } end end end self end |
#copy_object_relations(object, proxy) ⇒ Object
:nodoc:
90 91 92 93 94 95 96 97 98 |
# File 'lib/roby/distributed/transaction.rb', line 90 def copy_object_relations(object, proxy) # :nodoc: # If the transaction is being updated, it means that we are # discovering the new transaction. In that case, no need to # discover the plan relations since our peer will send us all # transaction relations unless Distributed.updating?(self) super end end |
#discard_transaction(synchro = true) ⇒ Object
Discards the transaction. Unlike #commit_transaction, this can be called by any of the owners.
220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/roby/distributed/transaction.rb', line 220 def discard_transaction(synchro = true) # :nodoc: unless Distributed.owns?(self) raise OwnershipError, "cannot discard a transaction which is not owned locally. #{self} is owned by #{owners}" end if synchro call_siblings(:transaction_discard, self) else super() end self end |
#discover(objects) ⇒ Object
:nodoc:
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/roby/distributed/transaction.rb', line 136 def discover(objects) # :nodoc: if objects events, tasks = partition_event_task(objects) for object in (events || []) + (tasks || []) unless Distributed.updating?(object) || Distributed.owns?(object) || (object.owners - owners).empty? raise OwnershipError, "#{object} is not owned by #{owners.to_a} (#{object.owners.to_a})" end end super(events) if events super(tasks) if tasks else super end end |
#do_wrap(base_object, create) ⇒ Object
:nodoc:
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/roby/distributed/transaction.rb', line 53 def do_wrap(base_object, create) # :nodoc: # It is allowed to add objects in a transaction only if # * the object is not distribuable. It means that we are # annotating *locally* remote tasks (like it is done for # ConnectionTask for instance). # * the object is owned by the transaction owners if create && (base_object.distribute? && !(base_object.owners - owners).empty?) raise OwnershipError, "plan owners #{owners} do not own #{base_object}: #{base_object.owners}" end temporarily_subscribed = !base_object.updated? if temporarily_subscribed peer = base_object.owners.first base_object = peer.subscribe(base_object) end if object = super object.extend DistributedObject if !Distributed.updating?(self) && object.root_object? && base_object.distribute? # The new proxy has been sent to remote hosts since it # has been discovered in the transaction. Nonetheless, # we don't want to return from #wrap until we know its # sibling. Add a synchro point to wait for that updated_peers.each do |peer| peer.synchro_point end end end object ensure if temporarily_subscribed peer.unsubscribe(base_object) end end |
#droby_dump(dest) ⇒ Object
Returns a representation of self which can be used to reference it in our communication with dest.
377 378 379 380 381 382 383 384 385 |
# File 'lib/roby/distributed/transaction.rb', line 377 def droby_dump(dest) # :nodoc: if remote_siblings.has_key?(dest) remote_id else DRoby.new(remote_siblings.droby_dump(dest), owners.droby_dump(dest), plan.droby_dump(dest), .droby_dump(dest)) end end |
#edit(reloop = false) ⇒ Object
Waits for the edition token. If a block is given, it is called when the token is achieved, and releases the token when the blocks returns.
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'lib/roby/distributed/transaction.rb', line 271 def edit(reloop = false) if Thread.current[:control_mutex_locked] raise "cannot call #edit with the control mutex taken !" end token_lock.synchronize do if !editor? # not the current editor token_lock_signal.wait(token_lock) end end if block_given? begin yield ensure release(reloop) end end end |
#edit!(reloop) ⇒ Object
260 261 262 263 264 265 266 |
# File 'lib/roby/distributed/transaction.rb', line 260 def edit!(reloop) token_lock.synchronize do @editor = true @edition_reloop = reloop token_lock_signal.broadcast end end |
#first_editor? ⇒ Boolean
True if this plan manager is the first editor, i.e. the plan manager whose responsibility is to manage the edition protocol.
243 244 245 |
# File 'lib/roby/distributed/transaction.rb', line 243 def first_editor? owners.first == Distributed end |
#next_editor ⇒ Object
Returns the peer which is after this plan manager in the edition order. The edition token will be sent to this peer by #release
248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/roby/distributed/transaction.rb', line 248 def next_editor if owners.last == Distributed return owners.first end owners.each_cons(2) do |first, second| if first == Distributed return second end end end |
#prepare_remove_owner(peer) ⇒ Object
Checks that peer can be removed from the list of owners
101 102 103 104 105 106 107 108 109 |
# File 'lib/roby/distributed/transaction.rb', line 101 def prepare_remove_owner(peer) known_tasks.each do |t| t = t.__getobj__ if t.respond_to?(:__getobj__) if peer.owns?(t) && t.distribute? raise OwnershipError, "#{peer} still owns tasks in the transaction (#{t})" end end nil end |
#propose(peer = nil, &block) ⇒ Object
Announces the transaction on peer or, if peer is nil, to all owners who don’t know about it yet. This operation is asynchronous, so the block, if given, will be called for each remote peer which has processed the message.
See Peer#transaction_propose
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/roby/distributed/transaction.rb', line 117 def propose(peer = nil, &block) if !self_owned? raise OwnershipError, "cannot propose a transaction we don't own" end if peer peer.transaction_propose(self, &block) else (owners - remote_siblings.keys).each do |peer| if peer != Roby::Distributed Distributed.debug "proposing #{self} to #{peer}" propose(peer) do yield(peer) end end end end end |
#release(give_back = false) ⇒ Object
Releases the edition token, giving it to the next owner. If give_back is true, the local plan manager announces that it expects the token to be given back to it once more. The commit is allowed only when all peers have released the edition token without requesting it once more.
It sends the #transaction_give_token to the peer returned by #next_editor.
Raised NotEditor if the local plan manager is not the current transaction editor.
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
# File 'lib/roby/distributed/transaction.rb', line 302 def release(give_back = false) token_lock.synchronize do if !editor? raise NotEditor, "not editor" else reloop = if first_editor? give_back else edition_reloop || give_back end return if owners.size == 1 @editor = false next_editor.transaction_give_token(self, reloop) true end end end |