Class: JanusGateway::Transport::Http

Inherits:
JanusGateway::Transport show all
Defined in:
lib/janus_gateway/transport/http.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from JanusGateway::Transport

#connect, #connected?, #disconnect, #transaction_id_new

Constructor Details

#initialize(url) ⇒ Http

Returns a new instance of Http.

Parameters:

  • url (String)


9
10
11
12
# File 'lib/janus_gateway/transport/http.rb', line 9

# Builds an HTTP transport for the given URL.
#
# @param url [String] kept in +@url+ for later use by the transport
def initialize(url)
  @url = url
  # Pending transactions. Starts empty; #send_transaction stores each
  # transaction's promise here, keyed by its transaction id.
  @transaction_queue = {}
end

Instance Attribute Details

#transaction_queue ⇒ Object (readonly)

Returns the value of attribute transaction_queue.



6
7
8
# File 'lib/janus_gateway/transport/http.rb', line 6

# Read-only access to the table of pending transactions.
#
# @return [Hash] the +@transaction_queue+ instance variable, which the
#   constructor initialises to an empty Hash
def transaction_queue
  @transaction_queue
end

Instance Method Details

#run ⇒ Object



14
15
16
17
18
19
# File 'lib/janus_gateway/transport/http.rb', line 14

# Runs an EventMachine reactor block whose only job, for now, is to install
# an error handler that re-raises every exception handed to it.
def run
  EventMachine.run do
    # Do not let the reactor swallow errors: raise them again as-is.
    EM.error_handler do |exception|
      raise exception
    end
    # Placeholder for future long-polling support; nothing else happens here yet.
  end
end

#send(data) ⇒ Object

Parameters:

  • data (Hash)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/janus_gateway/transport/http.rb', line 22

# Sends +data+ through +_send+ and settles the matching pending transaction
# promise (looked up in +@transaction_queue+) according to the outcome.
#
# * response with +janus == 'success'+  -> promise is fulfilled with the response
# * response with +janus+ 'ack'/'event' -> ignored for now
# * any other response                  -> promise is failed with a JanusGateway::Error
# * +_send+ itself fails                -> promise for +data[:transaction]+ is failed
#
# Responses without a transaction id, or whose transaction is not queued,
# are ignored.
#
# NOTE: this method shadows Kernel#send on instances of this class; use
# +__send__+ when dynamic dispatch is required.
#
# @param data [Hash] request payload; +data[:transaction]+ identifies the
#   pending promise when the request itself fails
# @return [Object] whatever +sender.rescue+ returns
def send(data)
  sender = _send(data)

  sender.then do |response|
    transaction_id = response['transaction']
    next if transaction_id.nil?

    promise = @transaction_queue[transaction_id]
    next if promise.nil?

    case response['janus']
    when 'success'
      promise.set(response).execute
    when 'ack', 'event'
      # do nothing for now
    else
      error_data = response['error']
      error =
        if error_data.nil?
          # A non-success response is not guaranteed to carry an `error`
          # payload; fail the transaction explicitly instead of raising
          # NoMethodError inside this callback (which would leave the
          # transaction pending until it times out).
          JanusGateway::Error.new(0, "Unexpected response type `#{response['janus']}` without error details")
        else
          JanusGateway::Error.new(error_data['code'], error_data['reason'])
        end
      promise.fail(error).execute
    end
  end

  sender.rescue do |error|
    transaction_id = data[:transaction]
    next if transaction_id.nil?

    promise = @transaction_queue[transaction_id]
    next if promise.nil?

    # Code 0 is what this transport uses for non-Janus (transport-level) failures.
    failure = JanusGateway::Error.new(0, "HTTP/Transport response: `#{error}`")
    promise.fail(failure).execute
  end
end

#send_transaction(data) ⇒ Concurrent::Promise

Parameters:

  • data (Hash)

Returns:

  • (Concurrent::Promise)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/janus_gateway/transport/http.rb', line 61

# Sends +data+ as a new transaction and returns a promise for its outcome.
#
# A fresh transaction id (from +transaction_id_new+) is written into
# +data[:transaction]+, the promise is registered in +@transaction_queue+
# under that id, and the payload is handed to #send. A timer thread fails
# the promise with a JanusGateway::Error (code 0) once
# +_transaction_timeout+ seconds have elapsed.
#
# When the promise settles either way, the transaction is removed from the
# queue and the timer thread is stopped.
#
# @param data [Hash] request payload; mutated to include +:transaction+
# @return [Concurrent::Promise] promise settled with the transaction result
def send_transaction(data)
  promise = Concurrent::Promise.new
  transaction = transaction_id_new

  data[:transaction] = transaction

  @transaction_queue[transaction] = promise
  send(data)

  timeout_thread = Thread.new do
    sleep(_transaction_timeout)
    error = JanusGateway::Error.new(0, "Transaction id `#{transaction}` has failed due to timeout!")
    promise.fail(error).execute
  end

  # Shared by the success and failure paths. Hash has no #remove method, so
  # the entry must be dropped with #delete; otherwise the callback raises
  # before the timer thread is ever stopped.
  cleanup = proc do
    @transaction_queue.delete(transaction)
    timeout_thread.exit
  end

  promise.then(&cleanup)
  promise.rescue(&cleanup)

  promise
end