Class: Archipelago::Tranny::Transaction
- Inherits: Object
- Includes: Current::Synchronized
- Defined in: lib/archipelago/tranny.rb
Overview
A transaction managed by the manager.
A transaction can have the following states:
- :active - When it is first created. This transaction can be committed or aborted.
- :voting - When it has started the two-phase commit and the voting has begun. This transaction can be aborted.
- :commited - After everyone has voted, and every vote was either :unchanged or :commit. This transaction can not be changed.
- :aborted - After someone has called abort! or voted :abort in a vote!. This transaction can not be changed.
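The state rules above can be read as a small transition table. The following is a minimal sketch and not code from tranny.rb: `TRANSITIONS` and `transition_allowed?` are hypothetical names, and the assumption that :active only reaches :commited by way of :voting is inferred from the two-phase commit description.

```ruby
# Hypothetical transition table derived from the state descriptions above.
# Assumption: a commit always passes through :voting before :commited.
TRANSITIONS = {
  active:   [:voting, :aborted],
  voting:   [:commited, :aborted],
  commited: [],  # terminal: can not be changed
  aborted:  []   # terminal: can not be changed
}.freeze

# Returns true if a transaction in state +from+ may move to state +to+.
def transition_allowed?(from, to)
  TRANSITIONS.fetch(from).include?(to)
end
```

Using `fetch` means an unknown state raises a KeyError instead of silently returning false.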
Anyone that wants to join the transaction must implement the following methods:
- abort!(transaction): Abort the provided transaction.
- commit!(transaction): Commit the provided transaction.
- prepare!(transaction): Prepare for committing the provided transaction.
abort! and commit! should not return any values, but can raise exceptions if required.
prepare! must return either :abort, :commit or :unchanged, depending on what the member is prepared to do. :unchanged is only valid when the member has not changed state during the transaction, and means that the member does not require any further notification on the progress of the transaction.
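A member satisfying the contract above might look like the following. This is a minimal in-memory sketch, not part of Archipelago: `CounterMember`, `stage` and the "only integers are acceptable" rule are hypothetical and exist only to exercise the three return values of prepare!.

```ruby
# Hypothetical member that holds one integer value.
class CounterMember
  attr_reader :value

  def initialize(value = 0)
    @value = value
    @pending = {} # transaction => proposed new value
  end

  # Illustrative helper: stage a change under a transaction.
  def stage(transaction, new_value)
    @pending[transaction] = new_value
  end

  # Vote on the outcome: :unchanged if nothing was staged,
  # :commit if the staged value is acceptable, :abort otherwise.
  def prepare!(transaction)
    return :unchanged unless @pending.key?(transaction)
    @pending[transaction].is_a?(Integer) ? :commit : :abort
  end

  # Apply the staged change. Returns no value.
  def commit!(transaction)
    @value = @pending.delete(transaction) if @pending.key?(transaction)
    nil
  end

  # Discard the staged change. Returns no value.
  def abort!(transaction)
    @pending.delete(transaction)
    nil
  end
end
```

A real member would also persist the transaction proxy after voting :commit, as described in the next paragraph; the sketch leaves that out.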
A member that has returned :commit on the prepare! must store the transaction proxy in a persistent manner to be able to connect to the manager and get a new copy of the transaction in case of communications failure.
A member that reconnects to a crashed transaction manager should not abort! the transaction if it is still in the :active state. If an abort is needed, it will happen on commit! anyway, since the manager will not be able to prepare! the disconnected member after either the manager or the member has crashed. If the disconnect was just a temporary networking problem, the transaction will continue as planned.
A member that reconnects to a crashed transaction manager where the transaction is in :voting state should just wait around and see if it gets prepare! called. In case of temporary network failure the transaction will continue as planned, otherwise it will abort! automatically.
A member that reconnects to a disconnected transaction manager where the transaction is in :commited state should just commit its state. Then it must notify the transaction using report_commited! so that the transaction can disappear gracefully.
A member that reconnects to a disconnected transaction manager that either does not know of the transaction or returns an :aborted transaction may safely abort the state change and forget about the transaction.
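The reconnection rules above reduce to a lookup on the state the member finds. The sketch below is illustrative only: `recovery_action` and the action names it returns are hypothetical, and nil stands in for a manager that does not know the transaction.

```ruby
# Hypothetical helper: maps the transaction state seen by a reconnecting
# member to the action the rules above prescribe.
# +state+ is nil when the manager does not know the transaction.
def recovery_action(state)
  case state
  when :active       then :continue           # do not abort!, carry on
  when :voting       then :wait_for_prepare   # wait for prepare! to be called
  when :commited     then :commit_and_report  # commit, then report_commited!
  when :aborted, nil then :abort_and_forget   # drop the state change
  else raise ArgumentError, "unknown transaction state: #{state.inspect}"
  end
end
```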
Instance Attribute Summary
- #manager ⇒ Object: Returns the value of attribute manager.
- #proxy ⇒ Object (readonly): Returns the value of attribute proxy.
- #state ⇒ Object (readonly): Returns the value of attribute state.
- #transaction_id ⇒ Object (readonly): Returns the value of attribute transaction_id.
Class Method Summary
- ._load(s) ⇒ Object
Instance Method Summary
- #_dump(l) ⇒ Object: Special dump call to NOT dump our manager or proxy, since DRbObjects don't take kindly to being restored in an environment where they are known to be invalid.
- #abort! ⇒ Object: Abort the transaction, sending all participants the abort! message.
- #commit! ⇒ Object: Commits the transaction, returning the new state (:aborted | :commited).
- #initialize(options) ⇒ Transaction (constructor): Create a transaction managed by the provided manager.
- #join(member) ⇒ Object: Join a member to this transaction.
- #remove_us_we_are_redundant! ⇒ Object: Remove ourselves, we are redundant.
- #report_commited!(member) ⇒ Object: Used by members that failed during commit.
- #start_timeout_thread ⇒ Object: Starts the thread that will abort! us automatically after we have lived @timeout.
- #store_us_for_future_reference! ⇒ Object: What it sounds like.
Methods included from Current::Synchronized
#lock_on, #mon_check_owner, #synchronize_on, #unlock_on
Constructor Details
#initialize(options) ⇒ Transaction
Create a transaction managed by the provided manager.
Uses options[:manager] as its TransactionManager, and times out after options[:timeout] seconds.

    # File 'lib/archipelago/tranny.rb', line 320

    def initialize(options)
      super()
      #
      # A hash where members are keys and their state the values.
      #
      @members = {}
      @members.extend(Archipelago::Current::Synchronized)
      #
      # We are alive!
      #
      @state = :active
      #
      # We have a timeout!
      #
      @timeout = options[:timeout]
      #
      # We are unique!
      #
      @transaction_id = "#{options[:manager].service_id}:#{Time.new.to_f}:#{self.object_id}:#{rand(1 << 32)}"
      #
      # We have a birth time!
      #
      @created_at = Time.now
      #
      # We have a manager!
      #
      self.manager = options[:manager]

      store_us_for_future_reference!
      start_timeout_thread
    end
Instance Attribute Details
#manager ⇒ Object
Returns the value of attribute manager.

    # File 'lib/archipelago/tranny.rb', line 312

    def manager
      @manager
    end
#proxy ⇒ Object (readonly)
Returns the value of attribute proxy.

    # File 'lib/archipelago/tranny.rb', line 312

    def proxy
      @proxy
    end
#state ⇒ Object (readonly)
Returns the value of attribute state.

    # File 'lib/archipelago/tranny.rb', line 312

    def state
      @state
    end
#transaction_id ⇒ Object (readonly)
Returns the value of attribute transaction_id.

    # File 'lib/archipelago/tranny.rb', line 312

    def transaction_id
      @transaction_id
    end
Class Method Details
._load(s) ⇒ Object

    # File 'lib/archipelago/tranny.rb', line 386

    def self._load(s)
      members, state, timeout, transaction_id, created_at = Marshal.load(s)
      rval = self.allocate
      rval.instance_variable_set(:@members, members)
      rval.instance_variable_set(:@state, state)
      rval.instance_variable_set(:@timeout, timeout)
      rval.instance_variable_set(:@transaction_id, transaction_id)
      rval.instance_variable_set(:@created_at, created_at)
      rval
    end
Instance Method Details
#_dump(l) ⇒ Object
Special dump call to NOT dump our manager or proxy, since DRbObjects don't take kindly to being restored in an environment where they are known to be invalid.

    # File 'lib/archipelago/tranny.rb', line 376

    def _dump(l)
      return Marshal.dump([
                           @members,
                           @state,
                           @timeout,
                           @transaction_id,
                           @created_at
                          ])
    end
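The _dump and ._load pair follows Ruby's custom Marshal protocol: Marshal.dump calls the instance's _dump, and Marshal.load hands the resulting string to the class's _load. The sketch below shows the same pattern on a hypothetical `Session` class (not part of Archipelago) whose `connection` field is deliberately left out of the dump, much as the manager and proxy are here.

```ruby
# Hypothetical class with one field that must not be serialized.
class Session
  attr_reader :id, :connection

  def initialize(id, connection)
    @id = id
    @connection = connection
  end

  # Called by Marshal.dump; only @id is written out.
  def _dump(_level)
    Marshal.dump([@id])
  end

  # Called by Marshal.load with the string produced by _dump.
  # allocate skips initialize, so @connection is never set on the copy.
  def self._load(data)
    id, = Marshal.load(data)
    obj = allocate
    obj.instance_variable_set(:@id, id)
    obj
  end
end

# A lambda can not be marshalled, but it is never touched here.
copy = Marshal.load(Marshal.dump(Session.new(42, -> {})))
```

After the round trip `copy.id` is restored and `copy.connection` is nil, so whoever loads the object has to reattach the live reference.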
#abort! ⇒ Object
Abort the transaction, sending all participants the abort! message.

    # File 'lib/archipelago/tranny.rb', line 445

    def abort!
      synchronize do
        raise IllegalOperationException.new(:abort!, self) if [:commited, :aborted].include?(self.state)

        #
        # Set our state.
        #
        @state = :aborted
        store_us_for_future_reference!

        #
        # Abort all members.
        #
        threads = []
        @members.clone.each do |member, state|
          raise RuntimeException.new("This is not supposed to be possible, but we are in abort! with member " +
                                     "#{member.inspect} in state #{state.inspect}") if state == :commited
          if state != :aborted
            threads << Thread.new do
              begin
                member.abort!(self.proxy)
                @members.synchronize do
                  @members[member] = :aborted
                end
              rescue Exception => e
                @manager.log_error(e)
                #
                # We must not let the other members stop just
                # because one member failed. No more Mr Nice Guy!
                #
              end
            end
          end
        end

        #
        # Wait for all members to finish.
        #
        threads.each do |thread|
          thread.join
        end

        #
        # No use having aborted transactions lying about.
        #
        # NB: This means that disconnected members that can't
        # find their old transactions will have to presume they
        # have been aborted.
        #
        remove_us_we_are_redundant!
      end
      nil
    end
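The core of abort! is a fan-out: one thread per member, with each failure caught inside its own thread so the remaining members are still notified. A stripped-down sketch of that pattern follows. `notify_all` is a hypothetical helper, members are modelled as plain lambdas, and errors are collected rather than logged through a manager.

```ruby
# Hypothetical helper: call every member in its own thread and record
# either :aborted or the exception that member raised.
def notify_all(members)
  results = {}
  lock = Mutex.new # guards results across threads

  threads = members.map do |member|
    Thread.new do
      begin
        member.call
        lock.synchronize { results[member] = :aborted }
      rescue StandardError => e
        # One failing member must not stop the others.
        lock.synchronize { results[member] = e }
      end
    end
  end

  # Wait for all members to finish.
  threads.each(&:join)
  results
end
```

The sketch rescues StandardError, whereas the method above rescues Exception; the narrower class is the usual Ruby default and is a choice made for this example only.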
#commit! ⇒ Object
Commits the transaction, returning the new state (:aborted | :commited).

    # File 'lib/archipelago/tranny.rb', line 503

    def commit!
      synchronize do
        raise IllegalOperationException.new(:commit!, self) unless self.state == :active

        #
        # Vote for the outcome and act on it
        #
        case vote!
        when :abort
          abort!
        when :commit
          _commit!
        when :unchanged
          @state = :commited
          remove_us_we_are_redundant!
        end
        return @state
      end
      nil
    end
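The source of vote! is not shown on this page, so how individual prepare! results combine into the single value that commit! switches on is not documented here. One plausible reading of the state descriptions, offered purely as an assumption, is that any :abort wins, otherwise any :commit wins, and the result is :unchanged only if every member voted :unchanged. `tally` is a hypothetical name.

```ruby
# Hypothetical vote aggregation; the real vote! may differ.
# +votes+ is an array of the values returned by each member's prepare!.
def tally(votes)
  return :abort if votes.include?(:abort)   # a single :abort vetoes
  return :commit if votes.include?(:commit) # someone has changes to apply
  :unchanged                                # nobody changed anything
end
```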
#join(member) ⇒ Object
Join a member to this transaction.
Will raise a JoinCountException if the given member has already joined this transaction.

    # File 'lib/archipelago/tranny.rb', line 531

    def join(member)
      @members.synchronize do
        raise IllegalOperationException.new(:join, self) unless self.state == :active
        raise JoinCountException.new(member, self) if @members.include?(member)

        @members[member] = :active
      end
      store_us_for_future_reference!
      nil
    end
#remove_us_we_are_redundant! ⇒ Object
Remove ourselves, we are redundant.

    # File 'lib/archipelago/tranny.rb', line 424

    def remove_us_we_are_redundant!
      @manager.remove_transaction!(self)
      nil
    end
#report_commited!(member) ⇒ Object
Used by members that failed during commit.

    # File 'lib/archipelago/tranny.rb', line 432

    def report_commited!(member)
      raise UnknownMemberException.new(member, self) unless @members.include?(member)
      raise IllegalOperationException.new(:report_commited!, self) unless self.state == :commited

      @members[member] = :commited

      remove_us_if_all_are_commited!
      nil
    end
#start_timeout_thread ⇒ Object
Starts the thread that will abort! us automatically after we have lived @timeout.

    # File 'lib/archipelago/tranny.rb', line 401

    def start_timeout_thread
      Thread.new do
        now = Time.now
        die_at = @created_at + @timeout
        if die_at > now
          sleep(die_at - now)
        end
        abort!
      end
      nil
    end
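The timeout is computed against an absolute deadline (@created_at + @timeout) rather than a fixed sleep, so a transaction that is restored late still dies on schedule, and one that is already overdue aborts at once. The same idea in isolation, as a sketch: `schedule_at` is a hypothetical helper, and unlike the method above it returns the thread so a caller can join it.

```ruby
# Hypothetical helper: run the block once +deadline+ (a Time) has passed.
# If the deadline is already in the past, the block runs immediately.
def schedule_at(deadline, &block)
  Thread.new do
    remaining = deadline - Time.now
    sleep(remaining) if remaining > 0
    block.call
  end
end
```

A caller would write something like `schedule_at(Time.now + 30) { puts "timed out" }` and keep the returned thread if it needs to wait for the callback.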
#store_us_for_future_reference! ⇒ Object
What it sounds like.

    # File 'lib/archipelago/tranny.rb', line 416

    def store_us_for_future_reference!
      @manager.store_transaction!(self)
      nil
    end