Class: ThreadSafe::AtomicReferenceCacheBackend

Inherits: Object
Extended by: Util::Volatile
Defined in: lib/thread_safe/atomic_reference_cache_backend.rb

Overview

A Ruby port of Doug Lea's jsr166e.ConcurrentHashMapV8 class, version 1.59, which is available in the public domain. Original source code available here: gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ConcurrentHashMapV8.java?revision=1.59

The Ruby port omits the TreeBin optimization (red-black trees used in bins whose size exceeds a threshold).

A hash table supporting full concurrency of retrievals and high expected concurrency for updates. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.

Retrieval operations generally do not block, so they may overlap with update operations. Retrievals reflect the results of the most recently completed update operations holding upon their onset. (More formally, an update operation for a given key bears a happens-before relation with any (non-nil) retrieval for that key reporting the updated value.) For aggregate operations such as clear(), concurrent retrievals may reflect insertion or removal of only some entries. Similarly, the each_pair iterator yields elements reflecting the state of the hash table at some point at or since the start of the each_pair. Bear in mind that the results of aggregate status methods such as size() and empty? are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.

The table is dynamically expanded when there are too many collisions (i.e., keys that have distinct hash codes but fall into the same slot modulo the table size), with the expected average effect of maintaining roughly two bins per mapping (corresponding to a 0.75 load factor threshold for resizing). There may be much variance around this average as mappings are added and removed, but overall, this maintains a commonly accepted time/space tradeoff for hash tables. However, resizing this or any other kind of hash table may be a relatively slow operation. When possible, it is a good idea to provide a size estimate as an optional :initial_capacity initializer argument. An additional optional :load_factor constructor argument provides a further means of customizing initial table capacity by specifying the table density to be used in calculating the amount of space to allocate for the given number of elements. Note that using many keys with exactly the same hash is a sure way to slow down performance of any hash table.
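
For illustration, a minimal usage sketch (this backend is normally reached through the public ThreadSafe::Cache class, but it can also be exercised directly; the option name comes from the paragraph above):

require 'thread_safe'

# Pre-sizing the table for an expected number of mappings avoids repeated
# resizing during an initial bulk load.
backend = ThreadSafe::AtomicReferenceCacheBackend.new(initial_capacity: 100_000)
backend[:answer] = 42
backend[:answer]        # => 42
backend.key?(:missing)  # => false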

Design overview

The primary design goal of this hash table is to maintain concurrent readability (typically method [], but also iteration and related methods) while minimizing update contention. Secondary goals are to keep space consumption about the same or better than plain Hash, and to support high initial insertion rates on an empty table by many threads.

Each key-value mapping is held in a Node. The validation-based approach explained below leads to a lot of code sprawl because retry-control precludes factoring into smaller methods.

The table is lazily initialized to a power-of-two size upon the first insertion. Each bin in the table normally contains a list of Nodes (most often, the list has only zero or one Node). Table accesses require volatile/atomic reads, writes, and CASes. The lists of nodes within bins are always accurately traversable under volatile reads, so long as lookups check hash code and non-nullness of value before checking key equality.

We use the top two bits of Node hash fields for control purposes – they are available anyway because of addressing constraints. As explained further below, these top bits are used as follows:

00 - Normal
01 - Locked
11 - Locked and may have a thread waiting for lock
10 - Node is a forwarding node

The lower 28 bits of each Node's hash field contain the key's hash code, except for forwarding nodes, for which the lower bits are zero (and so the hash field always equals MOVED).
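
As an illustration only (a sketch, not library code; the real checks live on Node and Table), the encoding above can be interrogated with the constants listed in the Constant Summary below:

require 'thread_safe'

B = ThreadSafe::AtomicReferenceCacheBackend

def describe_node_hash(node_hash)
  if node_hash == B::MOVED
    :forwarding_node                      # top bits 10, lower bits zero
  elsif B::Node.locked_hash?(node_hash)
    :locked                               # top bits 01 or 11
  else
    [:normal, node_hash & B::HASH_BITS]   # lower bits carry the key's hash
  end
end

describe_node_hash(B::MOVED) # => :forwarding_node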

Insertion (via []= or its variants) of the first node in an empty bin is performed by just CASing it to the bin. This is by far the most common case for put operations under most key/hash distributions. Other update operations (insert, delete, and replace) require locks. We do not want to waste the space required to associate a distinct lock object with each bin, so instead we use the first node of a bin list itself as a lock. Blocking support for these locks relies on Util::CheapLockable. However, we also need a try_lock construction, so we overlay these by using bits of the Node hash field for lock control (see above), and so normally use builtin monitors only for blocking and signalling using cheap_wait/cheap_broadcast constructions. See Node#try_await_lock.

Using the first node of a list as a lock does not by itself suffice though: When a node is locked, any update must first validate that it is still the first node after locking it, and retry if not. Because new nodes are always appended to lists, once a node is first in a bin, it remains first until deleted or the bin becomes invalidated (upon resizing). However, operations that only conditionally update may inspect nodes until the point of update. This is a converse of sorts to the lazy locking technique described by Herlihy & Shavit.
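
A minimal, self-contained sketch of that validate-after-lock pattern (the SketchNode struct and its per-node Mutex are stand-ins introduced here; the real implementation locks via the hash-field bits described above):

require 'thread'

SketchNode = Struct.new(:key, :value, :next_node, :mutex)

def locked_update(bins, i, first_node)
  first_node.mutex.lock
  begin
    # Re-validate after locking: another thread may have deleted the node or
    # the bin may have been invalidated by a resize since we first read it.
    return :retry unless bins[i].equal?(first_node)
    yield first_node # safe to mutate the bin's node list here
    :done
  ensure
    first_node.mutex.unlock
  end
end

bins = [SketchNode.new(:a, 1, nil, Mutex.new)]
locked_update(bins, 0, bins[0]) { |node| node.value = 2 } # => :done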

The main disadvantage of per-bin locks is that other update operations on other nodes in a bin list protected by the same lock can stall, for example when user eql? or mapping functions take a long time. However, statistically, under random hash codes, this is not a common problem. Ideally, the frequency of nodes in bins follows a Poisson distribution (en.wikipedia.org/wiki/Poisson_distribution) with a parameter of about 0.5 on average, given the resizing threshold of 0.75, although with a large variance because of resizing granularity. Ignoring variance, the expected occurrences of list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The first values are:

0:    0.60653066
1:    0.30326533
2:    0.07581633
3:    0.01263606
4:    0.00157952
5:    0.00015795
6:    0.00001316
7:    0.00000094
8:    0.00000006
more: less than 1 in ten million
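
The values above follow directly from that formula and can be recomputed with a few lines of Ruby:

mean = 0.5
factorial = ->(k) { (1..k).reduce(1, :*) }
(0..8).each do |k|
  probability = Math.exp(-mean) * mean**k / factorial.call(k)
  puts format('%d: %.8f', k, probability)
end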

Lock contention probability for two threads accessing distinct elements is roughly 1 / (8 * #elements) under random hashes.

The table is resized when occupancy exceeds a percentage threshold (nominally, 0.75, but see below). Only a single thread performs the resize (using field size_control, to arrange exclusion), but the table otherwise remains usable for reads and updates. Resizing proceeds by transferring bins, one by one, from the table to the next table. Because we are using power-of-two expansion, the elements from each bin must either stay at the same index, or move with a power of two offset. We eliminate unnecessary node creation by catching cases where old nodes can be reused because their next fields won't change. On average, only about one-sixth of them need cloning when a table doubles. The nodes they replace will be garbage collectable as soon as they are no longer referenced by any reader thread that may be in the midst of concurrently traversing the table. Upon transfer, the old table bin contains only a special forwarding node (with hash field MOVED) that contains the next table as its key. On encountering a forwarding node, access and update operations restart, using the new table.

Each bin transfer requires its bin lock. However, unlike other cases, a transfer can skip a bin if it fails to acquire its lock, and revisit it later. Method rebuild maintains a buffer of TRANSFER_BUFFER_SIZE bins that have been skipped because of failure to acquire a lock, and blocks only if none are available (i.e., only very rarely). The transfer operation must also ensure that all accessible bins in both the old and new table are usable by any traversal. When there are no lock acquisition failures, this is arranged simply by proceeding from the last bin (+table.size - 1+) up towards the first. Upon seeing a forwarding node, traversals arrange to move to the new table without revisiting nodes. However, when any node is skipped during a transfer, all earlier table bins may have become visible, so are initialized with a reverse-forwarding node back to the old table until the new ones are established. (This sometimes requires transiently locking a forwarding node, which is possible under the above encoding.) These more expensive mechanics trigger only when necessary.

The traversal scheme also applies to partial traversals of ranges of bins (via an alternate Traverser constructor) to support partitioned aggregate operations. Read-only operations give up if ever forwarded to a null table, which provides support for shutdown-style clearing, though that is likewise not currently implemented.

Lazy table initialization minimizes footprint until first use.

The element count is maintained using a ThreadSafe::Util::Adder, which avoids contention on updates but can encounter cache thrashing if read too frequently during concurrent access. To avoid reading so often, resizing is attempted either when a bin lock is contended, or upon adding to a bin already holding two or more nodes (checked before adding in the x_if_absent methods, after adding in others). Under uniform hash distributions, the probability of this occurring at threshold is around 13%, meaning that only about 1 in 8 puts check threshold (and after resizing, many fewer do so). But this approximation has high variance for small table sizes, so we check on any collision for sizes <= 64. The bulk putAll operation further reduces contention by only committing count updates upon these size checks.

Defined Under Namespace

Classes: Node, Table

Constant Summary

MOVED = Node::MOVED
LOCKED = Node::LOCKED
WAITING = Node::WAITING
HASH_BITS = Node::HASH_BITS
NOW_RESIZING = -1
DEFAULT_CAPACITY = 16
MAX_CAPACITY = Util::MAX_INT
TRANSFER_BUFFER_SIZE = 32

MOVED, LOCKED, WAITING, and HASH_BITS are shorthands for the corresponding Node constants. TRANSFER_BUFFER_SIZE is the buffer size for skipped bins during transfers; the value is arbitrary but should be large enough to avoid most locking stalls during resizes.

Instance Method Summary

Methods included from Util::Volatile

attr_volatile

Constructor Details

#initialize(options = nil) ⇒ AtomicReferenceCacheBackend

Returns a new instance of AtomicReferenceCacheBackend.



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 370

def initialize(options = nil)
  super()
  @counter = Util::Adder.new
  initial_capacity  = options && options[:initial_capacity] || DEFAULT_CAPACITY
  self.size_control = (capacity = table_size_for(initial_capacity)) > MAX_CAPACITY ? MAX_CAPACITY : capacity
end

Instance Method Details

#[](key) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 395

def [](key)
  get_or_default(key)
end

#[]=(key, value) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 403

def []=(key, value)
  get_and_set(key, value)
  value
end

#clear ⇒ Object

Implementation for clear. Steps through each bin, removing all nodes.



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 542

def clear
  return self unless current_table = table
  current_table_size = current_table.size
  deleted_count = i = 0
  while i < current_table_size
    if !(node = current_table.volatile_get(i))
      i += 1
    elsif (node_hash = node.hash) == MOVED
      current_table      = node.key
      current_table_size = current_table.size
    elsif Node.locked_hash?(node_hash)
      decrement_size(deleted_count) # opportunistically update count
      deleted_count = 0
      node.try_await_lock(current_table, i)
    else
      current_table.try_lock_via_hash(i, node, node_hash) do
        begin
          deleted_count += 1 if NULL != node.value # recheck under lock
          node.value = nil
        end while node = node.next
        current_table.volatile_set(i, nil)
        i += 1
      end
    end
  end
  decrement_size(deleted_count)
  self
end

#compute(key) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 443

def compute(key)
  internal_compute(key) do |old_value|
    if (new_value = yield(NULL == old_value ? nil : old_value)).nil?
      NULL
    else
      new_value
    end
  end
end
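
A usage sketch (assuming the same semantics as the public ThreadSafe::Cache#compute: the block receives the current value or nil, and a nil result removes the entry):

backend = ThreadSafe::AtomicReferenceCacheBackend.new
backend.compute(:hits) { |old| old ? old + 1 : 1 } # no previous value: stores 1
backend.compute(:hits) { |old| old ? old + 1 : 1 } # previous value 1: stores 2
backend[:hits]                                     # => 2
backend.compute(:hits) { |_old| nil }              # nil from the block removes the entry
backend.key?(:hits)                                # => false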

#compute_if_absent(key) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 408

def compute_if_absent(key)
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key) { yield }
      if succeeded
        increment_size
        return new_value
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif NULL != (current_value = find_value_in_node_list(node, key, hash, node_hash & HASH_BITS))
      return current_value
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, value = attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) { yield }
      return value if succeeded
    end
  end
end
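
A usage sketch: the block runs, and its result is stored, only when the key has no current value, as the NULL check in the loop above shows.

backend = ThreadSafe::AtomicReferenceCacheBackend.new
calls = 0
backend.compute_if_absent(:answer) { calls += 1; 42 } # computes and stores 42
backend.compute_if_absent(:answer) { calls += 1; -1 } # already present, block skipped
calls            # => 1
backend[:answer] # => 42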

#compute_if_present(key) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 431

def compute_if_present(key)
  new_value = nil
  internal_replace(key) do |old_value|
    if (new_value = yield(NULL == old_value ? nil : old_value)).nil?
      NULL
    else
      new_value
    end
  end
  new_value
end

#delete(key) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 493

def delete(key)
  replace_if_exists(key, NULL)
end

#delete_pair(key, value) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 497

def delete_pair(key, value)
  result = internal_replace(key, value) { NULL }
  if result && NULL != result
    !!result
  else
    false
  end
end
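
A usage sketch: the pair is removed only when the key's current value matches the one given, and the boolean result reports whether a removal happened.

backend = ThreadSafe::AtomicReferenceCacheBackend.new
backend[:state] = :ready
backend.delete_pair(:state, :running) # => false, current value does not match
backend.delete_pair(:state, :ready)   # => true, entry removed
backend.key?(:state)                  # => false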

#each_pair ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 506

def each_pair
  return self unless current_table = table
  current_table_size = base_size = current_table.size
  i = base_index = 0
  while base_index < base_size
    if node = current_table.volatile_get(i)
      if node.hash == MOVED
        current_table      = node.key
        current_table_size = current_table.size
      else
        begin
          if NULL != (value = node.value) # skip deleted or special nodes
            yield node.key, value
          end
        end while node = node.next
      end
    end

    if (i_with_base = i + base_size) < current_table_size
      i = i_with_base # visit upper slots if present
    else
      i = base_index += 1
    end
  end
  self
end
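
A usage sketch; as the overview notes, the block sees the table's state at some point at or since the start of the iteration, so concurrent updates may or may not be visible.

backend = ThreadSafe::AtomicReferenceCacheBackend.new
backend[:a] = 1
backend[:b] = 2
backend.each_pair { |key, value| puts "#{key} => #{value}" }
# Prints each live mapping once; deleted entries (value set to nil) are skipped.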

#empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 537

def empty?
  size == 0
end

#get_and_set(key, value) ⇒ Object

This is internalPut in the original CHMV8.



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 473

def get_and_set(key, value) # internalPut in the original CHMV8
  hash          = key_hash(key)
  current_table = table || initialize_table
  while true
    if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
      if current_table.cas_new_node(i, hash, key, value)
        increment_size
        break
      end
    elsif (node_hash = node.hash) == MOVED
      current_table = node.key
    elsif Node.locked_hash?(node_hash)
      try_await_lock(current_table, i, node)
    else
      succeeded, old_value = attempt_get_and_set(key, value, hash, current_table, i, node, node_hash)
      break old_value if succeeded
    end
  end
end
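
A usage sketch: as the loop above suggests, the new value is always stored and the previous mapping (nil when the key was absent) is returned.

backend = ThreadSafe::AtomicReferenceCacheBackend.new
backend.get_and_set(:token, 'abc') # => nil, there was no previous value
backend.get_and_set(:token, 'xyz') # => "abc", the value that was replaced
backend[:token]                    # => "xyz"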

#get_or_default(key, else_value = nil) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 377

def get_or_default(key, else_value = nil)
  hash          = key_hash(key)
  current_table = table
  while current_table
    node = current_table.volatile_get_by_hash(hash)
    current_table =
      while node
        if (node_hash = node.hash) == MOVED
          break node.key # forwarding node: break hands the next table back to the outer loop
        elsif (node_hash & HASH_BITS) == hash && node.key?(key) && NULL != (value = node.value)
          return value
        end
        node = node.next
      end
  end
  else_value
end

#key?(key) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 399

def key?(key)
  get_or_default(key, NULL) != NULL
end

#merge_pair(key, value) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 453

def merge_pair(key, value)
  internal_compute(key) do |old_value|
    if NULL == old_value || !(value = yield(old_value)).nil?
      value
    else
      NULL
    end
  end
end
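
A usage sketch: when the key is absent the given value is stored as-is; otherwise the block's result replaces the old value, and a nil result removes the entry.

backend = ThreadSafe::AtomicReferenceCacheBackend.new
backend.merge_pair(:counts, 1) { |old| old + 1 } # key absent: stores 1, block not consulted
backend.merge_pair(:counts, 1) { |old| old + 1 } # key present: stores 1 + 1
backend[:counts]                                 # => 2
backend.merge_pair(:counts, 1) { |_old| nil }    # nil from the block removes the entry
backend.key?(:counts)                            # => false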

#replace_if_exists(key, new_value) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 467

def replace_if_exists(key, new_value)
  if (result = internal_replace(key) { new_value }) && NULL != result
    result
  end
end

#replace_pair(key, old_value, new_value) ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 463

def replace_pair(key, old_value, new_value)
  NULL != internal_replace(key, old_value) { new_value }
end

#size ⇒ Object



# File 'lib/thread_safe/atomic_reference_cache_backend.rb', line 533

def size
  (sum = @counter.sum) < 0 ? 0 : sum # ignore transient negative values
end