Class: Archipelago::Treasure::Chest
- Inherits: Object
  - Object
  - Archipelago::Treasure::Chest
- Includes: Disco::Publishable
- Defined in: lib/archipelago/treasure.rb
Overview
A possibly remote database that only returns proxies to its contents, and thus runs all methods on its contents itself.
Has support for optimistically locked distributed serializable transactions.
Instance Method Summary
- #[](key, transaction = nil) ⇒ Object
  Return the contents of this chest using a given key and transaction.
- #[]=(key, p1, p2 = nil) ⇒ Object
  Put something into this chest with a given key, value and transaction.
- #abort!(transaction) ⇒ Object
  Abort transaction in this Chest.
- #active_transactions ⇒ Object
  The transactions active in this Chest.
- #call_instance_method(key, method, transaction, *arguments, &block) ⇒ Object
  Call an instance method on whatever this chest holds at key with any transaction and args.
- #commit!(transaction) ⇒ Object
  Commits transaction in this Chest.
- #delete(key, transaction = nil) ⇒ Object
  Delete the value of key within transaction.
- #each(callable) ⇒ Object
  Will do callable.call(key, value) for each key-and-value pair in this Chest.
- #evaluate!(label, timestamp, data) ⇒ Object
  Evaluate data if we have not already seen label or if we have an earlier timestamp than the one given.
- #initialize(options = {}) ⇒ Chest (constructor)
  Initialize a Chest.
- #prepare!(transaction) ⇒ Object
  Prepares transaction in this Chest.
Methods included from Disco::Publishable
#_dump, append_features, #initialize_publishable, #publish!, #service_id, #stop!, #valid?
Constructor Details
#initialize(options = {}) ⇒ Chest
Initialize a Chest.
Will use a BerkeleyHashishProvider using treasure_chest.db in the same directory to get its hashes if no :persistence_provider is given.
Will try to recover crashed transactions every :transaction_recovery_interval seconds, or every TRANSACTION_RECOVERY_INTERVAL seconds if none is given.
Will use Archipelago::Disco::Publishable by calling initialize_publishable with options.
# File 'lib/archipelago/treasure.rb', line 242
def initialize(options = {})
  #
  # The provider of happy magic persistent hashes of different kinds.
  #
  @persistence_provider = options[:persistence_provider] ||
    Archipelago::Hashish::BerkeleyHashishProvider.new(Pathname.new(File.(__FILE__)).parent.join("treasure_chest.db"))

  #
  # Use the given options to initialize the publishable
  # instance variables.
  #
  initialize_publishable(options)

  #
  # [transaction => [key => instance]]
  # To know what stuff is visible to a transaction.
  #
  @snapshot_by_transaction = {}
  @snapshot_by_transaction.extend(Archipelago::Current::Synchronized)

  #
  # [transaction => [key => when the key was read/updated/deleted]]
  # To know if a transaction is ok to prepare and commit.
  #
  @timestamp_by_key_by_transaction = {}

  #
  # [transaction => [key => instance]]
  # To know what transactions were prepared but not
  # properly finished last run.
  #
  @crashed = Set.new

  initialize_seen_data

  #
  # The magical persistent map that defines how we actually
  # store our data.
  #
  @db = @persistence_provider.get_cached_hashish("db")

  initialize_prepared(options[:transaction_recovery_interval] || TRANSACTION_RECOVERY_INTERVAL)

  CHEST_BY_SERVICE_ID[self.service_id] = self
end
Instance Method Details
#[](key, transaction = nil) ⇒ Object
Return the contents of this chest using a given key and transaction.
# File 'lib/archipelago/treasure.rb', line 334
def [](key, transaction = nil)
  join!(transaction)

  instance = ensure_instance_with_transaction(key, transaction)
  return nil unless instance

  if Dubloon === instance
    return instance.join(transaction)
  else
    return Dubloon.new(key, self, transaction, self.service_id)
  end
end
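The listing above hands back a Dubloon rather than the stored object, which matches the Overview's statement that the chest only returns proxies and runs methods on its contents itself. A minimal sketch of that proxy idea follows. ToyChest and ToyProxy are hypothetical stand-ins invented for illustration; they ignore transactions, persistence and remoting, and are not the Archipelago classes.

```ruby
# Hypothetical stand-ins for illustration only; not the Archipelago classes.
class ToyChest
  def initialize
    @db = {}
  end

  # Store a value directly (this sketch has no transactions).
  def []=(key, value)
    @db[key] = value
  end

  # Hand out a proxy instead of the stored object itself.
  def [](key)
    return nil unless @db.key?(key)
    ToyProxy.new(key, self)
  end

  # Run the method on the stored object, inside the chest.
  def call_instance_method(key, method, *arguments, &block)
    @db[key].public_send(method, *arguments, &block)
  end
end

# Inherits from BasicObject so that almost every call reaches method_missing.
class ToyProxy < BasicObject
  def initialize(key, chest)
    @key = key
    @chest = chest
  end

  # Forward every call to the chest that owns the real object.
  def method_missing(method, *arguments, &block)
    @chest.call_instance_method(@key, method, *arguments, &block)
  end
end

chest = ToyChest.new
chest[:greeting] = +"hello"
proxy = chest[:greeting]
proxy << " world" # mutates the string held inside the chest
```

The caller never holds the string itself; the `<<` call travels through the proxy and is executed by the chest on the stored object.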
#[]=(key, p1, p2 = nil) ⇒ Object
Put something into this chest with a given key, value and transaction.
# File 'lib/archipelago/treasure.rb', line 389
def []=(key, p1, p2 = nil)
  if p2
    value = p2
    transaction = p1
  else
    value = p1
    transaction = nil
  end
  return set(key, value, transaction)
end
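The p1/p2 parameters follow from how Ruby dispatches index assignment: `obj[a, b] = v` calls `[]=(a, b, v)`, so the assigned value always arrives last. The sketch below reproduces only that argument handling. ArgumentRecorder is a hypothetical class made up for this example, and it records the arguments instead of calling the chest's set method.

```ruby
# Hypothetical recorder class; it only shows how Ruby passes the arguments.
class ArgumentRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Same parameter shape as the listing above: with three arguments
  # p1 is the transaction and p2 the value; with two, p1 is the value.
  def []=(key, p1, p2 = nil)
    if p2
      value = p2
      transaction = p1
    else
      value = p1
      transaction = nil
    end
    @calls << { key: key, value: value, transaction: transaction }
    value
  end
end

recorder = ArgumentRecorder.new
recorder[:gold] = 100       # key, value
recorder[:gold, :tx1] = 200 # key, transaction, value
```

One consequence of the `if p2` test is worth knowing: assigning nil or false together with a transaction takes the else branch, so the transaction argument is treated as the value.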
#abort!(transaction) ⇒ Object
Abort transaction in this Chest.
# File 'lib/archipelago/treasure.rb', line 416
def abort!(transaction)
  assert_transaction(transaction)

  snapshot = @snapshot_by_transaction[transaction]
  #
  # Make sure nobody can modify this transaction while we
  # are aborting it.
  #
  snapshot.synchronize do
    serialized_transaction = Marshal.dump(transaction)
    #
    # If this transaction was successfully prepared
    #
    if @snapshot_by_transaction_db.include?(serialized_transaction)
      #
      # Unlock the keys that are part of it.
      #
      snapshot.each do |key, value|
        @db.unlock_on(key)
      end
      #
      # And remove it from persistent storage.
      #
      @snapshot_by_transaction_db[serialized_transaction] = nil
      #
      # And remove its timestamps from persistent storage.
      #
      @timestamp_by_key_by_transaction_db[serialized_transaction] = nil
    end
    #
    # Finally delete it from the snapshots.
    #
    @snapshot_by_transaction.delete(transaction)
    #
    # And from the timestamps.
    #
    @timestamp_by_key_by_transaction.delete(transaction)
  end
end
#active_transactions ⇒ Object
The transactions active in this Chest.
# File 'lib/archipelago/treasure.rb', line 290
def active_transactions
  @snapshot_by_transaction.keys.clone
end
#call_instance_method(key, method, transaction, *arguments, &block) ⇒ Object
Call an instance method on whatever this chest holds at key with any transaction and args.
# File 'lib/archipelago/treasure.rb', line 405
def call_instance_method(key, method, transaction, *arguments, &block)
  if transaction
    return call_with_transaction(key, method, transaction, *arguments, &block)
  else
    return call_without_transaction(key, method, *arguments, &block)
  end
end
#commit!(transaction) ⇒ Object
Commits transaction in this Chest.
NB: The transaction must be prepared before commit! is called.
# File 'lib/archipelago/treasure.rb', line 519
def commit!(transaction)
  assert_transaction(transaction)

  raise IllegalCommitException.new(self, transaction) unless @snapshot_by_transaction_db.include?(Marshal.dump(transaction))

  snapshot = @snapshot_by_transaction[transaction]
  #
  # Make sure nobody can modify this transaction while we are
  # committing it.
  #
  snapshot.synchronize do
    #
    # Copy each key and value from our private space to the real space
    #
    snapshot.each do |key, value|
      if value == :deleted
        @db.delete(key)
      else
        @db[key] = value
      end
    end
    #
    # Call abort! to clean up after the transaction.
    #
    abort!(transaction)
  end
end
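The copy step inside commit! can be looked at in isolation: every entry of the transaction's private snapshot is written to the shared store, and the :deleted marker turns into a removal. The sketch below assumes plain Ruby hashes for both the store and the snapshot, and apply_snapshot is a hypothetical helper name; the real method additionally checks that the transaction was prepared, synchronizes on the snapshot and cleans up through abort!.

```ruby
# Apply a transaction's private snapshot to the shared store.
# Both arguments are plain hashes here (an assumption made for this sketch).
def apply_snapshot(store, snapshot)
  snapshot.each do |key, value|
    if value == :deleted
      # The sentinel written by a transactional delete removes the key.
      store.delete(key)
    else
      store[key] = value
    end
  end
  store
end

store = { gold: 100, silver: 50 }
snapshot = { gold: :deleted, rubies: 3 }
apply_snapshot(store, snapshot)
```

Using a sentinel symbol keeps deletions in the same snapshot hash as updates, at the cost that the symbol :deleted itself cannot be stored as an ordinary value.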
#delete(key, transaction = nil) ⇒ Object
Delete the value of key within transaction.
# File 'lib/archipelago/treasure.rb', line 350
def delete(key, transaction = nil)
  join!(transaction)

  rval = nil

  if transaction
    #
    # If we have a transaction we must note that it is deleted in a
    # separate space for that transaction.
    #
    snapshot = @snapshot_by_transaction[transaction]
    snapshot.synchronize do
      rval = snapshot[key]
      snapshot[key] = :deleted
      #
      # Make sure we remember when it was last changed according to our main db.
      #
       = @timestamp_by_key_by_transaction[transaction]
      [key] = @db.(key) unless .include?(key)
    end
  else
    #
    # Otherwise just ask our persistence provider to delete it.
    #
    @db.delete(key)
  end

  rval.freeze

  return rval
end
#each(callable) ⇒ Object
Will do callable.call(key, value) for each key-and-value pair in this Chest.
NB: This is totally thread-unsafe; only do this for management or rescue!
# File 'lib/archipelago/treasure.rb', line 301
def each(callable)
  @db.each(callable)
end
#evaluate!(label, timestamp, data) ⇒ Object
Evaluate data if we have not already seen label or if we have an earlier timestamp than the one given.
# File 'lib/archipelago/treasure.rb', line 308
def evaluate!(label, timestamp, data)
  serialized_label = Marshal.dump(label)
  go = false
  if @seen_data.include?(serialized_label)
    , last_data = Marshal.load(@seen_data[serialized_label])
    go = true if >
  else
    go = true
  end
  if go
    begin
      Object.class_eval(data)
      @seen_data[serialized_label] = Marshal.dump([, data])
    rescue Exception => e
      @seen_data[serialized_label] = Marshal.load([, nil])
      puts e
      pp e.backtrace
    end
  end
end
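The rule stated in the description (evaluate when the label is new, or when the stored timestamp is earlier than the one given) can be sketched on its own. The helper name should_evaluate? and the plain Hash used for the seen-data store are assumptions made for this example; the sketch leaves out the Marshal serialization and the class_eval call of the real method.

```ruby
# Decide whether a labelled payload should be evaluated.
# `seen` maps label => [timestamp, data]; a plain Hash stands in for
# the persistent store (an assumption made for this sketch).
def should_evaluate?(seen, label, timestamp)
  # A label we have never seen is always evaluated.
  return true unless seen.key?(label)

  last_timestamp, _last_data = seen[label]
  # Evaluate only when the stored timestamp is earlier than the given one.
  timestamp > last_timestamp
end

seen = { "patch-1" => [10, "class Foo; end"] }
should_evaluate?(seen, "patch-1", 5)  # older than what was seen
should_evaluate?(seen, "patch-1", 20) # newer than what was seen
should_evaluate?(seen, "patch-2", 1)  # never seen before
```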
#prepare!(transaction) ⇒ Object
Prepares transaction in this Chest.
NB: This will cause any update of the data within this transaction to block until it is either aborted or committed!
# File 'lib/archipelago/treasure.rb', line 464
def prepare!(transaction)
  assert_transaction(transaction)

  #
  # If we don't know about this transaction then it can't very well
  # affect us.
  #
  return :unchanged unless @snapshot_by_transaction.include?(transaction)

  snapshot = @snapshot_by_transaction[transaction]
  #
  # Make sure nobody can modify this transaction while we are
  # preparing it.
  #
  snapshot.synchronize do
    #
    # Remember what locks we acquire so that we can
    # unlock them in case of failure.
    #
    locks = []
     = @timestamp_by_key_by_transaction[transaction]
    #
    # Acquire a lock on each key in the transaction
    #
    snapshot.each do |key, value|
      if @db.(key) == [key]
        @db.lock_on(key)
        locks << key
      else
        locks.each do |key|
          @db.unlock_on(key)
        end
        return :abort
      end
    end

    serialized_transaction = Marshal.dump(transaction)
    #
    # Dump its state to persistent storage.
    #
    @snapshot_by_transaction_db[serialized_transaction] = Marshal.dump(snapshot)
    #
    # And dump its timestamps to persistent storage
    #
    @timestamp_by_key_by_transaction_db[serialized_transaction] = Marshal.dump()

    return :prepared
  end
end
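The prepare step is where the optimistic locking mentioned in the Overview takes effect: a key is locked only if the store still carries the version the transaction saw, and any mismatch releases the locks taken so far and aborts. The sketch below shows that validate-then-lock loop under simplifying assumptions. ToyStore, its integer version counters and the prepare helper are hypothetical names made up for this example; they stand in for the persistent store and its timestamps and do not reproduce the Archipelago API.

```ruby
# Toy store with a version counter per key and a set of locked keys.
# Hypothetical; it only mimics what the sketch needs.
class ToyStore
  def initialize
    @versions = Hash.new(0)
    @locked = {}
  end

  def version(key)
    @versions[key]
  end

  # Simulate another writer changing the key.
  def touch(key)
    @versions[key] += 1
  end

  def lock(key)
    @locked[key] = true
  end

  def unlock(key)
    @locked.delete(key)
  end

  def locked?(key)
    @locked.key?(key)
  end
end

# Validate every key in the snapshot against the version the transaction
# saw, locking as we go and rolling the locks back on the first conflict.
def prepare(store, snapshot, seen_versions)
  locks = []
  snapshot.each_key do |key|
    if store.version(key) == seen_versions[key]
      store.lock(key)
      locks << key
    else
      locks.each { |locked_key| store.unlock(locked_key) }
      return :abort
    end
  end
  :prepared
end

store = ToyStore.new
seen_versions = { gold: 0, silver: 0 }
store.touch(:silver) # a concurrent change after the transaction read silver
prepare(store, { gold: 1, silver: 2 }, seen_versions)
```

Because nothing is locked while the transaction is doing its work, conflicts are only detected here, which is the usual trade-off of optimistic schemes: cheap reads and writes, with the risk of an abort at prepare time.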