Class: Gearman::Client
- Inherits:
-
Object
- Object
- Gearman::Client
- Defined in:
- lib/gearman/client.rb
Overview
Client
Description
A client for communicating with Gearman job servers.
Instance Attribute Summary collapse
-
#bad_servers ⇒ Object
readonly
Returns the value of attribute bad_servers.
-
#job_servers ⇒ Object
Returns the value of attribute job_servers.
-
#task_create_timeout_sec ⇒ Object
Returns the value of attribute task_create_timeout_sec.
-
#test_hostport ⇒ Object
Returns the value of attribute test_hostport.
Instance Method Summary collapse
- #close_socket(sock) ⇒ Object
-
#do_task(*args) ⇒ Object
Perform a single task.
-
#get_hostport_for_socket(sock) ⇒ Object
Given a socket from Client#get_socket, return its host and port.
-
#get_job_server ⇒ "host:port"
Get connection info about an arbitrary (currently random, but maybe we’ll do something smarter later) job server.
-
#get_socket(hostport, num_retries = 3) ⇒ Object
Get a socket for a job server.
-
#initialize(job_servers = nil) ⇒ Client
constructor
Create a new client.
-
#option_request(opts) ⇒ Object
Set the options.
-
#return_socket(sock) ⇒ Object
Relinquish a socket created by Client#get_socket.
- #signal_bad_server(hostport) ⇒ Object
Constructor Details
#initialize(job_servers = nil) ⇒ Client
Create a new client.
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/gearman/client.rb', line 16 def initialize(job_servers=nil) @job_servers = [] # "host:port" self.job_servers = job_servers if job_servers @sockets = {} # "host:port" -> [sock1, sock2, ...] @socket_to_hostport = {} # sock -> "host:port" @test_hostport = nil # make get_job_server return a given host for testing @task_create_timeout_sec = 10 @server_counter = -1 @bad_servers = [] end |
Instance Attribute Details
#bad_servers ⇒ Object (readonly)
Returns the value of attribute bad_servers.
26 27 28 |
# File 'lib/gearman/client.rb', line 26 def bad_servers @bad_servers end |
#job_servers ⇒ Object
Returns the value of attribute job_servers.
26 27 28 |
# File 'lib/gearman/client.rb', line 26 def job_servers @job_servers end |
#task_create_timeout_sec ⇒ Object
Returns the value of attribute task_create_timeout_sec.
27 28 29 |
# File 'lib/gearman/client.rb', line 27 def task_create_timeout_sec @task_create_timeout_sec end |
#test_hostport ⇒ Object
Returns the value of attribute test_hostport.
27 28 29 |
# File 'lib/gearman/client.rb', line 27 def test_hostport @test_hostport end |
Instance Method Details
#close_socket(sock) ⇒ Object
115 116 117 118 119 |
# File 'lib/gearman/client.rb', line 115 def close_socket(sock) sock.close @socket_to_hostport.delete(sock) nil end |
#do_task(*args) ⇒ Object
Perform a single task.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/gearman/client.rb', line 135 def do_task(*args) task = Util::get_task_from_args(*args) result = nil failed = false task.on_complete {|v| result = v } task.on_fail { failed = true } taskset = TaskSet.new(self) taskset.add_task(task) taskset.wait failed ? nil : result end |
#get_hostport_for_socket(sock) ⇒ Object
Given a socket from Client#get_socket, return its host and port.
126 127 128 |
# File 'lib/gearman/client.rb', line 126 def get_hostport_for_socket(sock) @socket_to_hostport[sock] end |
#get_job_server ⇒ "host:port"
Get connection info about an arbitrary (currently random, but maybe we’ll do something smarter later) job server.
56 57 58 59 60 61 62 63 |
# File 'lib/gearman/client.rb', line 56 def get_job_server raise Exception.new('No servers available') if @job_servers.empty? @server_counter += 1 # Return a specific server if one's been set. @test_hostport or @job_servers[@server_counter % @job_servers.size] end |
#get_socket(hostport, num_retries = 3) ⇒ Object
Get a socket for a job server.
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/gearman/client.rb', line 74 def get_socket(hostport, num_retries=3) # If we already have an open socket to this host, return it. if @sockets[hostport] sock = @sockets[hostport].shift @sockets.delete(hostport) if @sockets[hostport].size == 0 return sock end num_retries.times do |i| begin sock = TCPSocket.new(*hostport.split(':')) rescue Exception else @socket_to_hostport[sock] = hostport return sock end end signal_bad_server(hostport) raise RuntimeError, "Unable to connect to job server #{hostport}" end |
#option_request(opts) ⇒ Object
Set the options
33 34 35 36 37 38 39 40 |
# File 'lib/gearman/client.rb', line 33 def option_request(opts) Util.logger.debug "GearmanRuby: Send options request with #{opts}" request = Util.pack_request("option_req", opts) sock= self.get_socket(self.get_job_server) Util.send_request(sock, request) response = Util.read_response(sock, 20) raise ProtocolError, response[1] if response[0]==:error end |
#return_socket(sock) ⇒ Object
Relinquish a socket created by Client#get_socket.
If we don’t know about the socket, we just close it.
103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/gearman/client.rb', line 103 def return_socket(sock) hostport = get_hostport_for_socket(sock) if not hostport inet, port, host, ip = s.addr Util.logger.error "GearmanRuby: Got socket for #{ip}:#{port}, which we don't " + "know about -- closing" sock.close return end (@sockets[hostport] ||= []) << sock end |
#signal_bad_server(hostport) ⇒ Object
65 66 67 68 |
# File 'lib/gearman/client.rb', line 65 def signal_bad_server(hostport) @job_servers = @job_servers.reject { |s| s == hostport } @bad_servers << hostport end |