Class: BackgroundQueue::ServerLib::EventConnection
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- BackgroundQueue::ServerLib::EventConnection
- Defined in:
- lib/background_queue/server_lib/event_connection.rb
Constant Summary collapse
- STAGE_LENGTH =
0
- STAGE_BODY =
1
- MAX_BODY_LENGTH =
9999999
Instance Attribute Summary collapse
-
#server ⇒ Object
Returns the value of attribute server.
Instance Method Summary collapse
- #build_simple_command(type, message) ⇒ Object
- #post_init ⇒ Object
- #process_add_task_command(command) ⇒ Object
- #process_add_tasks_command(command) ⇒ Object
- #process_command(command) ⇒ Object
- #process_data(data) ⇒ Object
- #process_get_status_command(command) ⇒ Object
- #process_stats_command(command) ⇒ Object
- #receive_data(data) ⇒ Object
- #send(data) ⇒ Object
- #send_error(message) ⇒ Object
- #send_result(command) ⇒ Object
Instance Attribute Details
#server ⇒ Object
Returns the value of attribute server.
6 7 8 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 6 def server @server end |
Instance Method Details
#build_simple_command(type, message) ⇒ Object
65 66 67 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 65 def build_simple_command(type, ) BackgroundQueue::Command.new(type, {}, {:message=>}) end |
#post_init ⇒ Object
14 15 16 17 18 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 14 def post_init @data = "" @length = 0 @stage = STAGE_LENGTH end |
#process_add_task_command(command) ⇒ Object
91 92 93 94 95 96 97 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 91 def process_add_task_command(command) @server.logger.debug("add_task: #{command.args[:owner_id]}, #{command.args[:job_id]}, #{command.args[:task_id]}") task = BackgroundQueue::ServerLib::Task.new(command.args[:owner_id], command.args[:job_id], command.args[:task_id], command.args[:priority], command.args[:worker], command.args[:params], command.) server.task_queue.add_task(task) @server.change_stat(:tasks, 1) build_simple_command(:result, "ok") end |
#process_add_tasks_command(command) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 99 def process_add_tasks_command(command) @server.logger.debug("add_tasks: #{command.args[:owner_id]}, #{command.args[:job_id]}, #{command.args[:tasks].inspect}") shared_params = command.args[:shared_parameters] shared_params = {} if shared_params.nil? owner_id = command.args[:owner_id] job_id = command.args[:job_id] priority = command.args[:priority] worker = command.args[:worker] for task_data in command.args[:tasks] if task_data[1].nil? merged_params = shared_params = command. else merged_params = shared_params.clone.update(task_data[1]) if task_data[2].nil? = command. else = command..merge(task_data[2]) end end task = BackgroundQueue::ServerLib::Task.new(owner_id, job_id, task_data[0], priority, worker, merged_params, ) server.task_queue.add_task(task) end @server.change_stat(:tasks, command.args[:tasks].length) build_simple_command(:result, "ok") end |
#process_command(command) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 74 def process_command(command) case command.code.to_s when 'add_task' process_add_task_command(command) when 'add_tasks' process_add_tasks_command(command) when 'remove_tasks' process_remove_tasks_command(command) when 'get_status' process_get_status_command(command) when 'stats' process_stats_command(command) else raise "Unknown command: #{command.code.inspect}" end end |
#process_data(data) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 45 def process_data(data) begin cmd = BackgroundQueue::Command.from_buf(data) result = process_command(cmd) send_result(result) rescue Exception=>e @server.logger.error("Error processing command: #{e.}") @server.logger.debug(e.backtrace.join("\n")) send_error(e.) end end |
#process_get_status_command(command) ⇒ Object
126 127 128 129 130 131 132 133 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 126 def process_get_status_command(command) job = @server.jobs.get_job(command.args[:job_id]) if job.nil? build_simple_command(:job_not_found, "job #{command.args[:job_id]} not found") else BackgroundQueue::Command.new(:status, {}, job.get_current_progress) end end |
#process_stats_command(command) ⇒ Object
135 136 137 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 135 def process_stats_command(command) BackgroundQueue::Command.new(:stats, {}, @server.get_stats) end |
#receive_data(data) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 20 def receive_data(data) @data << data if @stage == STAGE_LENGTH if @data.length >= 6 s_header = @data.slice!(0,6) version, length = s_header.unpack("SL") if version == 1 @length = length @stage = STAGE_BODY if length > MAX_BODY_LENGTH || length <= 0 raise "Invalid length: #{length}" end else raise "Invalid header version: #{version}" end end end if @stage == STAGE_BODY && @data.length == @length #body received process_data(@data) end end |
#send(data) ⇒ Object
69 70 71 72 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 69 def send(data) data_with_header = [1, data.length, data].pack("SLZ#{data.length}") send_data(data_with_header) end |
#send_error(message) ⇒ Object
61 62 63 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 61 def send_error() send_result(build_simple_command(:error, )) end |
#send_result(command) ⇒ Object
57 58 59 |
# File 'lib/background_queue/server_lib/event_connection.rb', line 57 def send_result(command) send(command.to_buf) end |