Class: BackgroundQueue::ServerLib::EventConnection

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/background_queue/server_lib/event_connection.rb

Constant Summary collapse

STAGE_LENGTH =
0
STAGE_BODY =
1
MAX_BODY_LENGTH =
9999999

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#server ⇒ Object

Returns the value of attribute server.



6
7
8
# File 'lib/background_queue/server_lib/event_connection.rb', line 6

# Reader for the @server instance variable.
#
# Other methods on this page use that object for logging (`logger`), queueing
# (`task_queue`), job lookup (`jobs`) and statistics (`change_stat`, `get_stats`).
#
# @return [Object] the current value of @server
def server
  @server
end

Instance Method Details

#build_simple_command(type, message) ⇒ Object



65
66
67
# File 'lib/background_queue/server_lib/event_connection.rb', line 65

# Wraps a plain text message in a BackgroundQueue::Command.
#
# @param type [Symbol] command code for the reply (callers on this page pass
#   :result, :error and :job_not_found)
# @param message [String] text stored under the :message key of the third
#   constructor argument
# @return [BackgroundQueue::Command] built with an empty hash as the second
#   constructor argument
def build_simple_command(type, message)
  payload = { :message => message }
  BackgroundQueue::Command.new(type, {}, payload)
end

#post_init ⇒ Object



14
15
16
17
18
# File 'lib/background_queue/server_lib/event_connection.rb', line 14

# Resets the parser state used by receive_data: no expected body length, an
# empty receive buffer, and the header-reading stage.
# NOTE(review): presumably invoked by EventMachine once the connection has
# been set up - confirm against the EventMachine::Connection documentation.
def post_init
  @length = 0
  @data = ""
  @stage = STAGE_LENGTH
end

#process_add_task_command(command) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/background_queue/server_lib/event_connection.rb', line 91

# Handles an 'add_task' command: builds one Task from the command arguments,
# queues it, bumps the :tasks statistic by one and answers with an "ok" result.
#
# @param command [BackgroundQueue::Command] its args are read for :owner_id,
#   :job_id, :task_id, :priority, :worker and :params; its options are handed
#   to the task unchanged
# @return [Object] the :result command produced by build_simple_command
def process_add_task_command(command)
  args = command.args
  @server.logger.debug("add_task: #{args[:owner_id]}, #{args[:job_id]}, #{args[:task_id]}")

  task = BackgroundQueue::ServerLib::Task.new(
    args[:owner_id],
    args[:job_id],
    args[:task_id],
    args[:priority],
    args[:worker],
    args[:params],
    command.options
  )

  server.task_queue.add_task(task)
  @server.change_stat(:tasks, 1)
  build_simple_command(:result, "ok")
end

#process_add_tasks_command(command) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/background_queue/server_lib/event_connection.rb', line 99

# Handles an 'add_tasks' command: queues one Task per entry of args[:tasks],
# all sharing the command's owner, job, priority and worker.
#
# Each entry is indexed as [task_id, params, options]:
# * params (index 1), when present, are merged over args[:shared_parameters];
# * options (index 2), when present, are merged over the command options.
# The two are handled independently, so per-task options are honoured even
# when the entry carries no per-task params.
#
# @param command [BackgroundQueue::Command] args are read for :owner_id,
#   :job_id, :priority, :worker, :shared_parameters and :tasks
# @return [Object] the :result command produced by build_simple_command
# @raise [NoMethodError] if args[:tasks] is nil (unchanged from before; the
#   caller on this page reports it to the client as an error)
def process_add_tasks_command(command)
  args = command.args
  tasks = args[:tasks]
  @server.logger.debug("add_tasks: #{args[:owner_id]}, #{args[:job_id]}, #{tasks.inspect}")

  shared_params = args[:shared_parameters] || {}
  owner_id = args[:owner_id]
  job_id = args[:job_id]
  priority = args[:priority]
  worker = args[:worker]

  tasks.each do |task_data|
    task_params = task_data[1]
    task_options = task_data[2]

    # merge returns a new hash, so neither the shared params nor the command
    # options are modified by a per-task override.
    merged_params = task_params.nil? ? shared_params : shared_params.merge(task_params)
    merged_options = task_options.nil? ? command.options : command.options.merge(task_options)

    task = BackgroundQueue::ServerLib::Task.new(owner_id, job_id, task_data[0], priority, worker, merged_params, merged_options)
    @server.task_queue.add_task(task)
  end

  @server.change_stat(:tasks, tasks.length)
  build_simple_command(:result, "ok")
end

#process_command(command) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/background_queue/server_lib/event_connection.rb', line 74

# Routes a decoded command to the handler matching its code.
#
# The code is compared as a string, so symbol and string codes are treated
# alike.
#
# @param command [BackgroundQueue::Command] the command to dispatch
# @return [Object] whatever the selected handler returns
# @raise [RuntimeError] "Unknown command: ..." when no handler matches
def process_command(command)
  handler =
    case command.code.to_s
    when 'add_task'     then :process_add_task_command
    when 'add_tasks'    then :process_add_tasks_command
    when 'remove_tasks' then :process_remove_tasks_command
    when 'get_status'   then :process_get_status_command
    when 'stats'        then :process_stats_command
    end

  raise "Unknown command: #{command.code.inspect}" if handler.nil?

  # __send__ rather than send: this class defines its own #send(data).
  __send__(handler, command)
end

#process_data(data) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/background_queue/server_lib/event_connection.rb', line 45

# Decodes one command body, runs it and sends the outcome back.
#
# Any failure while decoding, processing or replying is logged (message at
# error level, backtrace at debug level) and reported through send_error, so
# one bad command does not take the connection handler down.
#
# Process-control exceptions (SystemExit, signals such as Interrupt, and
# NoMemoryError) are deliberately NOT turned into an error reply: they are
# re-raised so that shutdown requests are not swallowed.
#
# @param data [String] the serialized command, passed to
#   BackgroundQueue::Command.from_buf
# @return [Object] the return value of send_result or send_error
def process_data(data)
  cmd = BackgroundQueue::Command.from_buf(data)
  result = process_command(cmd)
  send_result(result)
rescue SystemExit, SignalException, NoMemoryError
  raise
rescue Exception => e
  # Still broad on purpose: every other failure is answered best-effort.
  @server.logger.error("Error processing command: #{e.message}")
  @server.logger.debug(Array(e.backtrace).join("\n"))
  send_error(e.message)
end

#process_get_status_command(command) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/background_queue/server_lib/event_connection.rb', line 126

# Handles a 'get_status' command by looking up the job named in args[:job_id].
#
# @param command [BackgroundQueue::Command] args[:job_id] selects the job
# @return [Object] a :job_not_found command (via build_simple_command) when
#   the lookup returns nil, otherwise a :status command whose third
#   constructor argument is the job's get_current_progress value
def process_get_status_command(command)
  job_id = command.args[:job_id]
  job = @server.jobs.get_job(job_id)
  return build_simple_command(:job_not_found, "job #{job_id} not found") if job.nil?

  BackgroundQueue::Command.new(:status, {}, job.get_current_progress)
end

#process_stats_command(command) ⇒ Object



135
136
137
# File 'lib/background_queue/server_lib/event_connection.rb', line 135

# Handles a 'stats' command by wrapping the server's current statistics.
#
# @param command [BackgroundQueue::Command] not inspected by this handler
# @return [BackgroundQueue::Command] a :stats command whose third constructor
#   argument is the value of @server.get_stats
def process_stats_command(command)
  stats = @server.get_stats
  BackgroundQueue::Command.new(:stats, {}, stats)
end

#receive_data(data) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/background_queue/server_lib/event_connection.rb', line 20

# Accumulates incoming bytes and hands every complete frame to process_data.
#
# A frame is a 6-byte header followed by the body. The header is unpacked
# with "SL": a 16-bit version (must be 1) and a 32-bit body length, both in
# native byte order.
#
# The buffer is drained in a loop, so the method copes with a frame split
# across several calls, with several frames arriving in a single call, and
# with trailing bytes that belong to the next frame. After each body the
# parser returns to the header stage so the connection can carry further
# commands.
#
# NOTE(review): lengths are compared with String#length, which equals the
# byte count only for binary-encoded data - confirm EventMachine always
# delivers binary strings.
#
# @param data [String] the chunk just read from the socket
# @return [nil]
# @raise [RuntimeError] "Invalid header version: ..." or "Invalid length: ..."
#   for a malformed header (the parser state is left at the header stage)
def receive_data(data)
  @data << data

  loop do
    if @stage == STAGE_LENGTH
      break if @data.length < 6

      s_header = @data.slice!(0, 6)
      version, length = s_header.unpack("SL")

      # Validate before touching the parser state so a bad header cannot
      # leave the connection waiting for a body of bogus size.
      raise "Invalid header version: #{version}" unless version == 1
      raise "Invalid length: #{length}" if length > MAX_BODY_LENGTH || length <= 0

      @length = length
      @stage = STAGE_BODY
    end

    break if @data.length < @length

    # Body received: cut exactly @length out of the buffer and re-arm the
    # header stage before processing, leaving any surplus for the next frame.
    body = @data.slice!(0, @length)
    @length = 0
    @stage = STAGE_LENGTH
    process_data(body)
  end

  nil
end

#send(data) ⇒ Object



69
70
71
72
# File 'lib/background_queue/server_lib/event_connection.rb', line 69

# Frames a serialized payload and writes it to the connection.
#
# The frame is a 16-bit version (always 1), a 32-bit payload length and the
# payload itself, packed with native byte order ("SL" + "Z<n>").
#
# The length is the payload's size in BYTES: pack counts bytes, so using the
# character count would truncate payloads containing multibyte characters and
# announce the wrong length in the header.
#
# NOTE: this intentionally shadows Object#send within this class; use
# __send__ for dynamic dispatch.
#
# @param data [String] the serialized payload
# @return [Object] whatever send_data returns
def send(data)
  size = data.bytesize
  data_with_header = [1, size, data].pack("SLZ#{size}")
  send_data(data_with_header)
end

#send_error(message) ⇒ Object



61
62
63
# File 'lib/background_queue/server_lib/event_connection.rb', line 61

# Reports a failure to the peer as an :error command.
#
# @param message [String] the error text to send
# @return [Object] whatever send_result returns
def send_error(message)
  error_command = build_simple_command(:error, message)
  send_result(error_command)
end

#send_result(command) ⇒ Object



57
58
59
# File 'lib/background_queue/server_lib/event_connection.rb', line 57

# Serializes a command with to_buf and writes it through #send.
#
# @param command [#to_buf] the command to transmit
# @return [Object] whatever #send returns
def send_result(command)
  payload = command.to_buf
  send(payload)
end