Module: Gearman::Evented::ClientReactor
- Includes:
- Reactor
- Defined in:
- lib/gearman/evented/client.rb
Instance Method Summary
collapse
Methods included from Reactor
connect, #connected?, #disconnect, included, #log, #reconnect, #send, #server, #to_s, #unbind
Instance Method Details
#connection_completed ⇒ Object
15
16
17
18
19
20
21
|
# File 'lib/gearman/evented/client.rb', line 15
def connection_completed
@cbs_job_created ||= []
@pending_jobs = []
@assigned_jobs = {}
@background_jobs = {}
super
end
|
#dispatch(type, handle, args) ⇒ Object
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/gearman/evented/client.rb', line 74
def dispatch(type, handle, args)
return unless type
task = @assigned_jobs[handle]
task = @background_jobs[handle] unless task
raise ProtocolError, "No task by that name: #{handle}" unless task
if :work_fail == type && task.should_retry?
task.dispatch(:on_retry, task.retries_done)
@assigned_jobs.delete(handle)
submit_job(task)
return
end
if type == :status_res
task.dispatch(:on_status, args)
else
task.dispatch(type.to_s.sub("work", "on"), *args)
end
@assigned_jobs.delete(handle) if [:work_complete, :work_fail].include?(type)
disconnect if @assigned_jobs.empty? && !keep_connected
end
|
#dispatch_packet(type, handle, *data) ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/gearman/evented/client.rb', line 36
def dispatch_packet(type, handle, *data)
log "Got #{type.to_s}, #{handle}, #{data.inspect} from #{server}"
if type == :job_created
job_created(handle)
if cb = @cbs_job_created.shift
cb.call(handle)
end
else
dispatch(type, handle, data)
end
end
|
#dispatch_packet_callback(&callback) ⇒ Object
32
33
34
|
# File 'lib/gearman/evented/client.rb', line 32
def dispatch_packet_callback(&callback)
@dispatch_packet_callback = callback
end
|
#get_status(handle) ⇒ Object
70
71
72
|
# File 'lib/gearman/evented/client.rb', line 70
def get_status(handle)
send :get_status, handle
end
|
#job_created(handle) ⇒ Object
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/gearman/evented/client.rb', line 59
def job_created(handle)
job = @pending_jobs.shift
raise ProtocolError, "No job waiting for handle! (#{handle})" unless job
EM.add_periodic_timer(job.poll_status_interval) { get_status(handle) } if job.poll_status_interval
if job.background
@background_jobs[handle] = job
else
@assigned_jobs[handle] = job
end
end
|
#keep_connected ⇒ Object
7
8
9
|
# File 'lib/gearman/evented/client.rb', line 7
def keep_connected
@keep_connected ||= (@opts[:keep_connected] || false)
end
|
#keep_connected=(keep) ⇒ Object
11
12
13
|
# File 'lib/gearman/evented/client.rb', line 11
def keep_connected=(keep)
@keep_connected = keep
end
|
#receive_data(data) ⇒ Object
23
24
25
26
27
28
29
30
|
# File 'lib/gearman/evented/client.rb', line 23
def receive_data(data)
packets = Gearman::Protocol.decode_response(data)
log "received #{packets.size} packet(s) at once"
log "packets: #{packets.inspect}"
packets.each do |type, handle, *data|
dispatch_packet(type, handle, *data)
end
end
|
#submit_job(task, &cb_job_created) ⇒ Object
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/gearman/evented/client.rb', line 48
def submit_job(task, &cb_job_created)
cmd = "submit_job"
cmd << "_#{task.priority}" if [ :high, :low ].include?(task.priority)
cmd << "_bg" if task.background
log "#{cmd} #{task.name}, #{task.payload} to #{server}"
send cmd.to_sym, [ task.name, task.hash, task.payload ].join("\0")
@pending_jobs << task
@cbs_job_created << cb_job_created if cb_job_created
end
|