Class: Gearman::Util
- Inherits:
-
Object
- Object
- Gearman::Util
- Defined in:
- lib/gearman/util.rb
Overview
Util
Description
Static helper methods and data used by other classes.
Constant Summary collapse
- COMMANDS =
Map from Integer representations of commands used in the network protocol to more-convenient symbols.
{ 1 => :can_do, # W->J: FUNC 2 => :cant_do, # W->J: FUNC 3 => :reset_abilities, # W->J: -- 4 => :pre_sleep, # W->J: -- #5 => (unused), # - - 6 => :noop, # J->W: -- 7 => :submit_job, # C->J: FUNC[0]UNIQ[0]ARGS 8 => :job_created, # J->C: HANDLE 9 => :grab_job, # W->J: -- 10 => :no_job, # J->W: -- 11 => :job_assign, # J->W: HANDLE[0]FUNC[0]ARG 12 => :work_status, # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR 13 => :work_complete, # W->J/C: HANDLE[0]RES 14 => :work_fail, # W->J/C: HANDLE 15 => :get_status, # C->J: HANDLE 16 => :echo_req, # ?->J: TEXT 17 => :echo_res, # J->?: TEXT 18 => :submit_job_bg, # C->J: FUNC[0]UNIQ[0]ARGS 19 => :error, # J->?: ERRCODE[0]ERR_TEXT 20 => :status_res, # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM 21 => :submit_job_high, # C->J: FUNC[0]UNIQ[0]ARGS 22 => :set_client_id, # W->J: [RANDOM_STRING_NO_WHITESPACE] 23 => :can_do_timeout, # W->J: FUNC[0]TIMEOUT 24 => :all_yours, # REQ Worker 25 => :work_exception, # W->J: HANDLE[0]ARG 26 => :option_req, # C->J: TEXT 27 => :option_res, # J->C: TEXT 28 => :work_data, # REQ Worker 29 => :work_warning, # W->J/C: HANDLE[0]MSG 30 => :grab_job_uniq, # REQ Worker 31 => :job_assign_uniq, # RES Worker 32 => :submit_job_high_bg, # C->J: FUNC[0]UNIQ[0]ARGS 33 => :submit_job_low, # C->J: FUNC[0]UNIQ[0]ARGS 34 => :submit_job_low_bg, # C->J: FUNC[0]UNIQ[0]ARGS 35 => :submit_job_sched, # REQ Client 36 => :submit_job_epoch # C->J: FUNC[0]UNIQ[0]EPOCH[0]ARGS }
- NUMS =
Map e.g. ‘can_do’ => 1
COMMANDS.invert
- DEFAULT_PORT =
Default job server port.
4730
Class Method Summary collapse
- .ability_name_with_prefix(prefix, name) ⇒ Object (also: ability_name_for_perl)
-
.get_task_from_args(*args) ⇒ Object
Return a Task based on the passed-in arguments.
-
.handle_to_str(hostport, handle) ⇒ "host:port//handle"
Convert job server info and a handle into a string.
- .logger ⇒ Object
- .logger=(logger) ⇒ Object
-
.normalize_job_servers(servers) ⇒ Object
Add default ports to a job server or list of servers.
-
.pack_request(type_name, arg = '') ⇒ Object
Construct a request packet.
-
.read_response(sock, timeout = nil) ⇒ Object
Read a response packet from a socket.
-
.send_request(sock, req) ⇒ Object
Send a request packet over a socket.
-
.str_to_handle(str) ⇒ hostport, handle
Reverse Util.handle_to_str.
-
.timed_recv(sock, len, timeout = nil) ⇒ Object
Read from a socket, giving up if it doesn’t finish quickly enough.
- .with_safe_socket_op ⇒ Object
Class Method Details
.ability_name_with_prefix(prefix, name) ⇒ Object Also known as: ability_name_for_perl
198 199 200 |
# File 'lib/gearman/util.rb', line 198 def Util.ability_name_with_prefix(prefix,name) "#{prefix}\t#{name}" end |
.get_task_from_args(*args) ⇒ Object
Return a Task based on the passed-in arguments.
95 96 97 98 99 100 101 102 103 |
# File 'lib/gearman/util.rb', line 95 def Util.get_task_from_args(*args) if (args[0].class == Task || args[0].class.superclass == Task) return args[0] elsif args.size <= 3 return Task.new(*args) else raise InvalidArgsError, 'Incorrect number of args to get_task_from_args' end end |
.handle_to_str(hostport, handle) ⇒ "host:port//handle"
Convert job server info and a handle into a string.
176 177 178 |
# File 'lib/gearman/util.rb', line 176 def Util.handle_to_str(hostport, handle) "#{hostport}//#{handle}" end |
.logger ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/gearman/util.rb', line 67 def Util.logger @logger ||= begin l = Logger.new($stdout) l.level = Logger::FATAL l end end |
.logger=(logger) ⇒ Object
63 64 65 |
# File 'lib/gearman/util.rb', line 63 def Util.logger=(logger) @logger = logger end |
.normalize_job_servers(servers) ⇒ Object
Add default ports to a job server or list of servers.
163 164 165 166 167 168 |
# File 'lib/gearman/util.rb', line 163 def Util.normalize_job_servers(servers) if servers.class == String or servers.class == Symbol servers = [ servers.to_s ] end servers.map {|s| s =~ /:/ ? s : "#{s}:#{DEFAULT_PORT}" } end |
.pack_request(type_name, arg = '') ⇒ Object
Construct a request packet.
82 83 84 85 86 87 |
# File 'lib/gearman/util.rb', line 82 def Util.pack_request(type_name, arg='') type_num = NUMS[type_name.to_sym] raise InvalidArgsError, "Invalid type name '#{type_name}'" unless type_num arg = '' if not arg "\0REQ" + [type_num, arg.size].pack('NN') + arg end |
.read_response(sock, timeout = nil) ⇒ Object
Read a response packet from a socket.
133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/gearman/util.rb', line 133 def Util.read_response(sock, timeout=nil) #debugger end_time = Time.now.to_f + timeout if timeout head = timed_recv(sock, 12, timeout) magic, type, len = head.unpack('a4NN') raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES" buf = len > 0 ? timed_recv(sock, len, timeout ? end_time - Time.now.to_f : nil) : '' type = COMMANDS[type] raise ProtocolError, "Invalid packet type #{type}" unless type [type, buf] end |
.send_request(sock, req) ⇒ Object
Send a request packet over a socket.
151 152 153 154 155 156 |
# File 'lib/gearman/util.rb', line 151 def Util.send_request(sock, req) len = with_safe_socket_op{ sock.write(req) } if len != req.size raise NetworkError, "Wrote #{len} instead of #{req.size}" end end |
.str_to_handle(str) ⇒ hostport, handle
Reverse Util.handle_to_str.
185 186 187 188 |
# File 'lib/gearman/util.rb', line 185 def Util.str_to_handle(str) str =~ %r{^([^:]+:\d+)//(.+)} return [$1, $3] end |
.timed_recv(sock, len, timeout = nil) ⇒ Object
Read from a socket, giving up if it doesn’t finish quickly enough. NetworkError is thrown if we don’t read all the bytes in time.
113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/gearman/util.rb', line 113 def Util.timed_recv(sock, len, timeout=nil) data = '' end_time = Time.now.to_f + timeout if timeout while data.size < len and (not timeout or Time.now.to_f < end_time) do IO::select([sock], nil, nil, timeout ? end_time - Time.now.to_f : nil) \ or break data += sock.readpartial(len - data.size) end if data.size < len raise NetworkError, "Read #{data.size} byte(s) instead of #{len}" end data end |
.with_safe_socket_op ⇒ Object
190 191 192 193 194 195 196 |
# File 'lib/gearman/util.rb', line 190 def self.with_safe_socket_op begin yield rescue Exception => ex raise ServerDownException.new(ex.) end end |