Module: PPipe::Controller

Includes:
Methods
Included in:
PPipe
Defined in:
lib/parallelpipes.rb

Constant Summary collapse

CON =

_GHAkxsadf0a0s98

:controller_switch

Instance Attribute Summary collapse

Attributes included from Methods

#is_root, #mpn, #redirect, #thread_safe, #tp_required, #tt_required

Attributes included from Log

#verbosity

Instance Method Summary collapse

Methods included from Methods

#assigned?, #die, #exit, #finish, #fork, #gets, #i_recv, #i_send, #kill_all, #kill_pipe, #pids, #puts, #read_all, #set_up_pipes, #t_recv, #tid, #user_end, #w_recv, #w_send, #wait, #wait_till_assigned, #waitall

Methods included from Log

clean_up, io=, #log, log_file, log_file=

Methods included from Reader

#configure_reader, #get_line, #read_pipe

Instance Attribute Details

#controllerObject (readonly)

the pipe number of the controller (nil if there is no controller)



1869
1870
1871
# File 'lib/parallelpipes.rb', line 1869

def controller
  @controller
end

Instance Method Details

#auto_load_save(resource_name, file_name) ⇒ Object

Raises:



2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
# File 'lib/parallelpipes.rb', line 2126

def auto_load_save(resource_name, file_name)
	ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :auto_load_save
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	i_send(:auto_load_save, [resource_name, file_name], tp: @controller)
	reply = w_recv(:auto_load_save_ + resource_name, fp: @controller)
	raise ControllerError.new("if auto_load_save is called it must be before any access to a shared resource (#{resource_name})") if reply == :already_accessed
	raise ControllerError.new("another thread or process has already called auto_load_save for #{resource_name}") if reply == :already_opened
	log 'fpvc', :auto_load_save
end

#controller_alive?Boolean

Is there a controller?

Returns:

  • (Boolean)


2184
2185
2186
2187
# File 'lib/parallelpipes.rb', line 2184

def controller_alive?
	check_messages
	return @controller ? true : false
end

#controller_refresh=(value) ⇒ Object

Change the controller refresh time. controller_refresh is the delay between the controller dealing with requests. A shorter refresh time means the controller runs faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU). Default is 0.001. There is no point having it smaller than 0.0001 - there is no improvement in performance.

Benchmarks (2GHz AMD 64 (32bit OS))

Time taken to lock and unlock a mutex twice

controller_refresh : time taken

  • 0.1: 0.313996553421021

  • 0.01: 0.132316589355469

  • 0.001: 0.0158429145812988

  • 0.0001: 0.00638794898986816

  • 1.0e-05: 0.0312449932098389

Footnote:

Consuming 1-6% of CPU seems like a lot. However, PPipe is aiming to bring the multi-core, multi-process world to Ruby. Most new computers have at least two CPU cores nowadays, many have more and the number will rise in the future. 1-6% of 1 processor is not much when the computer has 4, 8 or 16 processors.

Having said which, if someone finds a better way of doing what the controller does, please share it!

And finally, if this overhead is unacceptable, PPipe run without a Controller (see Methods#set_up_pipes) is still a powerful parallelization tool.

Raises:



1932
1933
1934
1935
1936
# File 'lib/parallelpipes.rb', line 1932

def controller_refresh=(value)
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	i_send(:set_controller_refresh, value, tp: @controller, evaluate: true)
end

#controller_woken?(wait_till_woken = false) ⇒ Boolean

Returns true if the controller is awake, false if it is asleep. Currently hangs if the controller has not been told to sleep.

Experimental

Returns:

  • (Boolean)

Raises:



2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
# File 'lib/parallelpipes.rb', line 2162

def controller_woken?(wait_till_woken=false)
	#blocks unless controller has been put to sleep. Returns true immediately if controller is asleep. Returns false immediately if controller has been put to sleep and woken
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	check_messages
	while @messages[tid][:controller_sleep_switch][@controller].values[0].size > 2
		2.times{@messages[tid][:controller_sleep_switch][@controller].values[0].shift}
	end
	unless @controller_asleep
		w_recv(:controller_sleep_switch, fp: @controller)
		@controller_asleep = true
	else
		if wait_till_woken
			@controller_asleep = w_recv(:controller_sleep_switch, fp: @controller)
		else
			@controller_asleep = !t_recv(:controller_sleep_switch, fp: @controller)
		end
	end
end

#get_shared_resource(resource_name) ⇒ Object

Get a shared resource from the controller. No other process or thread can access the resource until this process has returned it.

resource_name must be a Symbol or Integer

A shared resource can be any object where

eval(object.inspect) == object

If the resource has not been accessed before it is initialized to nil.

ppipe = PPipe.new(5, true) savings = ppipe.get_shared_resource(:savings)

Raises:



2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
# File 'lib/parallelpipes.rb', line 2041

def get_shared_resource(resource_name)
	ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])

	log 'fpv', :get_shared_resource
	@shared_resource_mutexes[resource_name] ||= Mutex.new
	@shared_resource_mutexes[resource_name].lock
	check_messages
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	@old_resource_controllers ||= {}
	@old_resource_controllers[resource_name] = @controller

# 		check_for_exit_signal
	i_send(CON,  [resource_name, :fetch], tp: @controller)
# 		$stderr.puts 'request sent'
	ans = w_recv(resource_name, fp: @controller)
# 		$stderr.puts 'got answer'
	log 'fpvc', :get_shared_resource
	return ans[0]
end

#lock(mutex_name) ⇒ Object

Lock a mutex named mutex_name. If the mutex did not exist before, create it. No other process or thread can lock this mutex until it has been unlocked.

mutex_name must be a symbol or an integer.

Note, every different mutex_name corresponds to a different mutex. Thus, it is easy to create vast numbers of mutexes. However, there is a memory and processor overhead associated with each mutex. Although this is small, creating very large numbers of mutexes is not a good idea.

Mutexes cannot be destroyed.

Raises:



1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
# File 'lib/parallelpipes.rb', line 1958

def lock(mutex_name)
	ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :lock
# 		$stderr.puts "#@mpn thinks controller is #@controller"
	@thread_mutexes[mutex_name] ||= Mutex.new
	@thread_mutexes[mutex_name].lock
	check_messages
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
# 		check_for_exit_signal
	@old_controllers ||= {}
	@old_controllers[mutex_name] = @controller
	i_send(CON,  [mutex_name, :lock], tp: @controller)
	w_recv(mutex_name, fp: @controller)
	log 'fpvc', :lock
end

#mutex_locked?(mutex_name) ⇒ Boolean

Is the mutex called mutex_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.

Returns:

  • (Boolean)

Raises:



2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
# File 'lib/parallelpipes.rb', line 2014

def mutex_locked?(mutex_name)
	ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :locked?
	check_messages
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	i_send(CON,  [mutex_name, :mutex_locked?], tp: @controller)
	ans = w_recv(mutex_name, fp: @controller)[0]
	log 'fpvc', :mutex_locked?
	return ans
end

#put_controller_to_sleep(sleep_time, wait_for_waking) ⇒ Object

If the controller will not be need for a while, put it to sleep for sleep_time seconds. If wait_for_waking is false, wake up automatically. If it is a number, check for a wake up call every wait_for_waiting seconds.

Experimental.

Raises:



2142
2143
2144
2145
2146
# File 'lib/parallelpipes.rb', line 2142

def put_controller_to_sleep(sleep_time, wait_for_waking)
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	w_send(:sleep, [sleep_time, wait_for_waking], tp: @controller)
end

#resource_locked?(resource_name) ⇒ Boolean

Is the shared resource called resource_name locked? Returns [pipe_no, thread_id] of the locking thread and process if locked, nil if it is not locked.

Returns:

  • (Boolean)

Raises:



2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
# File 'lib/parallelpipes.rb', line 2105

def resource_locked?(resource_name)
	ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :locked?
	check_messages
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	i_send(CON,  [resource_name, :resource_locked?], tp: @controller)
	ans = w_recv(resource_name, fp: @controller)[0]
	log 'fpvc', :resource_locked?
	return ans
end

#return_shared_resource(resource_name, resource) ⇒ Object

Return a shared resource.

If the process and thread calling return_shared_resource did not previously call get_shared_resource, this call will hang forever.

savings += 20.03 ppipe.return_shared_resource(:savings, savings)

Raises:



2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
# File 'lib/parallelpipes.rb', line 2069

def return_shared_resource(resource_name, resource)
	ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :return_shared_resource

# 		check_messages
	raise NoController unless @old_resource_controllers[resource_name]
	raise DeadParallelPipe unless @alive
# 		check_for_exit_signal
	i_send(CON,  [resource_name, resource], tp: @old_resource_controllers[resource_name])
	w_recv(resource_name, fp: @old_resource_controllers[resource_name])
	@old_resource_controllers.delete(resource_name)
	@shared_resource_mutexes[resource_name].unlock
	log 'fpvc', :return_shared_resource
end

#shared_resource(resource_name, &block) ⇒ Object

Fetch a shared resource, edit it and then return it. The resource is passed as the parameter to the block.

ppipe.shared_resource(:savings) do |savings| savings += 102.96 # … some other calculations savings end

NB The shared resource to be returned must be the last line of the block



2094
2095
2096
2097
2098
2099
2100
2101
# File 'lib/parallelpipes.rb', line 2094

def shared_resource(resource_name, &block)
	ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :shared_resource
	resource = get_shared_resource(resource_name)
	resource = yield(resource)
	return_shared_resource(resource_name, resource)
	log 'fpvc', :shared_resource
end

#start_controller(controller_refresh = 0.001) ⇒ Object

Start a controller process. Raises an error if there already is one. controller_refresh is the delay between the controller dealing with requests. A shorter refresh time means the controller runs faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU).

Raises:



1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
# File 'lib/parallelpipes.rb', line 1873

def start_controller(controller_refresh=0.001)
	raise DeadParallelPipe unless @alive
	raise ControllerError.new("This parallel pipe already has a controller") if @controller
	parent = [mpn, tid]
	@controller = fork do
		log 'v4', 'Started Controller...'
		@controller_refresh = controller_refresh
		@messages[tid] ||= {}
		@messages[tid][CON] = {}
		@controller_status = {}
		@resource_status = {}
		@resources = {}
		@resource_file_names = {}
# 			@resource_accessed = {}
		log 'v5', 'commencing controller...'
		i_send(:controller_started, true, tp: parent[0], tt: parent[1])
		run_controller
		log 'v5', 'controller exiting...'
	end
	w_recv(:controller_started, fp: @controller)
# 		broadcast(:set_controller, @controller, {evaluate: true})
end

#stop_controllerObject

Stop the controller. The controller sends out an exit warning to all processes. No new lock, synchronize, get_shared_resource or shared_resource resource commands can be issued once that warning has been received. However, the controller will not exit until all existing locks on mutexes have been unlocked and all shared_resources have been returned.

Raises:



1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
# File 'lib/parallelpipes.rb', line 1898

def stop_controller
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	controller = @controller
	w_send(:exit, true, tp: @controller)
	w_recv(:exited, fp: controller)
	Process.wait(@pids[controller])
	@pids.delete(controller)
# 		$stderr.puts '@controller is', @controller.inspect
	@controller = nil
end

#synchronize(mutex_name, &block) ⇒ Object

Lock the mutex, call the block and then unlock the mutex. See Controller#lock.

ppipe.synchronize(:sentence)‘a fragmented sen’; sleep rand; print “tence\n”

Is identical to:

ppipe.lock(:sentence) print ‘a fragmented sen’; sleep rand; print “tence\n” ppipe.unlock(:sentence)

Raises:



2002
2003
2004
2005
2006
2007
2008
2009
2010
# File 'lib/parallelpipes.rb', line 2002

def synchronize(mutex_name, &block)
	ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :synchronize
	raise DeadParallelPipe unless @alive
	lock(mutex_name)
	yield
	unlock(mutex_name)
	log 'fpvc', :synchronize
end

#unlock(mutex_name) ⇒ Object

Unlock a mutex. See Controller#lock

If the process and thread calling unlock did not previously lock the mutex, this call will hang forever.

Raises:



1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
# File 'lib/parallelpipes.rb', line 1979

def unlock(mutex_name)
	ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
	log 'fpv', :unlock
# 		check_messages
	raise NoController.new("Controller dead, or a lock call has not been issued with the name #{mutex_name} from this process") unless @old_controllers[mutex_name]
	raise DeadParallelPipe unless @alive
	i_send(CON,  [mutex_name, :unlock], tp: @old_controllers[mutex_name])
	w_recv(mutex_name, fp: @old_controllers[mutex_name])
	@old_controllers.delete(mutex_name)
	@thread_mutexes[mutex_name].unlock
	log 'fpvc', :unlock
end

#wake_controllerObject

Wake up the controller

Experimental

Raises:



2152
2153
2154
2155
2156
# File 'lib/parallelpipes.rb', line 2152

def wake_controller
	raise NoController unless @controller
	raise DeadParallelPipe unless @alive
	w_send(:wake, true, tp: @controller)
end