Module: PPipe::Controller
Constant Summary
- CON = :controller_switch
  _GHAkxsadf0a0s98
Instance Attribute Summary
- #controller ⇒ Object (readonly)
  The pipe number of the controller (nil if there is no controller).
Attributes included from Methods
#is_root, #mpn, #redirect, #thread_safe, #tp_required, #tt_required
Attributes included from Log
Instance Method Summary
- #auto_load_save(resource_name, file_name) ⇒ Object
- #controller_alive? ⇒ Boolean
  Is there a controller?
- #controller_refresh=(value) ⇒ Object
  Change the controller refresh time.
- #controller_woken?(wait_till_woken = false) ⇒ Boolean
  Returns true if the controller is awake, false if it is asleep.
- #get_shared_resource(resource_name) ⇒ Object
  Get a shared resource from the controller.
- #lock(mutex_name) ⇒ Object
  Lock a mutex named mutex_name.
- #mutex_locked?(mutex_name) ⇒ Boolean
  Is the mutex called mutex_name locked? Returns [pipe_no, thread_id] of the locking process and thread if it is locked, nil if it is not.
- #put_controller_to_sleep(sleep_time, wait_for_waking) ⇒ Object
  If the controller will not be needed for a while, put it to sleep for sleep_time seconds.
- #resource_locked?(resource_name) ⇒ Boolean
  Is the shared resource called resource_name locked? Returns [pipe_no, thread_id] of the locking process and thread if it is locked, nil if it is not.
- #return_shared_resource(resource_name, resource) ⇒ Object
  Return a shared resource.
- #shared_resource(resource_name, &block) ⇒ Object
  Fetch a shared resource, edit it and then return it.
- #start_controller(controller_refresh = 0.001) ⇒ Object
  Start a controller process.
- #stop_controller ⇒ Object
  Stop the controller.
- #synchronize(mutex_name, &block) ⇒ Object
  Lock the mutex, call the block and then unlock the mutex.
- #unlock(mutex_name) ⇒ Object
  Unlock a mutex.
- #wake_controller ⇒ Object
  Wake up the controller.
Methods included from Methods
#assigned?, #die, #exit, #finish, #fork, #gets, #i_recv, #i_send, #kill_all, #kill_pipe, #pids, #puts, #read_all, #set_up_pipes, #t_recv, #tid, #user_end, #w_recv, #w_send, #wait, #wait_till_assigned, #waitall
Methods included from Log
clean_up, io=, #log, log_file, log_file=
Methods included from Reader
#configure_reader, #get_line, #read_pipe
Instance Attribute Details
#controller ⇒ Object (readonly)
The pipe number of the controller (nil if there is no controller).
# File 'lib/parallelpipes.rb', line 1869
def controller
  @controller
end
Instance Method Details
#auto_load_save(resource_name, file_name) ⇒ Object
# File 'lib/parallelpipes.rb', line 2126
def auto_load_save(resource_name, file_name)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :auto_load_save
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  i_send(:auto_load_save, [resource_name, file_name], tp: @controller)
  reply = w_recv(:auto_load_save_ + resource_name, fp: @controller)
  raise ControllerError.new("if auto_load_save is called it must be before any access to a shared resource (#{resource_name})") if reply == :already_accessed
  raise ControllerError.new("another thread or process has already called auto_load_save for #{resource_name}") if reply == :already_opened
  log 'fpvc', :auto_load_save
end
#controller_alive? ⇒ Boolean
Is there a controller?
# File 'lib/parallelpipes.rb', line 2184
def controller_alive?
  return @controller ? true : false
end
#controller_refresh=(value) ⇒ Object
Change the controller refresh time. controller_refresh is the delay between successive rounds of request handling by the controller. A shorter refresh time means the controller responds faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU). Default is 0.001. There is no point setting it below 0.0001: the benchmarks below show no further improvement in performance.
Benchmarks (2GHz AMD 64 (32bit OS))
Time taken to lock and unlock a mutex twice
controller_refresh : time taken
- 0.1: 0.313996553421021
- 0.01: 0.132316589355469
- 0.001: 0.0158429145812988
- 0.0001: 0.00638794898986816
- 1.0e-05: 0.0312449932098389
Footnote:
Consuming 1-6% of CPU seems like a lot. However, PPipe is aiming to bring the multi-core, multi-process world to Ruby. Most new computers now have at least two CPU cores, many have more, and the number will rise in the future. 1-6% of one processor is not much when the computer has 4, 8 or 16 processors.
That said, if someone finds a better way of doing what the controller does, please share it!
And finally, if this overhead is unacceptable, PPipe run without a Controller (see Methods#set_up_pipes) is still a powerful parallelization tool.
# File 'lib/parallelpipes.rb', line 1932
def controller_refresh=(value)
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  i_send(:set_controller_refresh, value, tp: @controller, evaluate: true)
end
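The benchmark above times two lock/unlock cycles on one mutex. A minimal sketch of such a timing harness follows. `LocalPipe` and `time_two_lock_cycles` are hypothetical names introduced here, and `LocalPipe` is only an in-process stand-in so the example runs without a controller; it does not reproduce PPipe's inter-process messaging.

```ruby
# Stand-in for a PPipe instance (an assumption for this sketch): it only
# needs lock(name) and unlock(name). Here both act on local Mutexes.
class LocalPipe
  def initialize
    @mutexes = Hash.new { |hash, name| hash[name] = Mutex.new }
  end

  def lock(name)
    @mutexes[name].lock
  end

  def unlock(name)
    @mutexes[name].unlock
  end
end

# Seconds (as a Float) taken to lock and unlock one mutex twice.
def time_two_lock_cycles(pipe, mutex_name = :bench)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  2.times do
    pipe.lock(mutex_name)
    pipe.unlock(mutex_name)
  end
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

elapsed = time_two_lock_cycles(LocalPipe.new)
puts format('two lock/unlock cycles: %.6f s', elapsed)
```

To compare refresh settings, the same helper could presumably be given a real pipe after each `controller_refresh=` assignment. Timings will vary with the machine and load.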
#controller_woken?(wait_till_woken = false) ⇒ Boolean
Returns true if the controller is awake, false if it is asleep. Currently hangs if the controller has not been told to sleep.
Experimental
# File 'lib/parallelpipes.rb', line 2162
def controller_woken?(wait_till_woken=false)
  #blocks unless controller has been put to sleep. Returns true immediately if controller is asleep. Returns false immediately if controller has been put to sleep and woken
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  while @messages[tid][:controller_sleep_switch][@controller].values[0].size > 2
    2.times{@messages[tid][:controller_sleep_switch][@controller].values[0].shift}
  end
  unless @controller_asleep
    w_recv(:controller_sleep_switch, fp: @controller)
    @controller_asleep = true
  else
    if wait_till_woken
      @controller_asleep = w_recv(:controller_sleep_switch, fp: @controller)
    else
      @controller_asleep = !t_recv(:controller_sleep_switch, fp: @controller)
    end
  end
end
#get_shared_resource(resource_name) ⇒ Object
Get a shared resource from the controller. No other process or thread can access the resource until this process has returned it.
resource_name must be a Symbol or an Integer.
A shared resource can be any object where
eval(object.inspect) == object
If the resource has not been accessed before it is initialized to nil.
ppipe = PPipe.new(5, true)
savings = ppipe.get_shared_resource(:savings)
# File 'lib/parallelpipes.rb', line 2041
def get_shared_resource(resource_name)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :get_shared_resource
  @shared_resource_mutexes[resource_name] ||= Mutex.new
  @shared_resource_mutexes[resource_name].lock
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  @old_resource_controllers ||= {}
  @old_resource_controllers[resource_name] = @controller
  # check_for_exit_signal
  i_send(CON, [resource_name, :fetch], tp: @controller)
  # $stderr.puts 'request sent'
  ans = w_recv(resource_name, fp: @controller)
  # $stderr.puts 'got answer'
  log 'fpvc', :get_shared_resource
  return ans[0]
end
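The condition `eval(object.inspect) == object` given for get_shared_resource can be checked before an object is handed to the controller. The helper below is a sketch of such a check. `inspect_round_trips?` is a hypothetical name, not part of PPipe, and it should only be used on objects you trust, since it calls eval.

```ruby
# Hypothetical helper (not part of PPipe): true when an object survives
# an inspect/eval round trip, the condition stated for shared resources.
def inspect_round_trips?(object)
  eval(object.inspect) == object
rescue SyntaxError, StandardError
  # inspect output that is not valid Ruby cannot be a shared resource
  false
end

plain_data = { balance: 20.03, tags: [:a, 1] }
inspect_round_trips?(plain_data)   # hashes, arrays, numbers, symbols pass
inspect_round_trips?(Object.new)   # "#<Object:0x...>" does not evaluate back
```

Plain data such as numbers, strings, symbols, arrays and hashes of these generally pass. Objects whose inspect output starts with `#<` do not, because that text evaluates as a comment.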
#lock(mutex_name) ⇒ Object
Lock a mutex named mutex_name. If the mutex did not exist before, create it. No other process or thread can lock this mutex until it has been unlocked.
mutex_name must be a symbol or an integer.
Note, every different mutex_name corresponds to a different mutex. Thus, it is easy to create vast numbers of mutexes. However, there is a memory and processor overhead associated with each mutex. Although this is small, creating very large numbers of mutexes is not a good idea.
Mutexes cannot be destroyed.
# File 'lib/parallelpipes.rb', line 1958
def lock(mutex_name)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :lock
  # $stderr.puts "#@mpn thinks controller is #@controller"
  @thread_mutexes[mutex_name] ||= Mutex.new
  @thread_mutexes[mutex_name].lock
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  # check_for_exit_signal
  @old_controllers ||= {}
  @old_controllers[mutex_name] = @controller
  i_send(CON, [mutex_name, :lock], tp: @controller)
  w_recv(mutex_name, fp: @controller)
  log 'fpvc', :lock
end
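The note for lock about every name creating a mutex that is never destroyed can be illustrated with a small in-process model. `NamedMutexes` is a hypothetical class that mirrors only the `||= Mutex.new` step of the listing. The `@guard` mutex is an addition of this sketch, and the controller messaging is left out entirely.

```ruby
# In-process model only (an assumption of this sketch): PPipe additionally
# sends a lock request to the controller process.
class NamedMutexes
  def initialize
    @mutexes = {}        # mutex name => Mutex, never pruned
    @guard = Mutex.new   # protects the registry itself (sketch addition)
  end

  def lock(name)
    unless name.is_a?(Symbol) || name.is_a?(Integer)
      raise ArgumentError, 'mutex name must be a Symbol or an Integer'
    end
    mutex = @guard.synchronize { @mutexes[name] ||= Mutex.new }
    mutex.lock
  end

  def unlock(name)
    @mutexes.fetch(name).unlock
  end

  # Number of mutexes created so far
  def count
    @mutexes.size
  end
end

registry = NamedMutexes.new
1_000.times do |i|
  registry.lock(i)
  registry.unlock(i)
end
registry.count  # one entry per distinct name, even though all are unlocked
```

Reusing a small, fixed set of names keeps the registry from growing in this way.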
#mutex_locked?(mutex_name) ⇒ Boolean
Is the mutex called mutex_name locked? Returns [pipe_no, thread_id] of the locking process and thread if it is locked, nil if it is not.
# File 'lib/parallelpipes.rb', line 2014
def mutex_locked?(mutex_name)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :locked?
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  i_send(CON, [mutex_name, :mutex_locked?], tp: @controller)
  ans = w_recv(mutex_name, fp: @controller)[0]
  log 'fpvc', :mutex_locked?
  return ans
end
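Although the signature says Boolean, the description of mutex_locked? gives the result as either nil or a [pipe_no, thread_id] pair. The sketch below shows one way a caller might handle both cases. `describe_owner` is a hypothetical helper and the sample values are made up for illustration.

```ruby
# Hypothetical helper: turns the documented return value of mutex_locked?
# (or resource_locked?) into a readable message.
def describe_owner(lock_state)
  return 'unlocked' if lock_state.nil?

  pipe_no, thread_id = lock_state
  "locked by pipe #{pipe_no}, thread #{thread_id}"
end

describe_owner(nil)         # => "unlocked"
describe_owner([3, 70150])  # => "locked by pipe 3, thread 70150"
```

Because nil is falsy and an Array is truthy, the result can still be used directly in a condition.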
#put_controller_to_sleep(sleep_time, wait_for_waking) ⇒ Object
If the controller will not be needed for a while, put it to sleep for sleep_time seconds. If wait_for_waking is false, wake up automatically. If it is a number, check for a wake up call every wait_for_waking seconds.
Experimental.
# File 'lib/parallelpipes.rb', line 2142
def put_controller_to_sleep(sleep_time, wait_for_waking)
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  w_send(:sleep, [sleep_time, wait_for_waking], tp: @controller)
end
#resource_locked?(resource_name) ⇒ Boolean
Is the shared resource called resource_name locked? Returns [pipe_no, thread_id] of the locking process and thread if it is locked, nil if it is not.
# File 'lib/parallelpipes.rb', line 2105
def resource_locked?(resource_name)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :locked?
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  i_send(CON, [resource_name, :resource_locked?], tp: @controller)
  ans = w_recv(resource_name, fp: @controller)[0]
  log 'fpvc', :resource_locked?
  return ans
end
#return_shared_resource(resource_name, resource) ⇒ Object
Return a shared resource.
If the process and thread calling return_shared_resource did not previously call get_shared_resource, this call will hang forever.
savings += 20.03
ppipe.return_shared_resource(:savings, savings)
# File 'lib/parallelpipes.rb', line 2069
def return_shared_resource(resource_name, resource)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :return_shared_resource
  # check_messages
  raise NoController unless @old_resource_controllers[resource_name]
  raise DeadParallelPipe unless @alive
  # check_for_exit_signal
  i_send(CON, [resource_name, resource], tp: @old_resource_controllers[resource_name])
  w_recv(resource_name, fp: @old_resource_controllers[resource_name])
  @old_resource_controllers.delete(resource_name)
  @shared_resource_mutexes[resource_name].unlock
  log 'fpvc', :return_shared_resource
end
#shared_resource(resource_name, &block) ⇒ Object
Fetch a shared resource, edit it and then return it. The resource is passed as the parameter to the block.
ppipe.shared_resource(:savings) do |savings|
  savings += 102.96
  # … some other calculations
  savings
end
NB: The shared resource to be returned must be the value of the last expression in the block.
# File 'lib/parallelpipes.rb', line 2094
def shared_resource(resource_name, &block)
  ArgumentError.check([:resource_name, resource_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :shared_resource
  resource = get_shared_resource(resource_name)
  resource = yield(resource)
  return_shared_resource(resource_name, resource)
  log 'fpvc', :shared_resource
end
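The listing for shared_resource stores whatever the block evaluates to, which is why the block must end with the resource. A minimal in-process model of that fetch, edit, return cycle is sketched below. `ResourceStore` and `peek` are hypothetical names, and a single local Mutex stands in for the controller. Since a resource that has never been accessed starts as nil, the sketch also supplies a default on first use.

```ruby
# In-process model only (an assumption of this sketch): a local Mutex
# replaces the controller, and resources live in a plain Hash.
class ResourceStore
  def initialize
    @resources = {}   # unseen names read as nil, as documented
    @mutex = Mutex.new
  end

  # Yield the current value and store the block's result in its place.
  def shared_resource(name)
    @mutex.synchronize do
      @resources[name] = yield(@resources[name])
    end
  end

  def peek(name)
    @resources[name]
  end
end

store = ResourceStore.new

# First access: the value is nil, so fall back to a starting balance.
store.shared_resource(:savings) { |savings| (savings || 0.0) + 102.96 }
after_update = store.peek(:savings)   # => 102.96

# Pitfall: the last expression is nil, so nil replaces the resource.
store.shared_resource(:savings) do |savings|
  savings += 20.03
  nil
end
after_pitfall = store.peek(:savings)  # => nil
```

Reassigning the block parameter has no effect by itself; only the block's final value is stored.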
#start_controller(controller_refresh = 0.001) ⇒ Object
Start a controller process. Raises a ControllerError if there already is one. controller_refresh is the delay between successive rounds of request handling by the controller. A shorter refresh time means the controller responds faster, but consumes more CPU (typically a controller consumes about 1-6% of CPU).
# File 'lib/parallelpipes.rb', line 1873
def start_controller(controller_refresh=0.001)
  raise DeadParallelPipe unless @alive
  raise ControllerError.new("This parallel pipe already has a controller") if @controller
  parent = [mpn, tid]
  @controller = fork do
    log 'v4', 'Started Controller...'
    @controller_refresh = controller_refresh
    @messages[tid] ||= {}
    @messages[tid][CON] = {}
    @controller_status = {}
    @resource_status = {}
    @resources = {}
    @resource_file_names = {}
    # @resource_accessed = {}
    log 'v5', 'commencing controller...'
    i_send(:controller_started, true, tp: parent[0], tt: parent[1])
    run_controller
    log 'v5', 'controller exiting...'
  end
  w_recv(:controller_started, fp: @controller)
  # broadcast(:set_controller, @controller, {evaluate: true})
end
#stop_controller ⇒ Object
Stop the controller. The controller sends out an exit warning to all processes. No new lock, synchronize, get_shared_resource or shared_resource commands can be issued once that warning has been received. However, the controller will not exit until all existing locks on mutexes have been unlocked and all shared resources have been returned.
# File 'lib/parallelpipes.rb', line 1898
def stop_controller
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  controller = @controller
  w_send(:exit, true, tp: @controller)
  w_recv(:exited, fp: controller)
  Process.wait(@pids[controller])
  @pids.delete(controller)
  # $stderr.puts '@controller is', @controller.inspect
  @controller = nil
end
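The shutdown behaviour described for stop_controller (refuse new requests after the exit warning, but exit only once outstanding locks are released) can be modelled in a few lines. `DrainingController` is a hypothetical single-process model, not the PPipe controller. Raising on a refused or contended lock is an assumption made to keep the sketch short; the real controller's response is not documented here.

```ruby
# Toy model of a draining shutdown (not the PPipe implementation).
class DrainingController
  class Exiting < StandardError; end

  def initialize
    @held = {}        # mutex name => owner
    @exiting = false
  end

  def lock(name, owner)
    raise Exiting, 'exit warning already issued' if @exiting
    # Simplification: a real lock request would wait instead of raising.
    raise ArgumentError, "#{name} is already locked" if @held.key?(name)

    @held[name] = owner
  end

  def unlock(name)
    @held.delete(name)
  end

  # Corresponds to the exit warning being sent out.
  def request_exit
    @exiting = true
  end

  # True only when exit was requested and nothing is still held.
  def exited?
    @exiting && @held.empty?
  end
end

controller = DrainingController.new
controller.lock(:report, [1, 0])
controller.request_exit
before_unlock = controller.exited?   # => false, :report is still held
controller.unlock(:report)
after_unlock = controller.exited?    # => true
```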
#synchronize(mutex_name, &block) ⇒ Object
Lock the mutex, call the block and then unlock the mutex. See Controller#lock.
ppipe.synchronize(:sentence) { print 'a fragmented sen'; sleep rand; print "tence\n" }
This is identical to:
ppipe.lock(:sentence)
print 'a fragmented sen'; sleep rand; print "tence\n"
ppipe.unlock(:sentence)
# File 'lib/parallelpipes.rb', line 2002
def synchronize(mutex_name, &block)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :synchronize
  raise DeadParallelPipe unless @alive
  lock(mutex_name)
  yield
  unlock(mutex_name)
  log 'fpvc', :synchronize
end
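The listing for synchronize calls unlock after yield with no ensure clause, so on a plain reading an exception raised inside the block would leave the mutex locked. A caller who needs the lock released in that case could wrap lock and unlock as sketched below. `synchronize_safely` and `FakePipe` are hypothetical names. `FakePipe` is a stand-in so the example runs without a controller.

```ruby
# Hypothetical wrapper, not part of PPipe. `pipe` is any object that
# responds to lock(name) and unlock(name).
def synchronize_safely(pipe, mutex_name)
  pipe.lock(mutex_name)
  begin
    yield
  ensure
    # Runs whether the block returns normally or raises.
    pipe.unlock(mutex_name)
  end
end

# Stand-in for a PPipe instance: records which names are currently held.
class FakePipe
  attr_reader :held

  def initialize
    @held = []
  end

  def lock(name)
    @held << name
  end

  def unlock(name)
    @held.delete(name)
  end
end

pipe = FakePipe.new
begin
  synchronize_safely(pipe, :sentence) { raise 'boom' }
rescue RuntimeError
  # The error still propagates, but the mutex has been released.
end
pipe.held  # => []
```

The wrapper also returns the block's value, which the listed synchronize does not.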
#unlock(mutex_name) ⇒ Object
Unlock a mutex. See Controller#lock.
If the process and thread calling unlock did not previously lock the mutex, this call will hang forever.
# File 'lib/parallelpipes.rb', line 1979
def unlock(mutex_name)
  ArgumentError.check([:mutex_name, mutex_name, [Symbol, Integer, Fixnum]])
  log 'fpv', :unlock
  # check_messages
  raise NoController.new("Controller dead, or a lock call has not been issued with the name #{mutex_name} from this process") unless @old_controllers[mutex_name]
  raise DeadParallelPipe unless @alive
  i_send(CON, [mutex_name, :unlock], tp: @old_controllers[mutex_name])
  w_recv(mutex_name, fp: @old_controllers[mutex_name])
  @old_controllers.delete(mutex_name)
  @thread_mutexes[mutex_name].unlock
  log 'fpvc', :unlock
end
#wake_controller ⇒ Object
Wake up the controller.
Experimental
# File 'lib/parallelpipes.rb', line 2152
def wake_controller
  raise NoController unless @controller
  raise DeadParallelPipe unless @alive
  w_send(:wake, true, tp: @controller)
end