Class: Krakow::Distribution::Default

Inherits:
Krakow::Distribution show all
Defined in:
lib/krakow/distribution/default.rb

Overview

Default distribution implementation. This uses a round-robin approach for less than ideal states.

Instance Attribute Summary collapse

Attributes inherited from Krakow::Distribution

#flight_record, #ideal, #registry

Attributes included from Utils::Lazy::InstanceMethods

#arguments

Instance Method Summary collapse

Methods inherited from Krakow::Distribution

#add_connection, #backoff_interval, #backoff_interval?, #connection_lookup, #connections, #consumer, #consumer?, #failure, #in_flight_lookup, #initial_ready, #initialize, #max_in_flight, #max_in_flight?, #ready_for, #register_message, #registry_lookup, #remove_connection, #success, #unregister_message, #watch_dog_interval, #watch_dog_interval?

Methods included from Utils::Lazy::ClassMethods

#attribute, #attributes, #set_attributes

Methods included from Utils::Lazy::InstanceMethods

#initialize, #inspect, #to_s

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

This class inherits a constructor from Krakow::Distribution

Instance Attribute Details

#less_than_ideal_stack ⇒ Object (readonly)

Returns the value of attribute less_than_ideal_stack.



9
10
11
# File 'lib/krakow/distribution/default.rb', line 9

# Reader for the stack of connections that `less_than_ideal_ready!`
# pops candidates from and refills from `waiting_connections`.
#
# @return [Object] current value of `@less_than_ideal_stack`
#   (nil until it has been assigned)
def less_than_ideal_stack
  @less_than_ideal_stack
end

#watch_dog ⇒ Object (readonly)

Returns the value of attribute watch_dog.



9
10
11
# File 'lib/krakow/distribution/default.rb', line 9

# Reader for the recurring timer that `redistribute!` installs while the
# distribution is in a less than ideal state.
#
# @return [Object] current value of `@watch_dog` (nil when no timer is set)
def watch_dog
  @watch_dog
end

Instance Method Details

#calculate_ready!(connection_identifier) ⇒ Integer?

Update connection ready count

Parameters:

  • connection_identifier (String)

Returns:

  • (Integer, nil)


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/krakow/distribution/default.rb', line 99

# Update connection ready count
#
# In a less than ideal state the ready count is forced to 0. Otherwise it
# is `ideal` minus the number of in-flight messages; when that value is
# negative, or the connection is still backing off, the ready count is
# set to 0 and a timer is scheduled to recalculate it later.
#
# @param connection_identifier [String]
# @return [Integer, nil] new ready count, or the result of `warn` when a
#   connection error is rescued
def calculate_ready!(connection_identifier)
  registry_info = registry_lookup(connection_identifier)
  if(less_than_ideal?)
    registry_info[:ready] = 0
  else
    registry_info[:ready] = ideal - registry_info[:in_flight]
    if(registry_info[:ready] < 0 || registry_info[:backoff_until] > Time.now.to_i)
      registry_info[:ready] = 0
      # The pending timer lives on this connection's registry entry, not on
      # the registry itself; cancel it so only one recalculation is queued.
      registry_info[:backoff_timer].cancel if registry_info[:backoff_timer]
      # NOTE(review): when this branch is entered only because ready < 0,
      # the interval below may be zero or negative -- confirm how `after`
      # treats that.
      registry_info[:backoff_timer] = after(registry_info[:backoff_until] - Time.now.to_i) do
        calculate_ready!(connection_identifier)
        set_ready_for(connection_lookup(connection_identifier)) unless less_than_ideal?
      end
    end
    registry_info[:ready]
  end
rescue Error::ConnectionFailure
  warn 'Failed connection encountered!'
rescue Error::ConnectionUnavailable
  warn 'Unavailable connection encountered!'
end

#force_unready ⇒ nil

Force a connection to give up RDY state so next in stack can receive

Returns:

  • (nil)


144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/krakow/distribution/default.rb', line 144

# Force a connection to give up RDY state so next in stack can receive
#
# Picks one connection at random from those currently holding RDY,
# recalculates its ready count and pushes the update through
# `set_ready_for`. Logs a warning when no connection holds RDY.
#
# @return [nil]
def force_unready
  debug 'Forcing a connection into an unready state due to less than ideal state'
  # Array#sample picks a random element without shuffling the whole list
  connection = rdy_connections.sample
  if(connection)
    debug "Stripping RDY state from connection: #{connection}"
    calculate_ready!(connection.identifier)
    set_ready_for(connection)
  else
    warn 'Failed to locate available connection for RDY acquisition!'
  end
  nil
end

#less_than_ideal? ⇒ TrueClass, FalseClass

Is ideal less than 1

Returns:

  • (TrueClass, FalseClass)


48
49
50
# File 'lib/krakow/distribution/default.rb', line 48

# Is ideal less than 1
#
# `redistribute!` computes `ideal` as `max_in_flight / registry.size`, so
# this is true when that quotient is below 1 or the registry is empty.
#
# @return [TrueClass, FalseClass]
def less_than_ideal?
  ideal < 1
end

#less_than_ideal_ready! ⇒ Krakow::Connection?

Find next connection to receive RDY count

Returns:



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/krakow/distribution/default.rb', line 55

# Find next connection to receive RDY count
#
# Pops candidates off the less than ideal stack, skipping any connection
# that is still backing off. The stack is refilled from
# `waiting_connections` when it runs dry; once a refill has happened and
# the stack is empty again the search gives up.
#
# @return [Krakow::Connection, nil] selected connection (its registry
#   ready count is set to 1) or nil when none is available
def less_than_ideal_ready!
  refilled = false
  chosen = nil
  loop do
    break if chosen
    break if refilled && less_than_ideal_stack.empty?
    if(less_than_ideal_stack.nil? || less_than_ideal_stack.empty?)
      @less_than_ideal_stack = waiting_connections
      refilled = true
    end
    candidate = less_than_ideal_stack.pop
    next unless candidate
    backoff_until = registry_lookup(candidate.identifier)[:backoff_until]
    chosen = candidate if backoff_until <= Time.now.to_i
  end
  return nil unless chosen
  registry_lookup(chosen.identifier)[:ready] = 1
  chosen
end

#rdy_connections ⇒ Array<Krakow::Connection>

All connections with RDY state

Returns:



135
136
137
138
139
# File 'lib/krakow/distribution/default.rb', line 135

# All connections with RDY state
#
# Selects registry entries whose ready count is above zero and resolves
# each to its connection, dropping identifiers that resolve to nil.
#
# @return [Array<Krakow::Connection>]
def rdy_connections
  holding_rdy = registry.find_all{|_identifier, details| details[:ready] > 0 }
  resolved = holding_rdy.map{|identifier, _details| connection_lookup(identifier) }
  resolved.compact
end

#redistribute! ⇒ Object

Recalculate `ideal` and update RDY on connections



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/krakow/distribution/default.rb', line 12

# Recalculate `ideal` and update RDY on connections
#
# `ideal` is `max_in_flight` divided by the number of registered
# connections (0 when the registry is empty). In a less than ideal state
# every ready count is reset, up to `max_in_flight` connections are
# selected through `less_than_ideal_ready!`, RDY is forced out to all
# connections and a recurring watch dog is started to rotate RDY.
# Otherwise any watch dog is cancelled and each connection whose ready
# count changed has its RDY updated.
#
# @return [Object] the watch dog timer in a less than ideal state,
#   otherwise the result of iterating `connections`
def redistribute!
  @ideal = if(registry.size < 1)
    0
  else
    max_in_flight / registry.size
  end
  debug "Distribution calculated ideal: #{ideal}"
  if(less_than_ideal?)
    registry.each{|_connection_id, details| details[:ready] = 0 }
    max_in_flight.times{ less_than_ideal_ready! }
    connections.each{|connection| set_ready_for(connection, :force) }
    watch_dog.cancel if watch_dog
    @watch_dog = every(watch_dog_interval){ force_unready }
  else
    if(watch_dog)
      watch_dog.cancel
      @watch_dog = nil
    end
    connections.each do |connection|
      previous_ready = ready_for(connection.identifier)
      calculate_ready!(connection.identifier)
      # only push RDY to the connection when the count actually changed
      next if previous_ready == ready_for(connection.identifier)
      debug "Redistribution ready setting update for connection #{connection}"
      set_ready_for(connection)
    end
  end
end

#set_ready_for(connection, *args) ⇒ Krakow::FrameType::Error?

Adds extra functionality to provide round robin RDY setting when in less than ideal state

Parameters:

Returns:



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/krakow/distribution/default.rb', line 82

# Adds extra functionality to provide round robin RDY setting when in
# less than ideal state
#
# The inherited implementation is always invoked for `connection` first.
# When in a less than ideal state and `:force` was not passed, the next
# connection from `less_than_ideal_ready!` also gets its RDY set and the
# watch dog (if any) is reset.
#
# @param connection [Krakow::Connection]
# @param args [Array<Symbol>] pass `:force` to skip the round robin step
# @return [Krakow::FrameType::Error, nil] result of the round robin RDY
#   set (or of `warn` when no connection was available); nil when the
#   round robin step is skipped
def set_ready_for(connection, *args)
  super connection
  return nil unless less_than_ideal?
  return nil if args.include?(:force)
  debug "RDY set ignored due to less than ideal state (con: #{connection})"
  next_in_line = less_than_ideal_ready!
  unless(next_in_line)
    return warn('Failed to set RDY state while less than ideal. Connection stack is empty!')
  end
  watch_dog.reset if watch_dog
  super next_in_line
end

#waiting_connections ⇒ Array<Krakow::Connection>

All connections without RDY state

Returns:



126
127
128
129
130
# File 'lib/krakow/distribution/default.rb', line 126

# All connections without RDY state
#
# A registry entry qualifies when it has no ready count, nothing in
# flight and its backoff deadline is already in the past. Identifiers
# that resolve to nil are dropped.
#
# @return [Array<Krakow::Connection>]
def waiting_connections
  idle = registry.find_all do |_identifier, details|
    details[:ready] < 1 && details[:in_flight] < 1 && details[:backoff_until] < Time.now.to_i
  end
  resolved = idle.map{|identifier, _details| connection_lookup(identifier) }
  resolved.compact
end