Class: Gearman::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/gearman/client.rb

Overview

Client

Description

A client for communicating with Gearman job servers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_servers = nil) ⇒ Client

Create a new client.

Parameters:

  • job_servers;eitherasingleserveroranarray ("host:port")

    ob_servers “host:port”; either a single server or an array



16
17
18
19
20
21
22
23
24
25
# File 'lib/gearman/client.rb', line 16

def initialize(job_servers=nil)
  @job_servers = []  # "host:port"
  self.job_servers = job_servers if job_servers
  @sockets = {}  # "host:port" -> [sock1, sock2, ...]
  @socket_to_hostport = {}  # sock -> "host:port"
  @test_hostport = nil  # make get_job_server return a given host for testing
  @task_create_timeout_sec = 10
  @server_counter = -1
  @bad_servers = []
end

Instance Attribute Details

#bad_serversObject (readonly)

Returns the value of attribute bad_servers.



26
27
28
# File 'lib/gearman/client.rb', line 26

def bad_servers
  @bad_servers
end

#job_serversObject

Returns the value of attribute job_servers.



26
27
28
# File 'lib/gearman/client.rb', line 26

def job_servers
  @job_servers
end

#task_create_timeout_secObject

Returns the value of attribute task_create_timeout_sec.



27
28
29
# File 'lib/gearman/client.rb', line 27

def task_create_timeout_sec
  @task_create_timeout_sec
end

#test_hostportObject

Returns the value of attribute test_hostport.



27
28
29
# File 'lib/gearman/client.rb', line 27

def test_hostport
  @test_hostport
end

Instance Method Details

#close_socket(sock) ⇒ Object



115
116
117
118
119
# File 'lib/gearman/client.rb', line 115

def close_socket(sock)
  sock.close
  @socket_to_hostport.delete(sock)
  nil
end

#do_task(*args) ⇒ Object

Perform a single task.

Parameters:

  • args

    either a Task or arguments for Task.new

Returns:

  • output of the task, or nil on failure



135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/gearman/client.rb', line 135

def do_task(*args)
  task = Util::get_task_from_args(*args)

  result = nil
  failed = false
  task.on_complete {|v| result = v }
  task.on_fail { failed = true }

  taskset = TaskSet.new(self)
  taskset.add_task(task)
  taskset.wait

  failed ? nil : result
end

#get_hostport_for_socket(sock) ⇒ Object

Given a socket from Client#get_socket, return its host and port.

Parameters:

  • sock

    Socket



126
127
128
# File 'lib/gearman/client.rb', line 126

def get_hostport_for_socket(sock)
  @socket_to_hostport[sock]
end

#get_job_server"host:port"

Get connection info about an arbitrary (currently random, but maybe we’ll do something smarter later) job server.

Returns:

  • ("host:port")

    host:port“

Raises:

  • (Exception)


56
57
58
59
60
61
62
63
# File 'lib/gearman/client.rb', line 56

def get_job_server

  raise Exception.new('No servers available') if @job_servers.empty?

  @server_counter += 1
  # Return a specific server if one's been set.
  @test_hostport or @job_servers[@server_counter % @job_servers.size]
end

#get_socket(hostport, num_retries = 3) ⇒ Object

Get a socket for a job server.

Parameters:

  • hostport

    job server “host:port”

Returns:

  • a Socket

Raises:

  • (RuntimeError)


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/gearman/client.rb', line 74

def get_socket(hostport, num_retries=3)
  # If we already have an open socket to this host, return it.
  if @sockets[hostport]
    sock = @sockets[hostport].shift
    @sockets.delete(hostport) if @sockets[hostport].size == 0
    return sock
  end

  num_retries.times do |i|
    begin
      sock = TCPSocket.new(*hostport.split(':'))
    rescue Exception
    else

      @socket_to_hostport[sock] = hostport
      return sock
    end
  end

  signal_bad_server(hostport)
  raise RuntimeError, "Unable to connect to job server #{hostport}"
end

#option_request(opts) ⇒ Object

Set the options

Raises:



33
34
35
36
37
38
39
40
# File 'lib/gearman/client.rb', line 33

def option_request(opts)
  Util.logger.debug "GearmanRuby: Send options request with #{opts}"
  request = Util.pack_request("option_req", opts)
  sock= self.get_socket(self.get_job_server)
  Util.send_request(sock, request)
  response = Util.read_response(sock, 20)
  raise ProtocolError, response[1] if response[0]==:error
end

#return_socket(sock) ⇒ Object

Relinquish a socket created by Client#get_socket.

If we don’t know about the socket, we just close it.

Parameters:

  • sock

    Socket



103
104
105
106
107
108
109
110
111
112
113
# File 'lib/gearman/client.rb', line 103

def return_socket(sock)
  hostport = get_hostport_for_socket(sock)
  if not hostport
    inet, port, host, ip = s.addr
    Util.logger.error "GearmanRuby: Got socket for #{ip}:#{port}, which we don't " +
      "know about -- closing"
    sock.close
    return
  end
  (@sockets[hostport] ||= []) << sock
end

#signal_bad_server(hostport) ⇒ Object



65
66
67
68
# File 'lib/gearman/client.rb', line 65

def signal_bad_server(hostport)
  @job_servers = @job_servers.reject { |s| s == hostport }
  @bad_servers << hostport
end