Class: Collective::Registry
- Inherits:
-
Object
- Object
- Collective::Registry
- Defined in:
- lib/collective/registry.rb
Overview
A registry knows how to lookup up and register workers.
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
Instance Method Summary collapse
-
#check_workers(policy, options = nil, &block) ⇒ Object
This method can be slow so it takes a block for incremental processing.
- #checked_workers(policy) ⇒ Object
-
#initialize(name, storage) ⇒ Registry
constructor
A new instance of Registry.
- #reconnect_after_fork ⇒ Object
- #register(key) ⇒ Object
- #unregister(key) ⇒ Object
- #update(key) ⇒ Object
-
#workers ⇒ Object
NOTICE this will include keys for workers on all hosts.
Constructor Details
#initialize(name, storage) ⇒ Registry
Returns a new instance of Registry.
16 17 18 19 20 21 22 |
# File 'lib/collective/registry.rb', line 16 def initialize( name, storage ) @name = name or raise @storage = storage or raise # type checking name.encoding end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
13 14 15 |
# File 'lib/collective/registry.rb', line 13 def name @name end |
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
14 15 16 |
# File 'lib/collective/registry.rb', line 14 def storage @storage end |
Instance Method Details
#check_workers(policy, options = nil, &block) ⇒ Object
This method can be slow so it takes a block for incremental processing.
75 76 77 78 79 80 81 |
# File 'lib/collective/registry.rb', line 75 def check_workers( policy, = nil, &block ) workers.each do |key| heartbeat = storage.get( status_key(key.to_s) ).to_i status = heartbeat_status( policy, heartbeat ) yield( key, status ) end end |
#checked_workers(policy) ⇒ Object
63 64 65 66 67 68 69 |
# File 'lib/collective/registry.rb', line 63 def checked_workers( policy ) groups = { live: [], late: [], hung: [], dead: [] } check_workers(policy) do |key, status| groups[status] << key end OpenStruct.new(groups) end |
#reconnect_after_fork ⇒ Object
25 26 27 |
# File 'lib/collective/registry.rb', line 25 def reconnect_after_fork @storage.reconnect_after_fork end |
#register(key) ⇒ Object
30 31 32 33 34 |
# File 'lib/collective/registry.rb', line 30 def register( key ) key = key.to_s storage.set_add( workers_key, key ) storage.put( status_key(key), Time.now.to_i ) end |
#unregister(key) ⇒ Object
44 45 46 47 48 |
# File 'lib/collective/registry.rb', line 44 def unregister( key ) key = key.to_s storage.del( status_key(key) ) storage.set_remove( workers_key, key ) end |
#update(key) ⇒ Object
37 38 39 40 41 |
# File 'lib/collective/registry.rb', line 37 def update( key ) key = key.to_s storage.set_add( workers_key, key ) if ! storage.set_member?( workers_key, key ) storage.put( status_key(key), Time.now.to_i ) end |
#workers ⇒ Object
NOTICE this will include keys for workers on all hosts
56 57 58 59 60 |
# File 'lib/collective/registry.rb', line 56 def workers all = storage.set_get_all( workers_key ) raise "Not a Set: #{workers_key} (#{all.class})" unless all.kind_of?(Array) all.map { |key_string| Collective::Key.parse(key_string) } end |