Module: Gearman::Evented::ClientReactor

Includes:
Reactor
Defined in:
lib/gearman/evented/client.rb

Instance Method Summary collapse

Methods included from Reactor

connect, #connected?, #disconnect, included, #log, #reconnect, #send, #server, #to_s, #unbind

Instance Method Details

#connection_completed ⇒ Object



15
16
17
18
19
20
21
# File 'lib/gearman/evented/client.rb', line 15

# Resets the per-connection job bookkeeping and then hands control to the
# next +connection_completed+ implementation in the ancestor chain.
#
# @return [Object] whatever +super+ returns
def connection_completed
  # Handle-indexed job tables always start out empty.
  @background_jobs = {}
  @assigned_jobs   = {}
  # Jobs that were submitted but have not been given a handle yet.
  @pending_jobs    = []
  # Unlike the collections above, an existing callback queue is kept as is.
  # NOTE(review): this asymmetry with @pending_jobs may be unintentional —
  # confirm before relying on it.
  @cbs_job_created ||= []
  super
end

#dispatch(type, handle, args) ⇒ Object

Raises:



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/gearman/evented/client.rb', line 74

# Routes a decoded packet to the task registered under +handle+.
#
# The task is looked up among the assigned jobs first and the background jobs
# second. A +:work_fail+ for a task that still wants to retry is turned into
# an +:on_retry+ notification followed by a resubmission; every other packet
# is forwarded to the task. Once nothing is assigned *and* nothing is waiting
# for a handle, the connection is closed unless +keep_connected+ is set.
#
# @param type [Symbol, nil] packet type; the call is a no-op when it is falsy
# @param handle [Object] key the job was registered under by #job_created
# @param args [Array] remaining packet fields
# @raise [ProtocolError] if no assigned or background job is registered
#   under +handle+
def dispatch(type, handle, args)
  return unless type

  task = @assigned_jobs[handle] || @background_jobs[handle]
  raise ProtocolError, "No task by that name: #{handle}" unless task

  if type == :work_fail && task.should_retry?
    task.dispatch(:on_retry, task.retries_done)
    # The old handle is dead; the resubmitted job will get a new one.
    @assigned_jobs.delete(handle)
    submit_job(task)
    return
  end

  if type == :status_res
    task.dispatch(:on_status, args)
  else
    # e.g. :work_complete => "on_complete"
    task.dispatch(type.to_s.sub("work", "on"), *args)
  end

  @assigned_jobs.delete(handle) if [:work_complete, :work_fail].include?(type)

  # A job sitting in @pending_jobs (for instance a retry that was just
  # resubmitted) still needs this connection to receive its handle, so only
  # disconnect when that queue is empty as well.
  disconnect if @assigned_jobs.empty? && @pending_jobs.empty? && !keep_connected
end

#dispatch_packet(type, handle, *data) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/gearman/evented/client.rb', line 36

# Logs one decoded packet and routes it.
#
# A +:job_created+ packet registers the handle via #job_created and then
# invokes the next queued job-created callback, if there is one. Every other
# packet type goes to #dispatch with the remaining fields as an array.
#
# @param type [Symbol] packet type
# @param handle [Object] second packet field, used as the job handle
# @param data [Array] remaining packet fields
def dispatch_packet(type, handle, *data)
  log "Got #{type.to_s}, #{handle}, #{data.inspect} from #{server}"
  return dispatch(type, handle, data) unless type == :job_created

  job_created(handle)
  callback = @cbs_job_created.shift
  callback.call(handle) if callback
end

#dispatch_packet_callback(&callback) ⇒ Object



32
33
34
# File 'lib/gearman/evented/client.rb', line 32

# Stores the given block in @dispatch_packet_callback.
#
# NOTE(review): none of the methods documented alongside this one read
# @dispatch_packet_callback — confirm where (or whether) it is invoked.
#
# @param callback [Proc] block to remember
# @return [Proc, nil] the stored block, or nil when no block was given
def dispatch_packet_callback(&callback)
  @dispatch_packet_callback = callback
end

#get_status(handle) ⇒ Object



70
71
72
# File 'lib/gearman/evented/client.rb', line 70

# Sends a +:get_status+ request for +handle+ through the reactor's #send.
#
# @param handle [Object] job handle to ask about
# @return [Object] whatever #send returns
def get_status(handle)
  send(:get_status, handle)
end

#job_created(handle) ⇒ Object

Raises:



59
60
61
62
63
64
65
66
67
68
# File 'lib/gearman/evented/client.rb', line 59

# Binds the oldest pending job to the handle the server just announced.
#
# The job is filed under @background_jobs or @assigned_jobs depending on
# +job.background+. When the job has a +poll_status_interval+, a periodic
# timer requests its status; the timer cancels itself as soon as the handle
# is no longer present in either table, so a job that has finished (or a
# connection whose tables were reset) is not polled any further.
#
# @param handle [Object] handle announced by the server
# @return [Object] the job that was registered
# @raise [ProtocolError] if no job is waiting for a handle
def job_created(handle)
  job = @pending_jobs.shift
  raise ProtocolError, "No job waiting for handle! (#{handle})" unless job

  if job.background
    @background_jobs[handle] = job
  else
    @assigned_jobs[handle] = job
  end

  interval = job.poll_status_interval
  if interval
    # +timer+ is assigned before the block can first run, so the block may
    # refer to it in order to stop itself.
    timer = EM.add_periodic_timer(interval) do
      if @assigned_jobs.key?(handle) || @background_jobs.key?(handle)
        get_status(handle)
      else
        timer.cancel
      end
    end
  end

  job
end

#keep_connected ⇒ Object



7
8
9
# File 'lib/gearman/evented/client.rb', line 7

# Whether the connection should stay open once no jobs are outstanding.
#
# Falls back to +@opts[:keep_connected]+ (or +false+) only while the flag has
# never been set. An explicit +false+ assigned through #keep_connected= is
# respected; a plain +||=+ would treat it as "unset" and fall back to the
# option again.
#
# @return [Object] the stored flag
def keep_connected
  @keep_connected = (@opts[:keep_connected] || false) if @keep_connected.nil?
  @keep_connected
end

#keep_connected=(keep) ⇒ Object



11
12
13
# File 'lib/gearman/evented/client.rb', line 11

# Stores +keep+ in @keep_connected, the instance variable that
# #keep_connected reads.
#
# @param keep [Object] new value for the flag (truthy keeps the connection)
def keep_connected=(keep)
  @keep_connected = keep
end

#receive_data(data) ⇒ Object



23
24
25
26
27
28
29
30
# File 'lib/gearman/evented/client.rb', line 23

# Decodes a chunk of incoming bytes into packets and dispatches each of them
# in order.
#
# @param data [String] raw data handed to Gearman::Protocol.decode_response
# @return [Object] the decoded packet collection (the receiver of +each+)
def receive_data(data)
  decoded = Gearman::Protocol.decode_response(data)
  log "received #{decoded.size} packet(s) at once"
  log "packets: #{decoded.inspect}"
  # Each packet is destructured as [type, handle, *remaining fields].
  decoded.each do |type, handle, *fields|
    dispatch_packet(type, handle, *fields)
  end
end

#submit_job(task, &cb_job_created) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/gearman/evented/client.rb', line 48

# Sends a submit-job request for +task+ and queues it until the server
# answers with a handle.
#
# The command name is +submit_job+, optionally suffixed with +_high+ / +_low+
# (from +task.priority+) and +_bg+ (when +task.background+ is set).
#
# One entry is appended to the job-created callback queue for *every*
# submission — +nil+ when no block is given — because #dispatch_packet shifts
# exactly one entry per job-created packet. Queuing only real callbacks would
# let a callback fire for an earlier, callback-less job (such as a retry).
#
# @param task [Object] must respond to +priority+, +background+, +name+,
#   +hash+ and +payload+
# @yield [handle] optional block to run once this job has been given a handle
def submit_job(task, &cb_job_created)
  # dup so the command can be extended even when string literals are frozen.
  cmd = "submit_job".dup
  cmd << "_#{task.priority}" if [:high, :low].include?(task.priority)
  cmd << "_bg" if task.background

  log "#{cmd} #{task.name}, #{task.payload} to #{server}"
  send cmd.to_sym, [task.name, task.hash, task.payload].join("\0")
  @pending_jobs << task
  @cbs_job_created << cb_job_created
end