Class: Pmux::Gateway::HttpHandler

Inherits:
EM::Connection
  • Object
show all
Includes:
EM::HttpServer
Defined in:
lib/pmux-gw/http_handler.rb

Constant Summary collapse

# URL path prefixes whose requests are answered from the bundled static files.
@@static_resources =
[ "/css", "/js", "/img" ]
# Request parameters understood by the /pmux resource (consumed by build_command):
#   "name"      - parameter name, also used as the pmux option name
#   "multi"     - true when every supplied value is used, false when only the first is
#   "required"  - true when the request is rejected if the parameter is missing
#   "gwoptonly" - true when the parameter is for the gateway only and is not
#                 forwarded on the pmux command line
#   "default"   - value list used when the parameter is absent (nil = no default)
@@param_attr =
[
  { "name" => "mapper",       "multi" => false, "required" => true,  "gwoptonly" => false, "default" => nil },
  { "name" => "ship-file",    "multi" => true,  "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "file",         "multi" => true,  "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "file-glob",    "multi" => true,  "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "reducer",      "multi" => false, "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "num-r",        "multi" => false, "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "ff",           "multi" => false, "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "storage",      "multi" => false, "required" => false, "gwoptonly" => false, "default" => ["glusterfs"] },
  { "name" => "locator-host", "multi" => false, "required" => false, "gwoptonly" => false, "default" => ["127.0.0.1"] },
  { "name" => "locator-port", "multi" => false, "required" => false, "gwoptonly" => false, "default" => nil },
  { "name" => "detect-error", "multi" => false, "required" => false, "gwoptonly" => true,  "default" => nil }
]
# File extension => Content-Type used when serving static files.
# PNG images are "image/png"; "text/png" is not a registered media type.
@@ext_mimetype_map =
{
  ".js" => "text/javascript",
  ".css" => "text/css",
  ".png" => "image/png"
}
# strftime/strptime pattern for the start-date / end-date parameters of /history.
@@date_format =
"%Y/%m/%d"
# Set to true by set_term; while true, incoming requests are answered with 503.
@@term =
false
# Number of pmux tasks currently accounted for (see get_task_cnt).
@@task_cnt =
0
# Slot intended to hold the /history template source; nil until loaded.
@@history_template =
nil

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ HttpHandler

httpのリクエストを受けて、pmuxとの橋渡しをするクラス



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/pmux-gw/http_handler.rb', line 48

# Prepares per-connection state: the logger and history objects obtained via
# their .instance accessors, empty authentication/task slots, and the routing
# table that maps a request path to the method handling it.
def initialize(*args)
  super
  @logger = LoggerWrapper.instance
  @history = History.instance
  # Nothing is known about the client or its task until a request arrives.
  @auth_user = @volume_prefix = @cc = nil
  @resource_map = {
    "/pmux" => method(:resource_pmux),
    "/existence" => method(:resource_existence),
    "/history" => method(:resource_history),
    "/task" => method(:resource_task)
  }
end

Class Method Details

.get_task_cntObject



39
40
41
# File 'lib/pmux-gw/http_handler.rb', line 39

# Returns the current value of the class-wide task counter.
def self.get_task_cnt
  @@task_cnt
end

.set_termObject



43
44
45
# File 'lib/pmux-gw/http_handler.rb', line 43

# Raises the class-wide termination flag; requests handled afterwards are
# answered with 503 by process_http_request. Returns true.
def self.set_term
  @@term = true
end

Instance Method Details

#base_response(args) ⇒ Object



193
194
195
196
197
198
199
# File 'lib/pmux-gw/http_handler.rb', line 193

# Fills in the pending response from +args+ (:status, :content_type,
# :content), sends it, and writes an info-level log line describing the
# peer, the request path and the status that was sent.
def base_response(args)
  status, mime, body = args.values_at(:status, :content_type, :content)
  @response.status = status
  @response.content_type(mime)
  @response.content = body
  @response.send_response
  @logger.logging("info", "(response) peer: #{@peer_ip} - #{@peer_port}, path: #{@http_path_info}, response: #{@response.status}")
end

#build_commandObject



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pmux-gw/http_handler.rb', line 141

# Builds the pmux command line from the request parameters.
#
# Every entry of @@param_attr is looked up in the request. Single-valued
# options become "--name=value"; "ship-file" values are prefixed with the
# storage root and the user's volume prefix; "file" and "file-glob" values
# are prefixed the same way and appended as trailing file arguments
# ("file-glob" patterns are expanded with Dir.glob first).
#
# Returns [mapper, command_string, nil] on success, or [nil, nil, message]
# when a required parameter is missing or no input file could be determined.
#
# NOTE(review): request values are interpolated into the command string
# without escaping, and file paths are not checked for ".." segments.
# resource_pmux passes this string to EMPessimistic.popen3 - confirm how that
# spawns the process and escape/validate accordingly.
def build_command
  mapper = nil
  opt_list = [$config["pmux_path"]]
  file_list = []
  # Prefix a request-relative path with the storage root and volume prefix.
  to_storage_path = lambda do |relative|
    [$config["glusterfs_root"], @volume_prefix, relative].join(File::SEPARATOR)
  end

  @@param_attr.each do |attr|
    name = attr["name"]
    val = if attr["multi"]
            get_param_values(name, attr["default"])
          else
            get_param_value(name, attr["default"])
          end

    if attr["required"]
      if val.nil?
        @logger.logging("info", "parameter error #{name}")
        return nil, nil, "#{name} is required"
      end
      mapper = val if name == "mapper"
    end

    # Absent parameters and gateway-only options never reach the command line.
    next if val.nil? || attr["gwoptonly"]

    if !attr["multi"]
      opt_list << "--#{name}=#{val}"
    elsif name == 'file'
      val.each { |f| file_list << to_storage_path.call(f) }
    elsif name == 'file-glob'
      # concat (not <<) keeps file_list flat, so a pattern that matches
      # nothing contributes nothing and the emptiness check below stays valid.
      val.each { |fg| file_list.concat(Dir.glob(to_storage_path.call(fg))) }
    elsif name == 'ship-file'
      val.each { |f| opt_list << "--#{name}=#{to_storage_path.call(f)}" }
    else
      val.each { |v| opt_list << "--#{name}=#{v}" }
    end
  end

  return nil, nil, "file or file-glob is required" if file_list.empty?
  opt_list.concat(file_list)
  command = opt_list.join(" ")
  @logger.logging("debug", command)
  [mapper, command, nil]
end

#error_response(args = {:status => 400, :content_type => "text/html", :content => ""}) ⇒ Object



205
206
207
# File 'lib/pmux-gw/http_handler.rb', line 205

# Sends an error response through base_response.
#
# +args+ may contain :status, :content_type and :content. Keys that are left
# out fall back to 400 / "text/html" / "" so that a partial hash such as
# {:status => 404} still produces a complete response.
def error_response(args = {:status => 400, :content_type => "text/html", :content => ""})
  defaults = {:status => 400, :content_type => "text/html", :content => ""}
  base_response(defaults.merge(args))
end

#get_authObject



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/pmux-gw/http_handler.rb', line 95

# Extracts HTTP Basic credentials from the parsed request headers.
#
# Looks at "authorization" first and "proxy-authorization" second. On success
# stores the decoded user name and password in @auth_user / @auth_pass and
# returns true. Returns false when no such header is present, when the header
# is empty or carries no credentials after the scheme, or when the scheme is
# not "basic" (compared case-insensitively).
def get_auth
  auth = @headers["authorization"] || @headers["proxy-authorization"]
  return false if auth.nil?

  scheme, credentials = auth.split(' ')
  # An empty header or a bare scheme ("Basic") is a malformed request, not a
  # server error: report "no credentials" instead of raising on nil.
  return false if scheme.nil? || credentials.nil?
  return false unless scheme.downcase == "basic"

  # Split on the first ':' only, so passwords may themselves contain colons.
  @auth_user, @auth_pass = Base64.decode64(credentials).split(':', 2)
  true
end

#get_param_value(name, default) ⇒ Object



114
115
116
117
118
# File 'lib/pmux-gw/http_handler.rb', line 114

# Returns the first value of request parameter +name+, taking the value list
# from get_param_values (which falls back to +default+). Returns nil when
# that list is nil.
def get_param_value(name, default)
  candidates = get_param_values(name, default)
  candidates.nil? ? nil : candidates[0]
end

#get_param_values(name, default) ⇒ Object



109
110
111
112
# File 'lib/pmux-gw/http_handler.rb', line 109

# Returns the value list stored under +name+ in @params when the key exists
# and the list is non-empty; otherwise returns +default+ unchanged.
def get_param_values(name, default)
  if @params.key?(name)
    supplied = @params[name]
    return supplied if supplied.length > 0
  end
  default
end

#get_user_volume_prefixObject



120
121
122
123
124
125
126
# File 'lib/pmux-gw/http_handler.rb', line 120

# Checks @auth_user / @auth_pass against $userdb. When the user exists and
# the stored "password" equals @auth_pass, copies that user's "volume-prefix"
# into @volume_prefix and returns true; otherwise returns false and leaves
# @volume_prefix untouched.
def get_user_volume_prefix
  return false unless $userdb.key?(@auth_user)
  account = $userdb[@auth_user]
  return false unless account["password"] == @auth_pass
  @volume_prefix = account["volume-prefix"]
  true
end

#is_detect_errorObject



128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/pmux-gw/http_handler.rb', line 128

# Tells whether the request runs in error-detection mode.
# When @detect_error has been set it is returned as is; otherwise the mode is
# on exactly when the "detect-error" request parameter equals "on" (the
# parameter defaults to "off"), in which case a debug line is logged.
def is_detect_error
  return @detect_error unless @detect_error.nil?
  return false unless get_param_value("detect-error", ["off"]) == "on"
  @logger.logging("debug", "mode is decect-error")
  true
end

#parse_http_headersObject



87
88
89
90
91
92
93
# File 'lib/pmux-gw/http_handler.rb', line 87

# Parses the raw, NUL-separated header block in @http_headers into the
# @headers hash. Keys are stripped and lower-cased, values are stripped; when
# a header name occurs more than once the last occurrence wins.
#
# Lines that are empty, have no ':' separator, or have an empty name are
# skipped instead of raising, and a missing header block yields an empty hash.
def parse_http_headers
  @headers = {}
  @http_headers.to_s.split("\000").each do |line|
    # Split on the first ':' only - values such as times may contain colons.
    key, value = line.split(":", 2)
    next if key.nil? || value.nil?
    key = key.strip.downcase
    next if key.empty?
    @headers[key] = value.strip
  end
end

#post_initObject



63
64
65
66
67
# File 'lib/pmux-gw/http_handler.rb', line 63

# Connection-established callback: runs the inherited setup, adjusts the HTTP
# server behaviour, and logs that a client connected.
def post_init
  super
  # Provided by the mixed-in HTTP server module; presumably stops request data
  # from being exported as environment strings - TODO confirm against its docs.
  no_environment_strings
  @logger.logging("debug", "connected to client")
end

#process_http_requestObject



380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/pmux-gw/http_handler.rb', line 380

# Entry point for one HTTP request.
#
# Configures TCP keepalive on the socket, rejects the request with 503 while
# the termination flag is set, parses headers, credentials and parameters
# (query string for GET, body for POST, 400 for any other method), and then
# routes the request: paths starting with one of @@static_resources are
# served from the "static" directory next to this file, any other path is
# looked up in @resource_map, and unknown paths get 404.
# Any exception raised along the way is logged and answered with 500.
def process_http_request
  # Create the response object first so that the rescue clause can always reply.
  @response = EM::DelegatedHttpResponse.new(self)
  begin
    # Enable TCP keepalive. The option ids are numeric; per the trailing
    # annotations they are the keepalive switch, idle time, probe interval
    # and probe count.
    # NOTE(review): these numbers look like the Linux values - confirm before
    # running on another platform.
    set_sock_opt(Socket::SOL_SOCKET, 9, 1) # socket.SO_KEEPALIVE
    set_sock_opt(Socket::SOL_TCP, 4, 1) # socket.TCP_KEEPIDLE
    set_sock_opt(Socket::SOL_TCP, 5, 1) # socket.TCP_KEEPINTVL
    set_sock_opt(Socket::SOL_TCP, 6, 10) # socket.TCP_KEEPCNT

    # While the termination flag is set, refuse new work with 503.
    if @@term
      error_response({:status => 503,
                      :content_type => "text/plain",
                      :content => "apllication is terminating"})
      return
    end

    # Remember who is connected (used in log lines and task records).
    @peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername())

    # Parse the raw header block into @headers.
    parse_http_headers()

    # Pick up Basic credentials, if any were sent.
    get_auth()

    # Decode the request parameters.
    case @http_request_method
    when "GET"
      @params = @http_query_string.nil? ? Hash.new : CGI.parse(@http_query_string)
    when "POST"
      @params = @http_post_content.nil? ? Hash.new : CGI.parse(@http_post_content)
    else
      error_response({:status => 400,
                      :content_type => "text/plain",
                      :content => "unsupport method #{@http_request_method}"})
      return
    end

    @logger.logging("info", "(request) peer: #{@peer_ip} - #{@peer_port}, path: #{@http_path_info}, headers: #{@headers.inspect} params: #{@params.inspect}")

    # Static resources: the path must *start* with a static prefix (a prefix
    # that merely appears somewhere inside the path does not count).
    if @@static_resources.any? { |prefix| @http_path_info.start_with?(prefix + "/") }
      static_root = File.expand_path("static", File.dirname(__FILE__))
      static_path = [File.dirname(__FILE__), "static", @http_path_info].join(File::SEPARATOR)
      # Refuse paths whose ".." segments would leave the static directory.
      if File.expand_path(static_path).start_with?(static_root + File::SEPARATOR)
        resource_static(static_path)
      else
        error_response({:status => 404,
                        :content_type => "text/plain",
                        :content => "not found #{@http_path_info} resource"})
      end
      return
    end

    # Everything else is routed through the resource table.
    if @resource_map.key?(@http_path_info)
      @resource_map[@http_path_info].call
    else
      error_response({:status => 404,
                      :content_type => "text/plain",
                      :content => "not found #{@http_path_info} resource"})
      return
    end
  rescue Exception => e
    # Unexpected failures are answered with 500 and logged. The broad rescue
    # is kept on purpose so that nothing raised by a handler propagates out of
    # this callback.
    @logger.logging("error", "error occurred in http handler: #{e}")
    @logger.logging("error", e.backtrace.join("\n"))
    error_response({:status => 500,
                    :content_type => "text/plain",
                    :content => "error: #{e}"})
  end
end

#resource_existenceObject



289
290
291
292
293
294
# File 'lib/pmux-gw/http_handler.rb', line 289

# Handler for the /existence resource: replies 204 with an empty body.
def resource_existence
  reply = {
    :status => 204,
    :content_type => "text/html",
    :content => ""
  }
  success_response(reply)
end

#resource_historyObject



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/pmux-gw/http_handler.rb', line 296

# Handler for the /history resource: renders the task history as HTML.
#
# Optional parameters: "client", "pid", "mapper" (passed to the history
# loader, "" when absent) and "start-date" / "end-date" in @@date_format
# (both default to today). A reversed date range is swapped; unparsable
# dates fall back to today. The page is produced by evaluating the ERB
# template template/history.tmpl against this method's local variables, so
# the local names below are part of the template's contract.
# Replies 404 when the template file cannot be read.
def resource_history
  default_start_date = default_end_date = Date.today()
  client = get_param_value("client", [""])
  pid = get_param_value("pid", [""])
  mapper = get_param_value("mapper", [""])
  start_date_str = get_param_value("start-date", [default_start_date.strftime(@@date_format)])
  end_date_str = get_param_value("end-date", [default_end_date.strftime(@@date_format)])
  begin
    start_date = Date.strptime(start_date_str, @@date_format)
    end_date = Date.strptime(end_date_str, @@date_format)
    # If the end date lies before the start date, swap them.
    if end_date < start_date
      start_date, end_date = end_date, start_date
    end
  rescue ArgumentError
    # Unparsable dates fall back to the defaults.
    @logger.logging("info", "can not parse date format")
    start_date = default_start_date
    end_date = default_end_date
  end
  history, history_id_order = @history.load(client, pid, mapper, "", start_date, end_date, false, true, true)
  labels = ["date", "client", "user", "pid", "mapper", "start", "end", "elapsed", "status", "detail" ]
  template_file_path = [File.dirname(__FILE__), "template", "history.tmpl"].join(File::SEPARATOR)
  # Load the template once and keep it in the class-level cache slot.
  if @@history_template.nil?
    begin
      @@history_template = File.read(template_file_path)
    rescue StandardError => e
      @logger.logging("error", "can not load template file (#{template_file_path}) : #{e}")
      @logger.logging("error", e.backtrace.join("\n"))
      error_response({:status => 404,
                      :content_type => "text/html",
                      :content => "can not load template file (#{template_file_path}) : #{e}"})
      return
    end
  end
  # NOTE(review): the positional (safe_level, trim_mode) form of ERB.new is
  # deprecated in newer Ruby releases in favour of trim_mode: '-'; kept for
  # compatibility with the Ruby version this file targets - confirm.
  content = ERB.new(@@history_template, nil, '-').result(binding)
  success_response({:status => 200,
                    :content_type => "text/html",
                    :content => content})
end

#resource_pmuxObject



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/pmux-gw/http_handler.rb', line 236

# Handler for the /pmux resource: validates the request, builds the pmux
# command line and starts it, tying the spawned process to this connection
# through a ClientContext stored in @cc.
#
# Replies 503 when the task limit is reached, 401/403 on authentication
# problems (only when $config["use_basic_auth"] is set), and 400 for HTTP/1.0
# clients outside error-detection mode or when build_command reports an error.
def resource_pmux
  # Reply busy (503) when the number of tasks has reached the configured limit
  if @@task_cnt >= $config["max_tasks"]
    error_response({:status => 503,
                    :content_type => "text/plain",
                    :content => "number of tasks exceeded the limit"}) 
    return
  end
    
  # Authentication
  if $config["use_basic_auth"]
    if @auth_user.nil?
      error_response({:status => 401,
                      :content_type => "text/plain",
                      :content => "need user authentication"}) 
      return
    end
    if !get_user_volume_prefix()
      error_response({:status => 403,
                      :content_type => "text/plain",
                      :content => "user authentication failure"}) 
      return
    end
  end
  # Without authentication there is no per-user prefix; use an empty one
  @volume_prefix = "" if @volume_prefix.nil?
    
  # Handling of the /pmux resource itself.
  # HTTP/1.0 clients are only accepted in error-detection mode (the error
  # text indicates that the normal mode relies on chunked responses).
  if @http_protocol == "HTTP/1.0" && !is_detect_error()
    error_response({:status => 400,
                    :content_type => "text/plain",
                    :content => "HTTP/1.0 is not support chunked"}) 
    return
  end
  mapper, cmd, err = build_command()
  if mapper.nil? || cmd.nil?
    error_response({:status => 400,
                    :content_type => "text/plain",
                    :content => err})
    return
  end
  @cc = ClientContext.new(self, @response, mapper, cmd, is_detect_error())
  # Record the status information on the client context.
  # The counter is decremented again in unbind once @cc is set.
  @@task_cnt += 1
  @cc.status = "runnning"
  # Timestamp with a fixed offset of 9/24 day, i.e. UTC+9
  @cc.start_datetime = DateTime.now().new_offset(Rational(9, 24))
  @cc.peername = "#{@peer_ip} - #{@peer_port}"
  @cc.user = @auth_user
  @cc.set_pmux_handler(EMPessimistic.popen3(cmd, PmuxHandler, @cc))
  @cc.pid = @cc.pmux_handler.get_status.pid
  # Update the history record
  @history.save(@cc)
end

#resource_static(path) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/pmux-gw/http_handler.rb', line 209

# Serves the file at +path+ as a static resource.
#
# The Content-Type is chosen from @@ext_mimetype_map by file extension.
# Replies 404 when the extension is not in the map or when the file cannot be
# read; otherwise replies 200 with the file's bytes. Always returns nil.
def resource_static(path)
  @logger.logging("debug", "requested static resource (#{path})")
  not_found = {:status => 404,
               :content_type => "text/plain",
               :content => "not found #{@http_path_info} resource"}
  ext = File.extname(path)
  unless @@ext_mimetype_map.key?(ext)
    error_response(not_found)
    return
  end
  begin
    # Block form closes the handle for us; binary mode keeps image bytes
    # intact, and File.open (unlike Kernel#open) never treats the path as a
    # command.
    body = File.open(path, "rb") { |f| f.read }
  rescue SystemCallError, IOError
    error_response(not_found)
    return
  end
  # Sent outside the rescue so that a failure while sending is not answered
  # with a second (404) response.
  success_response({:status => 200,
                    :content_type => @@ext_mimetype_map[ext],
                    :content => body})
  return
end

#resource_taskObject



340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/pmux-gw/http_handler.rb', line 340

# Handler for the /task resource: shows one task record as plain text.
#
# Requires the parameters "client", "pid", "mapper", "start-datetime" and
# "date" (400 when any is missing or when "date" cannot be parsed). The
# record is looked up through @history.load; 404 is returned when nothing
# matches. The reply lists one "label<TAB>value" line per field of the first
# matching record, after dropping its leading date element.
def resource_task
  client, pid, mapper, start_datetime, date_str =
    ["client", "pid", "mapper", "start-datetime", "date"].map { |key| get_param_value(key, [""]) }
  if [client, pid, mapper, start_datetime, date_str].any? { |given| given == "" }
    error_response({:status => 400,
                    :content_type => "text/plain",
                    :content => "client and pid and mapper and start-datetime and date is required"})
    return
  end

  begin
    # The lookup covers a single day, so both ends of the range are the same date.
    start_date, end_date = Array.new(2) { Date.parse(date_str) }
  rescue ArgumentError
    error_response({:status => 400,
                    :content_type => "text/plain",
                    :content => "invalid date string"})
    return
  end

  history, history_id_order = @history.load(client, pid, mapper, start_datetime, start_date, end_date, true, false, false)
  if history_id_order.length == 0
    error_response({:status => 404,
                    :content_type => "text/plain",
                    :content => "not found task"})
    return
  end

  elems = history[history_id_order[0]]
  # The first element is the date, which is not part of the listing.
  elems.shift()
  labels = ["client", "user", "pid", "mapper", "start", "end", "elapsed", "status", "command"]
  content = ""
  elems.each_with_index do |value, idx|
    content += "#{labels[idx]}\t#{value}\n"
  end
  success_response({:status => 200,
                    :content_type => "text/plain",
                    :content => content})
end

#success_response(args = {:status => 200, :content_type => "text/html", :content => ""}) ⇒ Object



201
202
203
# File 'lib/pmux-gw/http_handler.rb', line 201

# Sends a success response through base_response.
#
# +args+ may contain :status, :content_type and :content. Keys that are left
# out fall back to 200 / "text/html" / "" so that a partial hash such as
# {:content => "ok"} still produces a complete response.
def success_response(args = {:status => 200, :content_type => "text/html", :content => ""})
  defaults = {:status => 200, :content_type => "text/html", :content => ""}
  base_response(defaults.merge(args))
end

#unbindObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/pmux-gw/http_handler.rb', line 69

# Connection-closed callback. When a task was started on this connection
# (@cc is set) the task counter is decremented, the end time is recorded
# (fixed UTC+9 offset) and the final status is written to the history:
# "done" when pmux had already terminated, otherwise "disconnected" after
# closing the pmux handler's connection and flagging the forced termination.
def unbind
  super
  @logger.logging("debug", "client is closed")
  return if @cc.nil?

  @@task_cnt -= 1
  @cc.end_datetime = DateTime.now().new_offset(Rational(9, 24))
  if @cc.pmux_terminated
    @cc.status = "done"
  else
    # The client went away while pmux was still running: stop pmux as well.
    handler = @cc.pmux_handler
    handler.close_connection() unless handler.nil?
    @cc.force_pmux_terminated = true
    @cc.status = "disconnected"
  end
  @history.save(@cc)
end