Class: Gearman::Util

Inherits:
Object
  • Object
Defined in:
lib/gearman/util.rb

Overview

Util

Description

Static helper methods and data used by other classes.

Constant Summary

COMMANDS =

Map from Integer representations of commands used in the network protocol to more-convenient symbols.

{
  1  => :can_do,               # W->J: FUNC
  2  => :cant_do,              # W->J: FUNC
  3  => :reset_abilities,      # W->J: --
  4  => :pre_sleep,            # W->J: --
  #5 =>  (unused),             # -      -
  6  => :noop,                 # J->W: --
  7  => :submit_job,           # C->J: FUNC[0]UNIQ[0]ARGS
  8  => :job_created,          # J->C: HANDLE
  9  => :grab_job,             # W->J: --
  10 => :no_job,               # J->W: --
  11 => :job_assign,           # J->W: HANDLE[0]FUNC[0]ARG
  12 => :work_status,          # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
  13 => :work_complete,        # W->J/C: HANDLE[0]RES
  14 => :work_fail,            # W->J/C: HANDLE
  15 => :get_status,           # C->J: HANDLE
  16 => :echo_req,             # ?->J: TEXT
  17 => :echo_res,             # J->?: TEXT
  18 => :submit_job_bg,        # C->J: FUNC[0]UNIQ[0]ARGS
  19 => :error,                # J->?: ERRCODE[0]ERR_TEXT
  20 => :status_res,           # J->C: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
  21 => :submit_job_high,      # C->J: FUNC[0]UNIQ[0]ARGS
  22 => :set_client_id,        # W->J: [RANDOM_STRING_NO_WHITESPACE]
  23 => :can_do_timeout,       # W->J: FUNC[0]TIMEOUT
  24 => :all_yours,            # REQ    Worker
  25 => :work_exception,       # W->J: HANDLE[0]ARG
  26 => :option_req,           # C->J: TEXT
  27 => :option_res,           # J->C: TEXT
  28 => :work_data,            # REQ    Worker
  29 => :work_warning,         # W->J/C: HANDLE[0]MSG
  30 => :grab_job_uniq,        # REQ    Worker
  31 => :job_assign_uniq,      # RES    Worker
  32 => :submit_job_high_bg,   # C->J: FUNC[0]UNIQ[0]ARGS
  33 => :submit_job_low,       # C->J: FUNC[0]UNIQ[0]ARGS
  34 => :submit_job_low_bg,    # C->J: FUNC[0]UNIQ[0]ARGS
  35 => :submit_job_sched,     # REQ    Client
  36 => :submit_job_epoch      # C->J: FUNC[0]UNIQ[0]EPOCH[0]ARGS
}
NUMS =

Inverse of COMMANDS: maps each command symbol to its Integer code, e.g. :can_do => 1.

COMMANDS.invert
DEFAULT_PORT =

Default job server port.

4730
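
As a rough illustration of how the two maps relate, the sketch below rebuilds a three-entry excerpt of COMMANDS and inverts it the same way NUMS is derived. The names COMMANDS_EXCERPT and NUMS_EXCERPT are stand-ins invented for this example, not part of the library.

```ruby
# Stand-in excerpt of Gearman::Util::COMMANDS; the real table has many more entries.
COMMANDS_EXCERPT = {
  1 => :can_do,
  7 => :submit_job,
  8 => :job_created
}.freeze

# Same derivation as NUMS: swap keys and values.
NUMS_EXCERPT = COMMANDS_EXCERPT.invert.freeze

p NUMS_EXCERPT[:submit_job]   # symbol to wire number
p COMMANDS_EXCERPT[8]         # wire number to symbol
```

Note that the keys of the inverted map are symbols, so a String name has to be converted with to_sym before lookup.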

Class Method Details

.ability_name_with_prefix(prefix, name) ⇒ Object Also known as: ability_name_for_perl



# File 'lib/gearman/util.rb', line 198

def Util.ability_name_with_prefix(prefix,name)
  "#{prefix}\t#{name}"
end

.get_task_from_args(*args) ⇒ Object

Return a Task based on the passed-in arguments.

Parameters:

  • args

    either a single Task object or the arguments accepted by Task.new

Returns:

  • Task object



# File 'lib/gearman/util.rb', line 95

def Util.get_task_from_args(*args)
  if args[0].is_a?(Task)  # accepts Task and any subclass of it
    return args[0]
  elsif args.size <= 3
    return Task.new(*args)
  else
    raise InvalidArgsError, 'Incorrect number of args to get_task_from_args'
  end
end

.handle_to_str(hostport, handle) ⇒ "host:port//handle"

Convert job server info and a handle into a string.

Parameters:

  • hostport ("host:port")

    “host:port” of job server

  • handle

    job server-returned handle for a task

Returns:

  • ("host:port//handle")

    the combined “host:port//handle” string



# File 'lib/gearman/util.rb', line 176

def Util.handle_to_str(hostport, handle)
  "#{hostport}//#{handle}"
end

.logger ⇒ Object



# File 'lib/gearman/util.rb', line 67

def Util.logger
  @logger ||=
    begin
      l = Logger.new($stdout)
      l.level = Logger::FATAL
      l
    end
end

.logger=(logger) ⇒ Object



# File 'lib/gearman/util.rb', line 63

def Util.logger=(logger)
  @logger = logger
end

.normalize_job_servers(servers) ⇒ Object

Add default ports to a job server or list of servers.

Parameters:

  • servers

    a server hostname or “host:port” or array of servers

Returns:

  • an array of “host:port” strings



# File 'lib/gearman/util.rb', line 163

def Util.normalize_job_servers(servers)
  if servers.class == String or servers.class == Symbol
    servers = [ servers.to_s ]
  end
  servers.map {|s| s =~ /:/ ? s : "#{s}:#{DEFAULT_PORT}" }
end
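
The normalization can be tried in isolation with the sketch below. It re-implements the same logic as a free-standing method; sketch_normalize_job_servers and SKETCH_DEFAULT_PORT are names made up for this example so that it runs without the gem installed.

```ruby
# Stand-in for Gearman::Util::DEFAULT_PORT.
SKETCH_DEFAULT_PORT = 4730

# Wrap a lone String or Symbol in an array, then append the default
# port to every entry that does not already contain a colon.
def sketch_normalize_job_servers(servers)
  servers = [servers.to_s] if servers.is_a?(String) || servers.is_a?(Symbol)
  servers.map { |s| s.include?(':') ? s : "#{s}:#{SKETCH_DEFAULT_PORT}" }
end

p sketch_normalize_job_servers('localhost')
p sketch_normalize_job_servers(['job1.example:7003', 'job2.example'])
```

Entries that already name a port are passed through unchanged, so mixed lists are safe to normalize more than once.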

.pack_request(type_name, arg = '') ⇒ Object

Construct a request packet.

Parameters:

  • type_name

    command type’s name (see COMMANDS)

  • arg (defaults to: '')

    optional data to pack into the command

Raises:

  • InvalidArgsError if type_name is not a known command name


# File 'lib/gearman/util.rb', line 82

def Util.pack_request(type_name, arg='')
  type_num = NUMS[type_name.to_sym]
  raise InvalidArgsError, "Invalid type name '#{type_name}'" unless type_num
  arg = '' if not arg
  "\0REQ" + [type_num, arg.size].pack('NN') + arg
end
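
To make the packet layout concrete, here is a minimal sketch that builds a request the same way (a 4-byte magic, two big-endian 32-bit integers, then the payload) and unpacks the header again. SKETCH_NUMS and sketch_pack_request are hypothetical stand-ins; unlike the method above, the sketch measures the payload in bytes and raises KeyError for an unknown name.

```ruby
# Hypothetical one-entry stand-in for NUMS.
SKETCH_NUMS = { submit_job: 7 }.freeze

def sketch_pack_request(type_name, arg = '')
  type_num = SKETCH_NUMS.fetch(type_name.to_sym)  # KeyError if unknown
  arg = arg.to_s.b                                # nil becomes '', payload treated as raw bytes
  # Header: "\0REQ", command number, payload length (both 32-bit big-endian).
  "\0REQ".b + [type_num, arg.bytesize].pack('NN') + arg
end

packet = sketch_pack_request(:submit_job, "reverse\0uniq-1\0hello")
magic, type, len = packet.unpack('a4NN')
p magic, type, len
```

The header is always 12 bytes, so the payload starts at byte offset 12.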

.read_response(sock, timeout = nil) ⇒ Object

Read a response packet from a socket.

Parameters:

  • sock

    Socket connected to a job server

  • timeout (defaults to: nil)

    timeout in seconds, nil for no timeout

Returns:

  • array consisting of the packet type (a symbol from COMMANDS) and the data

Raises:

  • ProtocolError if the magic bytes or the packet type are invalid


# File 'lib/gearman/util.rb', line 133

def Util.read_response(sock, timeout=nil)
  #debugger
  end_time = Time.now.to_f + timeout if timeout
  head = timed_recv(sock, 12, timeout)
  magic, type, len = head.unpack('a4NN')
  raise ProtocolError, "Invalid magic '#{magic}'" unless magic == "\0RES"
  buf = len > 0 ?
    timed_recv(sock, len, timeout ? end_time - Time.now.to_f : nil) : ''
  # Keep the numeric type so the error message can report it.
  type_name = COMMANDS[type]
  raise ProtocolError, "Invalid packet type #{type}" unless type_name
  [type_name, buf]
end
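
The header decoding can be sketched without a socket by parsing a byte string that is already in memory. Everything named sketch_ or SKETCH_ below is invented for this example, and ArgumentError stands in for the library's ProtocolError.

```ruby
# Hypothetical two-entry stand-in for COMMANDS.
SKETCH_COMMANDS = { 8 => :job_created, 13 => :work_complete }.freeze

# Decode one response packet held entirely in `bytes`.
def sketch_parse_response(bytes)
  magic, type_num, len = bytes.unpack('a4NN')
  raise ArgumentError, "bad magic #{magic.inspect}" unless magic == "\0RES"
  type = SKETCH_COMMANDS[type_num]
  raise ArgumentError, "unknown packet type #{type_num}" unless type
  [type, bytes.byteslice(12, len)]
end

raw = "\0RES".b + [8, 5].pack('NN') + 'H:x:1'
p sketch_parse_response(raw)
```

The real method differs in that it reads the 12-byte header and the payload as two separate timed reads from the socket.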

.send_request(sock, req) ⇒ Object

Send a request packet over a socket.

Parameters:

  • sock

    Socket connected to a job server

  • req

    request packet to send



# File 'lib/gearman/util.rb', line 151

def Util.send_request(sock, req)
  len = with_safe_socket_op{ sock.write(req) }
  if len != req.size
    raise NetworkError, "Wrote #{len} instead of #{req.size}"
  end
end

.str_to_handle(str) ⇒ hostport, handle

Reverse Util.handle_to_str.

Parameters:

  • str ("host:port//handle")

    “host:port//handle” string, as produced by Util.handle_to_str

Returns:

  • (hostport, handle)


# File 'lib/gearman/util.rb', line 185

def Util.str_to_handle(str)
  str =~ %r{^([^:]+:\d+)//(.+)}
  return [$1, $2]  # the pattern has only two capture groups
end
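
A round trip through both conversions can be sketched as follows. The two methods are free-standing copies written for this example (the sketch_ names are not part of the library), and the parser returns [nil, nil] for input that does not match.

```ruby
def sketch_handle_to_str(hostport, handle)
  "#{hostport}//#{handle}"
end

# Split "host:port//handle" back into its two parts.
def sketch_str_to_handle(str)
  match = %r{\A([^:]+:\d+)//(.+)}.match(str)
  match ? [match[1], match[2]] : [nil, nil]
end

combined = sketch_handle_to_str('localhost:4730', 'H:lap:1')
p combined
p sketch_str_to_handle(combined)
```

Because the host part excludes colons and the port is digits only, a handle that itself contains colons still splits at the first "//".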

.timed_recv(sock, len, timeout = nil) ⇒ Object

Read from a socket, giving up if the read does not finish in time. Raises NetworkError if not all of the requested bytes arrive before the timeout.

Parameters:

  • sock

    Socket from which we read

  • len

    number of bytes to read

  • timeout (defaults to: nil)

    maximum number of seconds we’ll take; nil for no timeout

Returns:

  • full data that was read



# File 'lib/gearman/util.rb', line 113

def Util.timed_recv(sock, len, timeout=nil)
  data = ''
  end_time = Time.now.to_f + timeout if timeout
  while data.size < len and (not timeout or Time.now.to_f < end_time) do
    IO::select([sock], nil, nil, timeout ? end_time - Time.now.to_f : nil) \
      or break
    data += sock.readpartial(len - data.size)
  end
  if data.size < len
    raise NetworkError, "Read #{data.size} byte(s) instead of #{len}"
  end
  data
end
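
The deadline loop can be exercised against a pipe instead of a job server. The sketch below follows the same select-then-readpartial pattern but makes two substitutions that should be read as assumptions: it uses the monotonic clock rather than Time.now, and it raises IOError in place of NetworkError. The name sketch_timed_read is invented for this example.

```ruby
# Read exactly `len` bytes from `io`, or raise if the deadline passes first.
def sketch_timed_read(io, len, timeout = nil)
  data = ''.b
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && (now.call + timeout)
  while data.bytesize < len
    remaining = deadline && (deadline - now.call)
    break if remaining && remaining <= 0
    # Wait until the descriptor is readable or the remaining time runs out.
    break unless IO.select([io], nil, nil, remaining)
    data << io.readpartial(len - data.bytesize)
  end
  raise IOError, "read #{data.bytesize} byte(s) instead of #{len}" if data.bytesize < len
  data
end

reader, writer = IO.pipe
writer.write('hello')
p sketch_timed_read(reader, 5, 1.0)
```

Recomputing the remaining time on every pass keeps the total wait bounded by the original timeout even when the data arrives in several small chunks.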

.with_safe_socket_op ⇒ Object



# File 'lib/gearman/util.rb', line 190

def self.with_safe_socket_op
  begin
    yield
  rescue Exception => ex
    raise ServerDownException.new(ex.message)
  end
end
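
The wrapping pattern can be sketched on its own as below. This is an illustration under stated assumptions, not the library's code: SketchServerDown is a made-up stand-in for ServerDownException, and the sketch rescues only SystemCallError and IOError, whereas the method above rescues every Exception.

```ruby
# Hypothetical stand-in for Gearman's ServerDownException.
class SketchServerDown < StandardError; end

# Run the block and translate low-level I/O failures into one error type.
def sketch_safe_socket_op
  yield
rescue SystemCallError, IOError => ex
  raise SketchServerDown, ex.message
end

begin
  sketch_safe_socket_op { raise Errno::ECONNRESET }
rescue SketchServerDown => e
  puts "server down: #{e.message}"
end
```

Narrowing the rescue list is a design choice made for the sketch: it lets unrelated errors, such as interrupts or programming mistakes inside the block, propagate unchanged.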