Class: Fluent::Counter::MutexHash
- Inherits: Object
- Inheritance chain: Object → Fluent::Counter::MutexHash
- Defined in:
- lib/fluent/counter/mutex_hash.rb
Instance Method Summary
-
#initialize(data_store) ⇒ MutexHash
constructor
A new instance of MutexHash.
- #start ⇒ Object
- #stop ⇒ Object
- #synchronize(*keys) ⇒ Object
- #synchronize_keys(*keys) ⇒ Object
Constructor Details
#initialize(data_store) ⇒ MutexHash
Returns a new instance of MutexHash.
22 23 24 25 26 27 28 |
# File 'lib/fluent/counter/mutex_hash.rb', line 22

# Sets up an empty key => Mutex table around +data_store+ and creates (but
# does not start) the cleanup thread that shares the table and its guard.
#
# @param data_store [Object] store that is started/stopped by #start/#stop
#   and yielded to the blocks given to #synchronize / #synchronize_keys
def initialize(data_store)
  @data_store = data_store
  @mutex_hash = {}    # per-key mutexes, created lazily on first use
  @mutex = Mutex.new  # global lock taken while @mutex_hash is read or extended
  @thread = nil
  @cleanup_thread = CleanupThread.new(data_store, @mutex_hash, @mutex)
end
Instance Method Details
#start ⇒ Object
30 31 32 33 |
# File 'lib/fluent/counter/mutex_hash.rb', line 30

# Starts the data store first, then the cleanup thread.
#
# @return [Object] whatever the cleanup thread's +start+ returns
def start
  store, cleaner = @data_store, @cleanup_thread
  store.start
  cleaner.start
end
#stop ⇒ Object
35 36 37 38 |
# File 'lib/fluent/counter/mutex_hash.rb', line 35

# Stops the data store first, then the cleanup thread.
#
# @return [Object] whatever the cleanup thread's +stop+ returns
def stop
  store, cleaner = @data_store, @cleanup_thread
  store.stop
  cleaner.stop
end
#synchronize(*keys) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/counter/mutex_hash.rb', line 40

# Acquires the per-key mutex of every given key (all or nothing), then yields
# the data store once per key, releasing each key's mutex right after its
# yield.
#
# Acquisition is attempted under the global @mutex with non-blocking
# +try_lock+; if any key is busy, everything taken so far is released and the
# whole attempt is retried.
#
# Every mutex taken here is released even when the block raises or exits
# early (+break+ / +throw+), so a failing block cannot leave keys locked.
#
# Repeated keys are collapsed: each distinct key is locked and yielded once.
#
# @param keys [Array<Object>] keys to lock; returns immediately when empty
# @yieldparam data_store [Object] the store passed to the constructor
# @yieldparam key [Object] the key whose mutex is currently held
# @return [nil]
def synchronize(*keys)
  return if keys.empty?

  # Mutex is not reentrant: try_lock on a mutex this thread already holds
  # returns false, so a repeated key would make the loop below retry forever.
  wanted = keys.uniq
  locks = {}

  loop do
    @mutex.synchronize do
      wanted.each do |key|
        mutex = (@mutex_hash[key] ||= Mutex.new)

        if mutex.try_lock
          locks[key] = mutex
        else
          # Back off completely so we never hold a partial set of keys.
          locks.each_value(&:unlock)
          locks = {}
          break
        end
      end
    end

    next if locks.empty? # failed to lock all keys; retry

    begin
      locks.each do |key, mutex|
        begin
          yield @data_store, key
        ensure
          mutex.unlock
        end
      end
    ensure
      # Only reached with held mutexes when the block left early: release the
      # keys that were never yielded.
      locks.each_value { |mutex| mutex.unlock if mutex.owned? }
    end
    break
  end
end
#synchronize_keys(*keys) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/counter/mutex_hash.rb', line 74

# Yields the data store once per key, holding only that key's mutex during
# the yield. Keys are processed one at a time; a key whose mutex is busy is
# moved to the back of the queue and retried later.
#
# The per-key mutex is released even when the block raises or exits early,
# and the global @mutex is only held (via +synchronize+) while the key's
# mutex is looked up / created and +try_lock+ is attempted.
#
# @param keys [Array<Object>] keys to process; returns immediately when empty
# @yieldparam data_store [Object] the store passed to the constructor
# @yieldparam key [Object] the key whose mutex is currently held
# @return [nil]
def synchronize_keys(*keys)
  return if keys.empty?

  pending = keys.dup # work on a copy; the queue is mutated below

  # NOTE: as before, a nil/false key ends the loop.
  while (key = pending.shift)
    acquired = @mutex.synchronize do
      mutex = (@mutex_hash[key] ||= Mutex.new)
      mutex if mutex.try_lock
    end

    if acquired
      begin
        yield @data_store, key
      ensure
        acquired.unlock
      end
    else
      pending.push(key) # failed lock, retry this key
    end
  end
end