Class: Deepstream::Client

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/deepstream/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, options = {}) ⇒ Client

Returns a new instance of Client.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/deepstream/client.rb', line 26

def initialize(url, options = {})
  @url = Helpers.url(url)
  @error_handler = ErrorHandler.new(self)
  @record_handler = RecordHandler.new(self)
  @event_handler = EventHandler.new(self)
  @options = Helpers.default_options.merge!(options)
  @message_buffer = []
  @last_hearbeat = nil
  @challenge_denied, @@deliberate_close = false
  @state = CONNECTION_STATE::CLOSED
  @verbose = @options[:verbose]
  @log = Async.logger
  @never_connected_before = true
  connect
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



18
19
20
# File 'lib/deepstream/client.rb', line 18

def options
  @options
end

#stateObject (readonly)

Returns the value of attribute state.



18
19
20
# File 'lib/deepstream/client.rb', line 18

def state
  @state
end

Instance Method Details

#closeObject



77
78
79
80
81
82
83
# File 'lib/deepstream/client.rb', line 77

def close
  return unless connected?
  @deliberate_close = true
  log.info 'deliberate closing' if @verbose
rescue => e
  on_exception(e)
end

#connected?Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/deepstream/client.rb', line 92

def connected?
  @state != CONNECTION_STATE::CLOSED
end

#inspectObject



104
105
106
# File 'lib/deepstream/client.rb', line 104

def inspect
  "#{self.class} #{@url} | connection state: #{@state}"
end

#logged_in?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/deepstream/client.rb', line 100

def logged_in?
  @state == CONNECTION_STATE::OPEN
end

#login(credentials = ) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/deepstream/client.rb', line 64

def (credentials = @options[:credentials])
  @options[:credentials] = credentials
  if @challenge_denied
    on_error("this client's connection was closed")
  elsif @state == CONNECTION_STATE::AUTHENTICATING
    send_message(TOPIC::AUTH, ACTION::REQUEST, @options[:credentials].to_json, priority: true)
  end
  self
rescue => e
  on_exception(e)
  self
end

#on_message(data) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/deepstream/client.rb', line 47

def on_message(data)
  message = Message.new(data)
  @log.info "Receiving msg = #{message.inspect}" if @verbose
  case message.topic
  when TOPIC::AUTH       then authentication_message(message)
  when TOPIC::CONNECTION then connection_message(message)
  when TOPIC::EVENT      then @event_handler.on_message(message)
  when TOPIC::ERROR      then @error_handler.on_error(message)
  when TOPIC::RECORD     then @record_handler.on_message(message)
  when TOPIC::RPC        then raise(UnknownTopic, 'RPC is currently not implemented.')
  when nil               then nil
  else raise(UnknownTopic, message)
  end
rescue => e
  on_exception(e)
end

#on_openObject



42
43
44
45
# File 'lib/deepstream/client.rb', line 42

def on_open
  @log.info "Websocket connection opened" if @verbose
  @state = CONNECTION_STATE::AWAITING_CONNECTION
end

#reconnectObject



85
86
87
88
89
90
# File 'lib/deepstream/client.rb', line 85

def reconnect
  return if connected?
  @deliberate_close = false
  @state = CONNECTION_STATE::RECONNECTING
  @log.info 'Reconnecting' if @verbose
end

#reconnecting?Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/deepstream/client.rb', line 96

def reconnecting?
  @state == CONNECTION_STATE::RECONNECTING
end

#send_message(*args, **kwargs) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/deepstream/client.rb', line 108

def send_message(*args, **kwargs)
  message = Message.parse(*args)
  priority = kwargs[:priority] || false
  timeout = message.topic == TOPIC::EVENT ? kwargs[:timeout] : nil
  message.set_timeout(timeout) if timeout
  return unable_to_send_message(message, priority) if !logged_in? && message.needs_authentication?
  priority ? @message_buffer.unshift(message) : @message_buffer.push(message)
rescue Errno::EPIPE
  unable_to_send_message(message, priority)
rescue => e
  on_exception(e)
end