Class: JanusGateway::Transport::WebSocket
- Inherits:
-
JanusGateway::Transport
- Object
- JanusGateway::Transport
- JanusGateway::Transport::WebSocket
- Defined in:
- lib/janus_gateway/transport/websocket.rb
Instance Attribute Summary collapse
- #client ⇒ Faye::WebSocket::Client, NilClass readonly
-
#transaction_queue ⇒ Object
readonly
Returns the value of attribute transaction_queue.
Instance Method Summary collapse
- #connect ⇒ Object
- #connected? ⇒ TrueClass, FalseClass
- #disconnect ⇒ Object
-
#initialize(url, protocol = 'janus-protocol') ⇒ WebSocket
constructor
A new instance of WebSocket.
- #run ⇒ Object
- #send(data) ⇒ Object
- #send_transaction(data) ⇒ Concurrent::Promise
Methods inherited from JanusGateway::Transport
Constructor Details
#initialize(url, protocol = 'janus-protocol') ⇒ WebSocket
Returns a new instance of WebSocket.
10 11 12 13 14 15 |
# File 'lib/janus_gateway/transport/websocket.rb', line 10 def initialize(url, protocol = 'janus-protocol') @url = url @protocol = protocol @client = nil @transaction_queue = {} end |
Instance Attribute Details
#client ⇒ Faye::WebSocket::Client, NilClass (readonly)
110 111 112 |
# File 'lib/janus_gateway/transport/websocket.rb', line 110 def client @client end |
#transaction_queue ⇒ Object (readonly)
Returns the value of attribute transaction_queue.
6 7 8 |
# File 'lib/janus_gateway/transport/websocket.rb', line 6 def transaction_queue @transaction_queue end |
Instance Method Details
#connect ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/janus_gateway/transport/websocket.rb', line 24 def connect raise('WebSocket client already exists!') unless @client.nil? @client = _create_client(@url, @protocol) client.on :open do emit :open end client.on :message do |event| data = JSON.parse(event.data) transaction_list = @transaction_queue.clone transaction_id = data['transaction'] unless transaction_id.nil? promise = transaction_list[transaction_id] unless promise.nil? if %w(success ack).include?(data['janus']) promise.set(data).execute else error_data = data['error'] error = JanusGateway::Error.new(error_data['code'], error_data['reason']) promise.fail(error).execute end end end emit :message, data end client.on :close do emit :close @transaction_queue.each do |transaction_id, promise| error = JanusGateway::Error.new(0, "Transaction id `#{transaction_id}` has failed due to websocket `close`!") promise.fail(error).execute end end end |
#connected? ⇒ TrueClass, FalseClass
105 106 107 |
# File 'lib/janus_gateway/transport/websocket.rb', line 105 def connected? !client.nil? && (client.ready_state == Faye::WebSocket::API::OPEN) end |
#disconnect ⇒ Object
100 101 102 |
# File 'lib/janus_gateway/transport/websocket.rb', line 100 def disconnect client.close unless client.nil? end |
#run ⇒ Object
17 18 19 20 21 22 |
# File 'lib/janus_gateway/transport/websocket.rb', line 17 def run EventMachine.run do EM.error_handler { |e| raise(e) } connect end end |
#send(data) ⇒ Object
66 67 68 69 |
# File 'lib/janus_gateway/transport/websocket.rb', line 66 def send(data) raise 'WebSocket transport not connected' unless connected? client.send(JSON.generate(data)) end |
#send_transaction(data) ⇒ Concurrent::Promise
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/janus_gateway/transport/websocket.rb', line 73 def send_transaction(data) promise = Concurrent::Promise.new transaction = transaction_id_new data[:transaction] = transaction send(data) @transaction_queue[transaction] = promise thread = Thread.new do sleep(_transaction_timeout) error = JanusGateway::Error.new(0, "Transaction id `#{transaction}` has failed due to `timeout`!") promise.fail(error).execute end promise.then do @transaction_queue.delete(transaction) thread.exit end promise.rescue do @transaction_queue.delete(transaction) thread.exit end promise end |