Class: Packet::Reactor
- Inherits:
-
Object
- Object
- Packet::Reactor
- Includes:
- Core
- Defined in:
- lib/packet/packet_master.rb
Instance Attribute Summary collapse
-
#fd_writers ⇒ Object
set_thread_pool_size(20).
-
#live_workers ⇒ Object
Returns the value of attribute live_workers.
-
#msg_reader ⇒ Object
set_thread_pool_size(20).
-
#msg_writers ⇒ Object
set_thread_pool_size(20).
-
#result_hash ⇒ Object
Returns the value of attribute result_hash.
Class Method Summary collapse
- .run {|master_reactor_instance| ... } ⇒ Object
-
.server_logger=(log_file_name) ⇒ Object
after_connection :provide_workers.
Instance Method Summary collapse
- #delete_worker(worker_options = {}) ⇒ Object
- #enable_nonblock(io) ⇒ Object
-
#fork_and_load(worker_klass, worker_options = { }) ⇒ Object
method should use worker_key if provided in options hash.
- #form_cmd_line(*args) ⇒ Object
- #handle_internal_messages(t_sock) ⇒ Object
- #load_workers ⇒ Object
-
#redirect_io(logfile_name) ⇒ Object
Free file descriptors and point them somewhere sensible STDOUT/STDERR should go to a logfile.
- #remove_worker(t_sock) ⇒ Object
-
#set_result_hash(hash) ⇒ Object
end of run method.
- #start_worker(worker_options = { }) ⇒ Object
- #update_result(worker_key, result) ⇒ Object
Methods included from Core
Instance Attribute Details
#fd_writers ⇒ Object
set_thread_pool_size(20)
5 6 7 |
# File 'lib/packet/packet_master.rb', line 5 def fd_writers @fd_writers end |
#live_workers ⇒ Object
Returns the value of attribute live_workers.
8 9 10 |
# File 'lib/packet/packet_master.rb', line 8 def live_workers @live_workers end |
#msg_reader ⇒ Object
set_thread_pool_size(20)
5 6 7 |
# File 'lib/packet/packet_master.rb', line 5 def msg_reader @msg_reader end |
#msg_writers ⇒ Object
set_thread_pool_size(20)
5 6 7 |
# File 'lib/packet/packet_master.rb', line 5 def msg_writers @msg_writers end |
#result_hash ⇒ Object
Returns the value of attribute result_hash.
6 7 8 |
# File 'lib/packet/packet_master.rb', line 6 def result_hash @result_hash end |
Class Method Details
.run {|master_reactor_instance| ... } ⇒ Object
15 16 17 18 19 20 21 22 |
# File 'lib/packet/packet_master.rb', line 15 def self.run master_reactor_instance = new master_reactor_instance.result_hash = {} master_reactor_instance.live_workers = DoubleKeyedHash.new yield(master_reactor_instance) master_reactor_instance.load_workers master_reactor_instance.start_reactor end |
.server_logger=(log_file_name) ⇒ Object
after_connection :provide_workers
11 12 13 |
# File 'lib/packet/packet_master.rb', line 11 def self.server_logger= (log_file_name) @@server_logger = log_file_name end |
Instance Method Details
#delete_worker(worker_options = {}) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/packet/packet_master.rb', line 51 def delete_worker( = {}) worker_name = [:worker] worker_name_key = gen_worker_key(worker_name,[:worker_key]) [:method] = :exit @live_workers[worker_name_key].send_request() end |
#enable_nonblock(io) ⇒ Object
87 88 89 90 |
# File 'lib/packet/packet_master.rb', line 87 def enable_nonblock io f = io.fcntl(Fcntl::F_GETFL,0) io.fcntl(Fcntl::F_SETFL,Fcntl::O_NONBLOCK | f) end |
#fork_and_load(worker_klass, worker_options = { }) ⇒ Object
method should use worker_key if provided in options hash.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/packet/packet_master.rb', line 93 def fork_and_load(worker_klass, = { }) t_worker_name = worker_klass.worker_name worker_pimp = worker_klass.worker_proxy.to_s # socket from which master process is going to read master_read_end,worker_write_end = UNIXSocket.pair(Socket::SOCK_STREAM) # socket to which master process is going to write worker_read_end,master_write_end = UNIXSocket.pair(Socket::SOCK_STREAM) option_dump = Marshal.dump() option_dump_length = option_dump.length master_write_end.write(option_dump) worker_name_key = gen_worker_key(t_worker_name,[:worker_key]) if(!(pid = fork)) [master_write_end,master_read_end].each { |x| x.close } [worker_read_end,worker_write_end].each { |x| enable_nonblock(x) } begin if(ARGV[0] == 'start' && Object.const_defined?(:SERVER_LOGGER)) redirect_io(SERVER_LOGGER) end rescue puts $!.backtrace end exec form_cmd_line(worker_read_end.fileno,worker_write_end.fileno,t_worker_name,option_dump_length) end Process.detach(pid) [master_read_end,master_write_end].each { |x| enable_nonblock(x) } if worker_pimp && !worker_pimp.empty? require worker_pimp pimp_klass = Object.const_get(packet_classify(worker_pimp)) @live_workers[worker_name_key,master_read_end.fileno] = pimp_klass.new(master_write_end,pid,self) else t_pimp = Packet::MetaPimp.new(master_write_end,pid,self) t_pimp.worker_key = worker_name_key t_pimp.worker_name = t_worker_name t_pimp.invokable_worker_methods = worker_klass.instance_methods @live_workers[worker_name_key,master_read_end.fileno] = t_pimp end worker_read_end.close worker_write_end.close read_ios << master_read_end end |
#form_cmd_line(*args) ⇒ Object
160 161 162 163 164 165 |
# File 'lib/packet/packet_master.rb', line 160 def form_cmd_line *args min_string = "packet_worker_runner #{args[0]}:#{args[1]}:#{args[2]}:#{args[3]}" min_string << ":#{WORKER_ROOT}" if defined? WORKER_ROOT min_string << ":#{WORKER_LOAD_ENV}" if defined? WORKER_LOAD_ENV min_string end |
#handle_internal_messages(t_sock) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/packet/packet_master.rb', line 33 def (t_sock) sock_fd = t_sock.fileno worker_instance = @live_workers[sock_fd] begin raw_data = read_data(t_sock) worker_instance.receive_data(raw_data) if worker_instance.respond_to?(:receive_data) rescue DisconnectError => sock_error worker_instance.receive_data(sock_error.data) if worker_instance.respond_to?(:receive_data) remove_worker(t_sock) end end |
#load_workers ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/packet/packet_master.rb', line 58 def load_workers worker_root = defined?(WORKER_ROOT) ? WORKER_ROOT : "#{PACKET_APP}/worker" t_workers = Dir["#{worker_root}/**/*.rb"] return if t_workers.empty? t_workers.each do |b_worker| worker_name = File.basename(b_worker,".rb") require worker_name worker_klass = Object.const_get(packet_classify(worker_name)) next if worker_klass.no_auto_load fork_and_load(worker_klass) end end |
#redirect_io(logfile_name) ⇒ Object
Free file descriptors and point them somewhere sensible STDOUT/STDERR should go to a logfile
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/packet/packet_master.rb', line 142 def redirect_io(logfile_name) begin; STDIN.reopen "/dev/null"; rescue ::Exception; end if logfile_name begin STDOUT.reopen logfile_name, "a" STDOUT.sync = true rescue ::Exception begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end end else begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end end begin; STDERR.reopen STDOUT; rescue ::Exception; end STDERR.sync = true end |
#remove_worker(t_sock) ⇒ Object
46 47 48 49 |
# File 'lib/packet/packet_master.rb', line 46 def remove_worker(t_sock) @live_workers.delete(t_sock.fileno) read_ios.delete(t_sock) end |
#set_result_hash(hash) ⇒ Object
end of run method
24 25 26 |
# File 'lib/packet/packet_master.rb', line 24 def set_result_hash(hash) @result_hash = hash end |
#start_worker(worker_options = { }) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/packet/packet_master.rb', line 72 def start_worker( = { }) worker_name = [:worker].to_s worker_name_key = gen_worker_key(worker_name,[:worker_key]) return if @live_workers[worker_name_key] .delete(:worker) begin require worker_name worker_klass = Object.const_get(packet_classify(worker_name)) fork_and_load(worker_klass,) rescue LoadError puts "no such worker #{worker_name}" return end end |
#update_result(worker_key, result) ⇒ Object
28 29 30 31 |
# File 'lib/packet/packet_master.rb', line 28 def update_result(worker_key,result) @result_hash ||= {} @result_hash[worker_key.to_sym] = result end |