Module: Gearman::Evented::WorkerReactor
- Includes:
- Reactor
- Defined in:
- lib/gearman/evented/worker.rb
Instance Method Summary
collapse
Methods included from Reactor
connect, #connected?, #disconnect, included, #log, #reconnect, #send, #server, #to_s, #unbind
Instance Method Details
#announce_ability(name, timeout) ⇒ Object
19
20
21
22
23
24
|
# File 'lib/gearman/evented/worker.rb', line 19
# Registers the function +name+ with the job server.
#
# Note: +send+ here is the packet-sending method mixed in from Reactor,
# not Kernel#send.
#
# @param name [String] function name to announce
# @param timeout [Object, nil] when truthy, a :can_do_timeout packet is sent
#   whose argument is the name and the timeout joined by a NUL byte;
#   otherwise a plain :can_do packet carrying only the name is sent
# @return [Object] whatever +send+ returns
def announce_ability(name, timeout)
  log "announce_ability #{name} #{timeout}"
  if timeout
    send :can_do_timeout, "#{name}\0#{timeout}"
  else
    send :can_do, name
  end
end
|
#announce_disability(name) ⇒ Object
26
27
28
|
# File 'lib/gearman/evented/worker.rb', line 26
# Withdraws the function +name+ by sending a :cant_do packet.
#
# @param name [String] function name to withdraw
# @return [Object] whatever +send+ (the Reactor packet sender) returns
def announce_disability(name)
  send(:cant_do, name)
end
|
#client_id ⇒ Object
111
112
113
|
# File 'lib/gearman/evented/worker.rb', line 111
# Returns the identifier this worker reports to the job server.
#
# The id is a random UUID generated once and memoized in @client_id.
# It comes from the standard library rather than the external `uuidgen`
# binary, so it works on hosts without that tool and spawns no subprocess.
#
# @return [String] a UUID string, the same object on every call
def client_id
  @client_id ||= begin
    # Loaded lazily so this method does not rely on a file-level require.
    require 'securerandom'
    SecureRandom.uuid
  end
end
|
#connection_completed ⇒ Object
7
8
9
10
11
12
13
14
15
16
17
|
# File 'lib/gearman/evented/worker.rb', line 7
# Connection-established hook: identifies the worker, announces its
# abilities and asks for the first job.
#
# Steps, in order:
# 1. send :set_client_id with #client_id
# 2. delegate to the inherited implementation via +super+
# 3. take the abilities from @opts[:abilities] (only if @abilities is not
#    already set) and announce each one with its :timeout option
# 4. request a job via #grab_job
#
# @abilities is used as a Hash of function name => options; this method
# reads the :timeout option and #handle_job_assign reads :callback.
# The fallback is therefore an empty Hash, not an Array, so the later
# key lookups work for a worker configured with no abilities.
#
# @return [Object] whatever #grab_job returns
def connection_completed
  send :set_client_id, client_id
  super
  @abilities ||= @opts.delete(:abilities) || {}
  @abilities.each do |ability, args|
    announce_ability(ability, args[:timeout])
  end
  grab_job
end
|
#dispatch_packet(type, handle, *data) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/gearman/evented/worker.rb', line 53
# Handles one decoded server packet and schedules the next job request.
#
# Per packet type:
# * :no_job  - sends :pre_sleep and delays the next grab by
#   @opts[:reconnect_sec] seconds (30 when unset)
# * :job_assign / :job_assign_uniq - hands the first two data fields to
#   #handle_job_assign
# * :noop, :error and anything else - only logged
#
# Whatever the type, #grab_job is then scheduled through EM.add_timer
# (delay 0 unless the packet was :no_job) and +succeed+ is called with
# [handle, data].
# NOTE(review): +succeed+ is not defined in this listing; presumably it
# comes from the including class (e.g. a deferrable) - confirm.
#
# @param type [Symbol] packet type
# @param handle [String, nil] job handle carried by the packet
# @param data [Array] remaining packet fields
# @return [Object] whatever +succeed+ returns
def dispatch_packet(type, handle, *data)
  timer = 0
  case type
  when :no_job
    send :pre_sleep
    timer = @opts[:reconnect_sec] || 30
  when :job_assign, :job_assign_uniq
    log "job assign #{handle}, #{data.inspect}"
    handle_job_assign(handle, data[0], data[1])
  when :noop
    log "NOOP"
  when :error
    log "[ERROR]: error from server #{server}: #{data}"
  else
    log "Got unknown #{type}, #{data} from #{server}"
  end
  EM.add_timer(timer) { grab_job }
  succeed [handle, data]
end
|
#grab_job ⇒ Object
30
31
32
33
|
# File 'lib/gearman/evented/worker.rb', line 30
# Asks the job server for work: logs the request, then sends a
# :grab_job_uniq packet.
#
# @return [Object] whatever +send+ (the Reactor packet sender) returns
def grab_job
  log('Grab Job')
  send(:grab_job_uniq)
end
|
#handle_job_assign(handle, func, args = '') ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/gearman/evented/worker.rb', line 75
# Runs the callback registered for +func+ and reports the outcome.
#
# Outcomes:
# * no handle                    - returns immediately, nothing is sent
# * no function name             - logged and ignored
# * function not in @abilities   - work_fail
# * callback raises              - work_warning with the exception message,
#                                  then work_fail
# * callback returns nil/false   - work_fail
# * callback returns a value     - work_complete with value.to_s
#
# The callback is @abilities[func][:callback]; it is called with the raw
# arguments and a Gearman::Job built from this worker and the handle.
#
# Exceptions that ask the process to stop (SystemExit, SignalException -
# which includes Interrupt - and NoMemoryError) are deliberately NOT turned
# into a job failure: they propagate so the worker can be shut down while a
# job is running.
#
# @param handle [String, nil] job handle assigned by the server
# @param func [String, nil] name of the function to run
# @param args [String, nil] opaque job payload
# @return [Object, nil] result of the last reporting call, or nil when the
#   assignment is ignored
def handle_job_assign(handle, func, args = '')
  return unless handle

  unless func
    log "ERROR: Ignoring job_assign with no function"
    return
  end

  # args may be nil (or anything without #size); report 0 bytes then.
  byte_count = args.respond_to?(:size) ? args.size : 0
  log "Got job_assign '#{func}' with handle #{handle} and #{byte_count} byte(s)"

  unless @abilities.key?(func)
    log "Ignoring job_assign for unsupported func #{func} with handle #{handle}"
    work_fail handle
    return
  end

  ret = nil
  exception = nil
  begin
    ret = @abilities[func][:callback].call(args, Gearman::Job.new(self, handle))
  rescue SystemExit, SignalException, NoMemoryError
    # Process-control conditions must reach the caller untouched.
    raise
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Best effort: any other failure in user code becomes a failed job
    # instead of taking the worker down.
    exception = e
  end

  if exception
    log "exception #{exception.message}, sending work_warning, work_fail for #{handle}"
    work_warning handle, exception.message
    work_fail handle
  elsif ret
    ret = ret.to_s
    log "Sending work_complete for #{handle} with #{ret.size} byte(s)"
    work_complete handle, ret
  else
    log "Sending work_fail for #{handle} to #{server}"
    work_fail handle
  end
end
|
#receive_data(data) ⇒ Object
47
48
49
50
51
|
# File 'lib/gearman/evented/worker.rb', line 47
# Decodes a chunk received from the server and dispatches every packet in it.
#
# Gearman::Protocol.decode_response is expected to yield entries of the form
# [type, handle, *fields]; each one is forwarded to #dispatch_packet.
#
# @param data [String] raw data received from the connection
# @return [Object] the collection returned by decode_response
def receive_data(data)
  packets = Gearman::Protocol.decode_response(data)
  packets.each do |type, handle, *fields|
    dispatch_packet(type, handle, *fields)
  end
end
|
#work_complete(handle, data) ⇒ Object
39
40
41
|
# File 'lib/gearman/evented/worker.rb', line 39
# Reports a finished job by sending a :work_complete packet.
#
# @param handle [String] job handle
# @param data [#to_s] job result; joined to the handle with a NUL byte
# @return [Object] whatever +send+ (the Reactor packet sender) returns
def work_complete(handle, data)
  payload = "#{handle}\0#{data}"
  send(:work_complete, payload)
end
|
#work_fail(handle) ⇒ Object
35
36
37
|
# File 'lib/gearman/evented/worker.rb', line 35
# Reports a failed job by sending a :work_fail packet carrying the handle.
#
# @param handle [String] job handle
# @return [Object] whatever +send+ (the Reactor packet sender) returns
def work_fail(handle)
  send(:work_fail, handle)
end
|
#work_warning(handle, message) ⇒ Object
43
44
45
|
# File 'lib/gearman/evented/worker.rb', line 43
# Sends a :work_warning packet for a job.
#
# @param handle [String] job handle
# @param message [#to_s] warning text; joined to the handle with a NUL byte
# @return [Object] whatever +send+ (the Reactor packet sender) returns
def work_warning(handle, message)
  payload = "#{handle}\0#{message}"
  send(:work_warning, payload)
end
|