Class: Collective::Registry

Inherits:
Object
  • Object
show all
Defined in:
lib/collective/registry.rb

Overview

A registry knows how to look up and register workers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, storage) ⇒ Registry

Returns a new instance of Registry.



16
17
18
19
20
21
22
# File 'lib/collective/registry.rb', line 16

# Builds a registry bound to a name and a storage backend.
#
# Missing arguments are reported with an explicit RuntimeError. A bare
# `raise` is deliberately avoided: with no arguments it re-raises whatever
# exception is currently held in `$!`, so building a registry with a missing
# argument from inside a `rescue` block would surface an unrelated error.
#
# @param name [#encoding] registry name; must be truthy and respond to #encoding
# @param storage [Object] storage backend; must be truthy
# @raise [RuntimeError] if name or storage is nil/false
# @raise [NoMethodError] if name does not respond to #encoding
def initialize( name, storage )
  raise "Registry name is required" unless name
  raise "Registry storage is required" unless storage

  @name    = name
  @storage = storage

  # type checking: duck-type probe that fails fast when name is not String-like
  name.encoding
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



13
14
15
# File 'lib/collective/registry.rb', line 13

# Reader for the registry name.
#
# @return [Object] the value currently held in @name
def name; @name; end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



14
15
16
# File 'lib/collective/registry.rb', line 14

# Reader for the storage backend.
#
# @return [Object] the value currently held in @storage
def storage; @storage; end

Instance Method Details

#check_workers(policy, options = nil, &block) ⇒ Object

This method can be slow so it takes a block for incremental processing.

Parameters:

  • block

    takes entry, status in [ :live, :late, :hung, :dead ]

  • options (:all) (defaults to: nil)

    true to get keys across all hosts



75
76
77
78
79
80
81
# File 'lib/collective/registry.rb', line 75

# Reports the heartbeat status of every key returned by #workers, one at a time.
#
# Each worker costs one storage read, so results are yielded incrementally
# instead of being collected. When no block is given an Enumerator over the
# same [key, status] pairs is returned, rather than failing with
# LocalJumpError at the first yield.
#
# @param policy [Object] passed through unchanged to heartbeat_status
# @param options [Object, nil] accepted for interface compatibility; this
#   method does not read it (NOTE(review): the published docs describe an
#   :all option — confirm whether it is still meant to be honoured)
# @yield [key, status] once per worker
# @yieldparam key [Object] an element of #workers
# @yieldparam status [Object] the value heartbeat_status returns for that
#   worker (documented as one of :live, :late, :hung, :dead)
# @return [Array, Enumerator] the result of workers.each when a block is
#   given, otherwise an Enumerator
def check_workers( policy, options = nil, &block )
  return enum_for( :check_workers, policy, options ) unless block_given?

  workers.each do |key|
    # to_i turns a nil or non-numeric stored heartbeat into 0
    heartbeat = storage.get( status_key(key.to_s) ).to_i
    yield( key, heartbeat_status( policy, heartbeat ) )
  end
end

#checked_workers(policy) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/collective/registry.rb', line 63

# Runs check_workers and buckets every yielded key under its status.
#
# @param policy [Object] forwarded to check_workers
# @return [OpenStruct] with live, late, hung and dead attributes, each an
#   Array of the keys reported with that status (empty when none were)
def checked_workers(policy)
  buckets = [:live, :late, :hung, :dead].each_with_object({}) do |state, acc|
    acc[state] = []
  end
  check_workers(policy) { |worker, state| buckets[state] << worker }
  OpenStruct.new(buckets)
end

#reconnect_after_fork ⇒ Object



25
26
27
# File 'lib/collective/registry.rb', line 25

# Delegates to the storage backend's reconnect_after_fork.
#
# @return [Object] whatever the storage backend returns
def reconnect_after_fork
  storage.reconnect_after_fork
end

#register(key) ⇒ Object



30
31
32
33
34
# File 'lib/collective/registry.rb', line 30

# Registers a worker: adds its stringified key to the collection stored under
# workers_key, then writes the current epoch seconds under its status key.
#
# @param key [#to_s] worker key
# @return [Object] the result of storage.put
def register(key)
  worker_id = key.to_s
  storage.set_add(workers_key, worker_id)
  storage.put(status_key(worker_id), Time.now.to_i)
end

#unregister(key) ⇒ Object



44
45
46
47
48
# File 'lib/collective/registry.rb', line 44

# Unregisters a worker: deletes its status entry first, then removes its
# stringified key from the collection stored under workers_key.
#
# @param key [#to_s] worker key
# @return [Object] the result of storage.set_remove
def unregister(key)
  worker_id = key.to_s
  storage.del(status_key(worker_id))
  storage.set_remove(workers_key, worker_id)
end

#update(key) ⇒ Object



37
38
39
40
41
# File 'lib/collective/registry.rb', line 37

# Refreshes a worker's heartbeat: writes the current epoch seconds under its
# status key, first adding the stringified key to the collection stored under
# workers_key when storage reports it is not already a member.
#
# @param key [#to_s] worker key
# @return [Object] the result of storage.put
def update(key)
  worker_id = key.to_s
  unless storage.set_member?(workers_key, worker_id)
    storage.set_add(workers_key, worker_id)
  end
  storage.put(status_key(worker_id), Time.now.to_i)
end

#workers ⇒ Object

NOTICE: this will include keys for workers on all hosts.



56
57
58
59
60
# File 'lib/collective/registry.rb', line 56

# Lists every registered worker as a parsed key.
#
# Reads all members stored under workers_key and maps each one through
# Collective::Key.parse.
#
# @return [Array] one parsed key per stored member
# @raise [RuntimeError] if storage does not hand back an Array
def workers
  members = storage.set_get_all(workers_key)
  unless members.is_a?(Array)
    raise "Not a Set: #{workers_key} (#{members.class})"
  end
  members.map { |raw| Collective::Key.parse(raw) }
end