Class: SourceGroup

Inherits:
Object
  • Object
show all
Defined in:
lib/group_delegator/source_group.rb

Overview

This class is the container for the objects that will receive common method calls. It also manages the concurrency model to be used when performing the method calls.

Constant Summary collapse

IterativeBlock =

# Built-in concurrency model: forwards the call to each source in turn, on
# the calling thread.
#
# sources         - enumerable of receiver objects
# m               - name of the method to send to every source
# *args, &block   - passed through unchanged to every source
#
# Returns a Hash. For non-empty sources it always has a :valid key mapping
# each responding source to its return value; an :invalid key (Array of
# sources) is present only when at least one source raised NoMethodError.
# Any other exception propagates to the caller. Empty sources yield {}.

lambda { |sources, m, *args, &block|
  sources.each_with_object({}) do |source, all_resps|
    # :valid is created up front so it exists even if every source fails
    valid = (all_resps[:valid] ||= {})
    begin
      valid[source] = source.__send__(m, *args, &block)
    rescue NoMethodError
      # the source could not handle the call; remember it as invalid
      (all_resps[:invalid] ||= []) << source
    end
  end
}
ThreadedBlock =

If we like speed (with a dash of danger) we can thread the requests rather than iterate

lambda{ |sources, m, *args, &block|
  all_resps = {}
  threads = []
  sources.each do |source|
    threads << Thread.new(source) do |src|
      Thread.current[:src] = src
      begin
        Thread.current[:resp] = src.__send__(m, *args, &block)
      rescue
        Thread.current[:err] =  src
      end
    end
  end
  
  threads.each do |t|
    t.join
    src = t[:src]  #proxied object
    if t[:resp]
      #valid response
      all_resps[:valid] ||= {}
      all_resps[:valid][src] = t[:resp]
    elsif t[:err]
      #oops error
      all_resps[:invalid] ||= []
      all_resps[:invalid] << t[:err]
    else
      raise "source returned an invalid responseto its thread."\
           "Response thread: #{t} source: #{source.inspect}"
    end
  end
  all_resps
}
ThreadedFirstResponseBlock =

# More speed (with more danger): use the first valid response and do not
# wait for the remaining sources (note that the source threads are NOT
# joined here).
#
# Every source thread reports exactly one outcome on a queue. The caller
# blocks on that queue until the first valid outcome arrives or until every
# source has reported, so no polling, sleeping or helper thread is needed and
# nothing is left blocked when no source answers.
#
# sources         - enumerable of receiver objects
# m               - name of the method to send to every source
# *args, &block   - passed through unchanged to every source
#
# Returns a Hash:
#   :valid   - { source => return value } for the first source that answered,
#              or nil if none did
#   :invalid - sources whose threads had already failed with a StandardError
#              when the result was assembled
#   :threads - the source threads, so the caller can join them if it needs to
#              end in a known state

lambda { |sources, m, *args, &block|
  queue = Queue.new
  source_threads = sources.map do |source|
    Thread.new do
      outcome = [:invalid, source]
      begin
        outcome = [:valid, { source => source.__send__(m, *args, &block) }]
      rescue
        Thread.current[:err] = source # errored out before a valid entry in queue
      ensure
        # always report, so the consumer below can never wait forever
        queue << outcome
      end
    end
  end

  valid_response = nil
  source_threads.size.times do
    status, payload = queue.pop # blocks until some source reports
    next unless status == :valid
    valid_response = payload
    break
  end

  # :err is set before the failure is queued, so every failure counted above
  # is visible here; threads still running are simply not included.
  invalid_resps = []
  source_threads.each do |t|
    invalid_resps << t[:err] if t[:err]
  end

  { :valid => valid_response, :invalid => invalid_resps, :threads => source_threads }
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sources, concurrency_model = :iterative) ⇒ SourceGroup

Returns a new instance of SourceGroup.



114
115
116
117
# File 'lib/group_delegator/source_group.rb', line 114

# Builds a group around the given sources.
#
# sources           - the objects that forwarded method calls are sent to
# concurrency_model - how calls are forwarded by default; forward_custom
#                     accepts :iterative, :threaded, :first_response or a
#                     Proc. Defaults to :iterative.
def initialize(sources, concurrency_model = :iterative)
  @sources, @concurrency_model = sources, concurrency_model
end

Instance Attribute Details

#concurrency_modelObject

:sources, :valid_response_trace, :invalid_response_list



6
7
8
# File 'lib/group_delegator/source_group.rb', line 6

# Returns the concurrency model stored in @concurrency_model (set by the
# constructor); #forward passes this value to #forward_custom.
def concurrency_model
  @concurrency_model
end

Instance Method Details

#forward(m, *args, &block) ⇒ Object



119
120
121
# File 'lib/group_delegator/source_group.rb', line 119

# Forwards method +m+ (with +args+ and +block+) to the sources using the
# concurrency model this group was configured with.
#
# Returns whatever forward_custom returns for that model.
def forward(m, *args, &block)
  model = @concurrency_model
  forward_custom(model, m, *args, &block)
end

#forward_custom(forward_method, m, *args, &block) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/group_delegator/source_group.rb', line 123

# Forwards method +m+ to every source using an explicitly chosen concurrency
# model.
#
# forward_method  - :iterative, :threaded, :first_response, or a Proc. A Proc
#                   is called as proc.call(sources, m, *args, &block) and must
#                   return a Hash-like object with a :valid entry and,
#                   optionally, an :invalid entry.
# m               - name of the method to forward
# *args, &block   - passed through unchanged to the forwarding block
#
# Side effects: stores the :valid part of the response in
# @valid_response_trace and the :invalid part (or [] when absent) in
# @invalid_response_list. Both are reset before the sources are checked, so a
# failed call never leaves results of an earlier call behind.
#
# Returns the :valid part of the forwarding block's response.
# Raises RuntimeError for an unrecognised forward_method or when the group has
# no sources.
def forward_custom(forward_method, m, *args, &block)
  forward_block = case forward_method
                  when :iterative      then IterativeBlock
                  when :threaded       then ThreadedBlock
                  when :first_response then ThreadedFirstResponseBlock
                  when Proc            then forward_method
                  else raise "Invalid parameter: #{forward_method.inspect}"
                  end

  @valid_response_trace = {}
  @invalid_response_list = []
  raise "No sources assigned" unless @sources.size > 0

  resp = forward_block.call(@sources, m, *args, &block)
  # Previously the invalid list was reset but never filled in.
  @invalid_response_list = resp[:invalid] || []
  @valid_response_trace = resp[:valid]
end

#forward_first_resp(m, *args, &block) ⇒ Object



158
159
160
# File 'lib/group_delegator/source_group.rb', line 158

# Sugar: forwards +m+ using the :first_response concurrency model, regardless
# of the model this group was configured with.
def forward_first_resp(m, *args, &block)
  model = :first_response
  forward_custom(model, m, *args, &block)
end

#forward_iterative(m, *args, &block) ⇒ Object

just some sugar



150
151
152
# File 'lib/group_delegator/source_group.rb', line 150

# Sugar: forwards +m+ using the :iterative concurrency model, regardless of
# the model this group was configured with.
def forward_iterative(m, *args, &block)
  model = :iterative
  forward_custom(model, m, *args, &block)
end

#forward_threaded(m, *args, &block) ⇒ Object



154
155
156
# File 'lib/group_delegator/source_group.rb', line 154

# Sugar: forwards +m+ using the :threaded concurrency model, regardless of
# the model this group was configured with.
def forward_threaded(m, *args, &block)
  model = :threaded
  forward_custom(model, m, *args, &block)
end