Class: Roby::Distributed::Transaction

Inherits:
Transaction
Includes:
DistributedObject
Defined in:
lib/roby/distributed/transaction.rb

Overview

An implementation of a transaction distributed over multiple plan managers. The transaction modification protocol is based on an edition token that circulates among the transaction owners: #edit waits for the token and #release passes it to the next owner.

Most operations on a distributed transaction are blocking, so they must be performed outside the control thread.

See DistributedObject for a list of operations valid on distributed objects.

Defined Under Namespace

Classes: DRoby

Constant Summary

Constants included from Log::TransactionHooks

Log::TransactionHooks::HOOKS

Constants included from Log::PlanHooks

Log::PlanHooks::HOOKS

Constants included from Log::BasicObjectHooks

Log::BasicObjectHooks::HOOKS

Instance Attribute Summary

Attributes inherited from Transaction

#auto_tasks, #conflict_solver, #discarded_tasks, #options, #plan, #proxy_objects, #removed_objects

Attributes inherited from Plan

#force_gc, #free_events, #gc_quarantine, #keepalive, #known_tasks, #missions, #repairs, #task_events, #task_index, #transactions

Attributes inherited from BasicObject

#distribute

Instance Method Summary

Methods included from DistributedObject

#add_owner, #added_owner, #call_owners, #call_siblings, #owner=, #remove_owner, #removed_owner, #self_owned, #self_owned=

Methods inherited from Transaction

#adding_plan_relation, #auto, #check_valid_transaction, #clear, #committed_transaction, #disable_proxying, #discard, #discard_modifications, #discarded_transaction, #discover_neighborhood, #enable_proxying, #executable?, #finalized?, #finalized_plan_event, #finalized_plan_task, #freezed!, #freezed?, #insert, #invalid=, #invalid?, #invalidate, #may_unwrap, #may_wrap, #merged_generated_subgraphs, #permanent, #proxying?, #query_each, #query_result_set, #query_roots, #remove_object, #remove_plan_object, #removing_plan_relation, #replace, #restore_relation, #valid_transaction?, #wrap

Methods included from Log::TransactionHooks

#committed_transaction, #discarded_transaction

Methods inherited from Plan

#[], #add_repair, #added_transaction, #auto, can_gc?, #clear, #discard, #discarded, #discover_event_set, #discover_task_set, #discovered, #discovered_events, #discovered_tasks, #each_task, #empty?, #executable?, #finalized, #finalized_event, #finalized_task, #find_tasks, #garbage, #garbage_collect, #handle_replace, #include?, #insert, #inserted, #inspect, #local_tasks, #locally_useful_tasks, #mission?, #owns?, #partition_event_task, #permanent, #permanent?, #query_each, #query_result_set, #query_roots, #real_plan, #remote_tasks, #remove_object, #remove_repair, #remove_task, #remove_transaction, #removed_transaction, #repairs_for, #replace, #replace_task, #replaced, #respawn, #sibling_on?, #size, #unneeded_events, #unneeded_tasks, #useful_event_component, #useful_events, #useful_task?, #useful_task_component

Methods included from TaskStructure::ExecutionAgentSpawn

#discovered_tasks

Methods included from EventNotifications::PlanCacheCleanup

#finalized_event

Methods included from PlanModificationHooks

#discarded, #discovered_events, discovered_objects, #discovered_tasks, #finalized_event, finalized_object, #finalized_task, #inserted, #replaced

Methods included from Transactions::PlanUpdates

#finalized_event, finalized_object, #finalized_task

Methods included from Propagation::RemoveDelayedOnFinalized

#finalized_event

Methods included from Log::PlanHooks

#added_transaction, #discarded, #discovered_events, #discovered_tasks, #finalized_event, #finalized_task, #garbage, #inserted, #removed_transaction, #replaced

Methods included from EventGenerator::FinalizedEventHook

#finalized_event

Methods inherited from BasicObject

#add_sibling_for, #distribute?, distribute?, #finalized?, #forget_peer, #has_sibling_on?, #initialize_copy, local_only, #read_write?, #remotely_useful?, #remove_sibling_for, #self_owned?, #sibling_of, #sibling_on, #subscribe, #subscribed?, #update_on?, #updated?, #updated_by?, #updated_peers

Methods included from Log::BasicObjectHooks

#added_owner, #removed_owner

Constructor Details

#initialize(plan, options = {}) ⇒ Transaction

Creates a new distributed transaction based on the given plan. The transaction's sole owner is the local plan manager, which also holds the edition token.



# File 'lib/roby/distributed/transaction.rb', line 43

def initialize(plan, options = {})
  @owners = [Distributed]
  @editor = true

  @token_lock = Mutex.new
  @token_lock_signal = ConditionVariable.new

  super
end

Instance Attribute Details

#edition_reloop ⇒ Object (readonly)

True if one of the editors has requested that the token be passed to it once more. The transaction can be committed only when no peer has made such a request.

See #release



# File 'lib/roby/distributed/transaction.rb', line 239

def edition_reloop
  @edition_reloop
end

#owners ⇒ Object (readonly)

Returns the value of attribute owners.



# File 'lib/roby/distributed/transaction.rb', line 36

def owners
  @owners
end

#token_lock ⇒ Object (readonly)

Returns the value of attribute token_lock.



# File 'lib/roby/distributed/transaction.rb', line 37

def token_lock
  @token_lock
end

#token_lock_signal ⇒ Object (readonly)

Returns the value of attribute token_lock_signal.



# File 'lib/roby/distributed/transaction.rb', line 37

def token_lock_signal
  @token_lock_signal
end

Instance Method Details

#abandoned_commit(error) ⇒ Object

Hook called when the transaction commit has been abandoned because an owner refused it. error is the value returned by that owner.



# File 'lib/roby/distributed/transaction.rb', line 210

def abandoned_commit(error)
  Distributed.debug { "abandoned commit of #{self} because of #{error}" }
  super if defined? super
end

#commit_transaction(synchro = true) ⇒ Object

Commits the transaction. This method can only be called by the first editor of the transaction, once all owners have requested no additional modifications.

Distributed commits are done in two steps to make sure that all owners agree to perform the commit. First, the PeerServer#transaction_prepare_commit message is sent to every owner; each owner returns either nil or an error object.

If all owners return nil, the actual commit is performed by sending the PeerServer#transaction_commit message. Otherwise, the commit is abandoned by sending the PeerServer#transaction_abandon_commit message to the transaction owners.



# File 'lib/roby/distributed/transaction.rb', line 171

def commit_transaction(synchro = true)
  if !self_owned?
    raise OwnershipError, "cannot commit a transaction which is not owned locally. #{self} is owned by #{owners.to_a}"
  elsif synchro
    if !editor?
      raise NotEditor, "not editor of this transaction"
    elsif !first_editor?
      raise NotEditor, "transactions are committed by their first editor"
    elsif edition_reloop
      raise NotReady, "transaction still needs editing"
    end
  end

  if synchro
    result = call_owners(:transaction_prepare_commit, self)
    error = result.find_all { |_, returned| returned }
    if !error.empty?
      call_owners(:transaction_abandon_commit, self, error)
      return false
    else
      call_owners(:transaction_commit, self)
      return true
    end
  else
    all_objects = known_tasks.dup
    proxy_objects.each_key { |o| all_objects << o }
    Distributed.update(self) do
      Distributed.update_all(all_objects) do
        super() { yield if block_given? }
      end
    end
  end

  self
end
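
The two-step commit described above can be illustrated with a small standalone sketch. It does not use Roby: ToyOwner and two_phase_commit are hypothetical names introduced only for this illustration, and the real protocol goes through call_owners and the PeerServer messages. The sketch only shows the decision rule: commit when every owner returns nil from the prepare step, abandon otherwise.

```ruby
# Hypothetical stand-in for a transaction owner (not part of Roby).
# `veto` is nil when the owner accepts the commit, or an error object
# when it refuses. `log` records the messages the owner received.
ToyOwner = Struct.new(:name, :veto, :log) do
  def prepare_commit
    log << :prepare
    veto
  end

  def commit
    log << :commit
  end

  def abandon_commit(errors)
    log << :abandon
  end
end

# Returns true if the commit went through, false if it was abandoned.
def two_phase_commit(owners)
  # Step 1: ask every owner and keep the [owner, result] pairs whose
  # result is not nil, as find_all does in the listing above.
  results = owners.map { |owner| [owner, owner.prepare_commit] }
  errors = results.find_all { |_, returned| returned }

  if errors.empty?
    # Step 2a: nobody refused, perform the commit everywhere.
    owners.each(&:commit)
    true
  else
    # Step 2b: at least one refusal, tell every owner to abandon.
    owners.each { |owner| owner.abandon_commit(errors) }
    false
  end
end
```

With two accepting owners the function returns true and each log reads [:prepare, :commit]; a single refusing owner makes it return false and every owner receives the abandon message.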

#copy_object_relations(object, proxy) ⇒ Object

:nodoc:



# File 'lib/roby/distributed/transaction.rb', line 90

def copy_object_relations(object, proxy) # :nodoc:
  # If the transaction is being updated, it means that we are
  # discovering the new transaction. In that case, no need to
  # discover the plan relations since our peer will send us all
  # transaction relations
  unless Distributed.updating?(self)
    super
  end
end

#discard_transaction(synchro = true) ⇒ Object

Discards the transaction. Unlike #commit_transaction, this can be called by any of the owners.



# File 'lib/roby/distributed/transaction.rb', line 220

def discard_transaction(synchro = true) # :nodoc:
  unless Distributed.owns?(self)
    raise OwnershipError, "cannot discard a transaction which is not owned locally. #{self} is owned by #{owners}"
  end

  if synchro
    call_siblings(:transaction_discard, self)
  else
    super()
  end
  self
end

#discover(objects) ⇒ Object

:nodoc:



# File 'lib/roby/distributed/transaction.rb', line 136

def discover(objects) # :nodoc:
  if objects
    events, tasks = partition_event_task(objects)
    for object in (events || []) + (tasks || [])
      unless Distributed.updating?(object) ||
             Distributed.owns?(object) ||
             (object.owners - owners).empty?

        raise OwnershipError, "#{object} is not owned by #{owners.to_a} (#{object.owners.to_a})"
      end
    end
    super(events) if events
    super(tasks) if tasks
  else
    super
  end
end
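
The ownership test used in the listing above, (object.owners - owners).empty?, checks that every owner of the object is also an owner of the transaction. A minimal standalone sketch of that check follows; covered_by_owners? is a hypothetical helper name, and plain symbols stand in for peers.

```ruby
# Hypothetical helper (not part of Roby): true when every owner of the
# object also appears in the transaction's owner list.
def covered_by_owners?(object_owners, transaction_owners)
  # Array difference removes every element that the transaction owns;
  # anything left over is an owner the transaction does not have.
  (object_owners - transaction_owners).empty?
end

transaction_owners = [:local, :peer_a]
covered_by_owners?([:peer_a], transaction_owners)          # accepted
covered_by_owners?([:peer_a, :peer_b], transaction_owners) # rejected: :peer_b is missing
```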

#do_wrap(base_object, create) ⇒ Object

:nodoc:



# File 'lib/roby/distributed/transaction.rb', line 53

def do_wrap(base_object, create) # :nodoc:
  # It is allowed to add objects in a transaction only if
  #   * the object is not distributable. It means that we are
  #     annotating *locally* remote tasks (like it is done for
  #     ConnectionTask for instance).
  #   * the object is owned by the transaction owners
  if create && (base_object.distribute? && !(base_object.owners - owners).empty?)
    raise OwnershipError, "plan owners #{owners} do not own #{base_object}: #{base_object.owners}"
  end

  temporarily_subscribed = !base_object.updated?
  if temporarily_subscribed
    peer = base_object.owners.first
    base_object = peer.subscribe(base_object)
  end

  if object = super
    object.extend DistributedObject
    if !Distributed.updating?(self) && object.root_object? && base_object.distribute?
      # The new proxy has been sent to remote hosts since it
      # has been discovered in the transaction. Nonetheless,
      # we don't want to return from #wrap until we know its
      # sibling. Add a synchro point to wait for that
      updated_peers.each do |peer|
        peer.synchro_point
      end
    end
  end

  object

ensure
  if temporarily_subscribed
    peer.unsubscribe(base_object)
  end
end

#droby_dump(dest) ⇒ Object

Returns a representation of self which can be used to reference it in our communication with dest.



# File 'lib/roby/distributed/transaction.rb', line 377

def droby_dump(dest) # :nodoc:
  if remote_siblings.has_key?(dest)
    remote_id
  else
    DRoby.new(remote_siblings.droby_dump(dest), owners.droby_dump(dest),
              plan.droby_dump(dest),
              options.droby_dump(dest))
  end
end

#edit(reloop = false) ⇒ Object

Waits for the edition token. If a block is given, it is called once the token has been acquired, and the token is released with #release(reloop) when the block returns.



# File 'lib/roby/distributed/transaction.rb', line 271

def edit(reloop = false)
  if Thread.current[:control_mutex_locked]
    raise "cannot call #edit with the control mutex taken !"
  end

  token_lock.synchronize do
    if !editor? # not the current editor
      token_lock_signal.wait(token_lock)
    end
  end

  if block_given?
    begin
      yield
    ensure
      release(reloop)
    end
  end
end

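#edit!(reloop) ⇒ Object

Marks the local plan manager as the current editor, stores reloop in #edition_reloop and wakes up the threads blocked in #edit.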



# File 'lib/roby/distributed/transaction.rb', line 260

def edit!(reloop)
  token_lock.synchronize do
    @editor = true
    @edition_reloop = reloop
    token_lock_signal.broadcast
  end
end
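
#edit and #edit! cooperate through token_lock and token_lock_signal: one thread waits on the condition variable, another sets the editor flag and broadcasts. The following standalone sketch shows that pattern with Ruby's Mutex and ConditionVariable. ToyToken and its methods are hypothetical names, not part of Roby. One difference is deliberate: the sketch re-checks the flag in a loop, the usual guard against spurious wakeups, whereas the #edit listing checks it once.

```ruby
# Hypothetical illustration of the wait/broadcast pattern (not Roby code).
class ToyToken
  def initialize
    @lock = Mutex.new
    @signal = ConditionVariable.new
    @held = false
  end

  # Blocks the calling thread until the token has been granted.
  def wait_for_token
    @lock.synchronize do
      # ConditionVariable#wait releases the mutex while sleeping and
      # re-acquires it before returning; loop until the flag is set.
      @signal.wait(@lock) until @held
    end
  end

  # Grants the token and wakes up every waiting thread.
  def grant
    @lock.synchronize do
      @held = true
      @signal.broadcast
    end
  end

  def held?
    @lock.synchronize { @held }
  end
end

token = ToyToken.new
waiter = Thread.new do
  token.wait_for_token
  :got_token
end
token.grant
waiter.value # the waiter thread finishes once the token is granted
```

Because the flag is tested under the lock before waiting, the sketch behaves the same whether grant runs before or after the waiter starts.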

#first_editor? ⇒ Boolean

True if this plan manager is the first editor, i.e. the plan manager whose responsibility is to manage the edition protocol.

Returns:

  • (Boolean)


# File 'lib/roby/distributed/transaction.rb', line 243

def first_editor?
  owners.first == Distributed
end

#next_editor ⇒ Object

Returns the peer which is after this plan manager in the edition order. The edition token will be sent to this peer by #release.



# File 'lib/roby/distributed/transaction.rb', line 248

def next_editor
  if owners.last == Distributed
    return owners.first
  end

  owners.each_cons(2) do |first, second|
    if first == Distributed
      return second
    end
  end
end
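
The edition order is therefore a ring over the owner list: each owner hands the token to its successor, and the last owner hands it back to the first. A compact standalone sketch of the same rule is shown below; next_editor_in is a hypothetical helper, and symbols stand in for peers, with :local playing the role of Distributed.

```ruby
# Hypothetical helper (not part of Roby): successor of `local` in the
# owner list, wrapping around from the last owner to the first.
def next_editor_in(owners, local)
  index = owners.index(local)
  raise ArgumentError, "#{local.inspect} is not an owner" unless index

  owners[(index + 1) % owners.size]
end

owners = [:local, :peer_a, :peer_b]
next_editor_in(owners, :local)  # => :peer_a
next_editor_in(owners, :peer_b) # => :local, the ring wraps around
```

With a single owner the helper returns that owner itself, which matches the first branch of the listing above.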

#prepare_remove_owner(peer) ⇒ Object

Checks that peer can be removed from the list of owners



# File 'lib/roby/distributed/transaction.rb', line 101

def prepare_remove_owner(peer)
  known_tasks.each do |t|
    t = t.__getobj__ if t.respond_to?(:__getobj__)
    if peer.owns?(t) && t.distribute?
      raise OwnershipError, "#{peer} still owns tasks in the transaction (#{t})"
    end
  end
  nil
end

#propose(peer = nil, &block) ⇒ Object

Announces the transaction on peer or, if peer is nil, to all owners who don’t know about it yet. This operation is asynchronous, so the block, if given, will be called for each remote peer which has processed the message.

See Peer#transaction_propose



# File 'lib/roby/distributed/transaction.rb', line 117

def propose(peer = nil, &block)
  if !self_owned?
    raise OwnershipError, "cannot propose a transaction we don't own"
  end

  if peer
    peer.transaction_propose(self, &block)
  else
    (owners - remote_siblings.keys).each do |peer|
      if peer != Roby::Distributed
        Distributed.debug "proposing #{self} to #{peer}"
        propose(peer) do
          yield(peer)
        end
      end
    end
  end
end
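
When no peer is given, the listing above selects the owners that have no known sibling for the transaction yet and skips the local plan manager. That selection can be sketched on its own as follows; peers_to_notify is a hypothetical helper, symbols stand in for peers, and a plain Hash stands in for remote_siblings.

```ruby
# Hypothetical helper (not part of Roby): owners that still need to be
# told about the transaction.
def peers_to_notify(owners, remote_siblings, local)
  # Owners that already have a sibling know the transaction; the local
  # plan manager never needs to be notified.
  (owners - remote_siblings.keys).reject { |peer| peer == local }
end

owners = [:local, :peer_a, :peer_b]
remote_siblings = { peer_a: :sibling_id } # :peer_a already knows the transaction
peers_to_notify(owners, remote_siblings, :local) # => [:peer_b]
```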

#release(give_back = false) ⇒ Object

Releases the edition token, giving it to the next owner. If give_back is true, the local plan manager announces that it expects the token to be given back to it once more. The commit is allowed only when all peers have released the edition token without requesting it once more.

It sends the #transaction_give_token message to the peer returned by #next_editor.

Raises NotEditor if the local plan manager is not the current transaction editor.



# File 'lib/roby/distributed/transaction.rb', line 302

def release(give_back = false)
  token_lock.synchronize do
    if !editor?
      raise NotEditor, "not editor"
    else
      reloop = if first_editor?
                 give_back
               else
                 edition_reloop || give_back
               end

      return if owners.size == 1
      @editor = false
      next_editor.transaction_give_token(self, reloop)
      true
    end
  end
end
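
The reloop value sent along with the token follows a simple rule in the listing above: the first editor starts a fresh round with only its own request, while every other editor combines the flag it received with its own request. A standalone sketch of that rule follows; outgoing_reloop is a hypothetical helper name.

```ruby
# Hypothetical helper (not part of Roby).
#   first_editor    - true when the local plan manager is first in the owner list
#   incoming_reloop - the flag received with the token (edition_reloop)
#   give_back       - true when the local plan manager wants the token again
def outgoing_reloop(first_editor, incoming_reloop, give_back)
  if first_editor
    # A new round starts here, so earlier requests are dropped.
    give_back
  else
    # Keep any request made earlier in the round and add our own.
    incoming_reloop || give_back
  end
end

outgoing_reloop(true, true, false)   # => false
outgoing_reloop(false, true, false)  # => true
outgoing_reloop(false, false, false) # => false
```

When the token returns to the first editor with a false flag, no owner asked for another pass during that round, which is the condition #commit_transaction checks through edition_reloop.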