Class: Faye::Client
- Inherits:
-
Object
- Object
- Faye::Client
- Includes:
- Deferrable, Extensible, Logging, Publisher
- Defined in:
- lib/faye/protocol/client.rb
Constant Summary collapse
- UNCONNECTED =
1
- CONNECTING =
2
- CONNECTED =
3
- DISCONNECTED =
4
- HANDSHAKE =
'handshake'
- RETRY =
'retry'
- NONE =
'none'
- CONNECTION_TIMEOUT =
60.0
- DEFAULT_RETRY =
5.0
- MAX_REQUEST_SIZE =
2048
Constants included from Logging
Instance Attribute Summary collapse
-
#cookies ⇒ Object
readonly
Returns the value of attribute cookies.
-
#endpoint ⇒ Object
readonly
Returns the value of attribute endpoint.
-
#endpoints ⇒ Object
readonly
Returns the value of attribute endpoints.
-
#headers ⇒ Object
readonly
Returns the value of attribute headers.
-
#max_request_size ⇒ Object
readonly
Returns the value of attribute max_request_size.
-
#retry ⇒ Object
readonly
Returns the value of attribute retry.
-
#transports ⇒ Object
readonly
Returns the value of attribute transports.
Instance Method Summary collapse
-
#connect(&block) ⇒ Object
Request MUST include: channel, clientId, connectionType; MAY include: ext, id. Response MUST include: channel, successful, clientId; MAY include: error, advice, ext, id, timestamp.
- #disable(feature) ⇒ Object
-
#disconnect ⇒ Object
Request MUST include: channel, clientId; MAY include: ext, id. Response MUST include: channel, successful, clientId; MAY include: error, ext, id.
-
#handshake(&block) ⇒ Object
Request MUST include: * channel * version * supportedConnectionTypes MAY include: * minimumVersion * ext * id.
-
#initialize(endpoint = nil, options = {}) ⇒ Client
constructor
A new instance of Client.
- #message_error(messages, immediate = false) ⇒ Object
-
#publish(channel, data) ⇒ Object
Request MUST include: channel, data; MAY include: clientId, id, ext. Response MUST include: channel, successful; MAY include: id, error, ext.
- #receive_message(message) ⇒ Object
- #set_header(name, value) ⇒ Object
-
#subscribe(channel, force = false, &block) ⇒ Object
Request MUST include: channel, clientId, subscription; MAY include: ext, id. Response MUST include: channel, successful, clientId, subscription; MAY include: error, advice, ext, id, timestamp.
-
#unsubscribe(channel, &block) ⇒ Object
Request MUST include: channel, clientId, subscription; MAY include: ext, id. Response MUST include: channel, successful, clientId, subscription; MAY include: error, advice, ext, id, timestamp.
Methods included from Extensible
#add_extension, #pipe_through_extensions, #remove_extension
Methods included from Publisher
Methods included from Deferrable
Constructor Details
#initialize(endpoint = nil, options = {}) ⇒ Client
Returns a new instance of Client.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/faye/protocol/client.rb', line 24
#
# Builds a client in the UNCONNECTED state.
#
# endpoint - URL handed to Faye.parse_url; RackAdapter::DEFAULT_ENDPOINT is
#            used when it is nil.
# options  - Hash; the keys read here are :endpoints, :retry, :interval and
#            :timeout.
def initialize(endpoint = nil, options = {})
  super()
  info('New client created for ?', endpoint)

  @options    = options
  @endpoint   = Faye.parse_url(endpoint || RackAdapter::DEFAULT_ENDPOINT)
  @endpoints  = @options[:endpoints] || {}
  @transports = {}
  @cookies    = CookieJar::Jar.new
  @headers    = {}
  @disabled   = []
  @retry      = @options[:retry] || DEFAULT_RETRY

  # Replaces each value with its parsed form in place; when options[:endpoints]
  # was supplied this is the caller's own hash.
  @endpoints.each do |key, value|
    @endpoints[key] = Faye.parse_url(value)
  end

  @max_request_size = MAX_REQUEST_SIZE

  @state              = UNCONNECTED
  @channels           = Channel::Set.new
  @message_id         = 0
  @response_callbacks = {}

  # interval/timeout are scaled by 1000.0 here and #handshake divides the
  # interval by 1000.0 again before scheduling a timer — presumably the advice
  # hash holds milliseconds; confirm against the Bayeux advice field.
  @advice = {
    'reconnect' => RETRY,
    'interval'  => 1000.0 * (@options[:interval] || Engine::INTERVAL),
    'timeout'   => 1000.0 * (@options[:timeout]  || CONNECTION_TIMEOUT)
  }
end
Instance Attribute Details
#cookies ⇒ Object (readonly)
Returns the value of attribute cookies.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @cookies instance variable.
def cookies
  @cookies
end
#endpoint ⇒ Object (readonly)
Returns the value of attribute endpoint.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @endpoint instance variable.
def endpoint
  @endpoint
end
#endpoints ⇒ Object (readonly)
Returns the value of attribute endpoints.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @endpoints instance variable.
def endpoints
  @endpoints
end
#headers ⇒ Object (readonly)
Returns the value of attribute headers.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @headers instance variable.
def headers
  @headers
end
#max_request_size ⇒ Object (readonly)
Returns the value of attribute max_request_size.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @max_request_size instance variable.
def max_request_size
  @max_request_size
end
#retry ⇒ Object (readonly)
Returns the value of attribute retry.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @retry instance variable.
# `retry` is also a Ruby keyword, so a bare receiver-less call does not parse
# as a method call; invoke it with an explicit receiver.
def retry
  @retry
end
#transports ⇒ Object (readonly)
Returns the value of attribute transports.
22 23 24 |
# File 'lib/faye/protocol/client.rb', line 22
#
# Returns the value of the @transports instance variable.
def transports
  @transports
end
Instance Method Details
#connect(&block) ⇒ Object
Request
  MUST include: channel, clientId, connectionType
  MAY include:  ext, id
Response
  MUST include: channel, successful, clientId
  MAY include:  error, advice, ext, id, timestamp
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/faye/protocol/client.rb', line 127
#
# Sends a connect message, performing a handshake first when needed.
#
# block - optional callback registered via #callback once the client is past
#         the UNCONNECTED state.
#
# Does nothing when the current advice says 'none' or the client has been
# disconnected. When UNCONNECTED it returns the result of #handshake, which is
# given a block that re-enters #connect.
def connect(&block)
  return if @advice['reconnect'] == NONE || @state == DISCONNECTED
  return handshake { connect(&block) } if @state == UNCONNECTED

  callback(&block)
  return unless @state == CONNECTED

  info('Calling deferred actions for ?', @client_id)
  # Succeed and then immediately go back to :unknown — presumably so queued
  # callbacks fire now while later ones wait for the next connect; confirm
  # against the Deferrable mixin.
  set_deferred_status(:succeeded)
  set_deferred_status(:unknown)

  # Only issue a connect message while @connect_request is unset.
  return unless @connect_request.nil?
  @connect_request = true

  info('Initiating connection for ?', @client_id)

  send({
    'channel'        => Channel::CONNECT,
    'clientId'       => @client_id,
    'connectionType' => @transport.connection_type
  }) do
    cycle_connection
  end
end
#disable(feature) ⇒ Object
56 57 58 |
# File 'lib/faye/protocol/client.rb', line 56
#
# Appends +feature+ to the @disabled list and returns that list.
def disable(feature)
  @disabled << feature
end
#disconnect ⇒ Object
Request
  MUST include: channel, clientId
  MAY include:  ext, id
Response
  MUST include: channel, successful, clientId
  MAY include:  error, ext, id
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/faye/protocol/client.rb', line 162
#
# Sends a disconnect message and drops all channel subscriptions.
#
# No-op unless the client is CONNECTED. The state flips to DISCONNECTED before
# the message is sent; the transport is closed only if the response reports
# success. @channels is replaced with a fresh Channel::Set straight away,
# without waiting for the response.
def disconnect
  return unless @state == CONNECTED
  @state = DISCONNECTED

  info('Disconnecting ?', @client_id)

  send({
    'channel'  => Channel::DISCONNECT,
    'clientId' => @client_id
  }) do |response|
    @transport.close if response['successful']
  end

  info('Clearing channel listeners for ?', @client_id)
  @channels = Channel::Set.new
end
#handshake(&block) ⇒ Object
Request
  MUST include: channel, version, supportedConnectionTypes
  MAY include:  minimumVersion, ext, id
Success Response
  MUST include: channel, version, supportedConnectionTypes, clientId, successful
  MAY include:  minimumVersion, advice, ext, id, authSuccessful
Failed Response
  MUST include: channel, successful, error
  MAY include:  supportedConnectionTypes, advice, version, minimumVersion, ext, id
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/faye/protocol/client.rb', line 83
#
# Performs the handshake and, on success, re-subscribes to known channels.
#
# block - optional callback invoked after a successful handshake.
#
# No-op when advice says 'none' or the client is not UNCONNECTED. On failure
# the state returns to UNCONNECTED and another attempt is scheduled after
# @advice['interval'] / 1000.0 (the value handed to EventMachine.add_timer).
def handshake(&block)
  return if @advice['reconnect'] == NONE
  return if @state != UNCONNECTED

  @state = CONNECTING

  info('Initiating handshake with ?', @endpoint)
  select_transport(MANDATORY_CONNECTION_TYPES)

  send({
    'channel'                  => Channel::HANDSHAKE,
    'version'                  => BAYEUX_VERSION,
    'supportedConnectionTypes' => [@transport.connection_type]
  }) do |response|
    if response['successful']
      @state     = CONNECTED
      @client_id = response['clientId']

      # Re-select the transport from the connection types in the response.
      select_transport(response['supportedConnectionTypes'])

      info('Handshake successful: ?', @client_id)

      # force = true: send subscribe messages even for channels already
      # present in @channels.
      subscribe(@channels.keys, true)
      block.call if block_given?
    else
      info('Handshake unsuccessful')
      EventMachine.add_timer(@advice['interval'] / 1000.0) { handshake(&block) }
      @state = UNCONNECTED
    end
  end
end
#message_error(messages, immediate = false) ⇒ Object
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 |
# File 'lib/faye/protocol/client.rb', line 310
#
# Re-sends messages that failed and records that the transport is down.
#
# messages  - collection of messages to resend through #transport_send.
# immediate - when true, resend right away and leave the transport-up flag
#             untouched; otherwise resend each message after @retry via
#             EventMachine.add_timer.
#
# 'transport:down' is triggered only when @transport_up is not already false,
# so repeated failures trigger it once.
def message_error(messages, immediate = false)
  messages.each do |message|
    if immediate
      transport_send(message)
    else
      EventMachine.add_timer(@retry) { transport_send(message) }
    end
  end

  return if immediate || @transport_up == false
  @transport_up = false
  trigger('transport:down')
end
#publish(channel, data) ⇒ Object
Request
  MUST include: channel, data
  MAY include:  clientId, id, ext
Response
  MUST include: channel, successful
  MAY include:  id, error, ext
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/faye/protocol/client.rb', line 268
#
# Publishes +data+ to +channel+ once the client is connected.
#
# Returns a Publication whose deferred status is set to :succeeded when the
# response is successful, or to :failed with Error.parse(response['error'])
# otherwise. The Publication is returned immediately, before any response.
def publish(channel, data)
  publication = Publication.new

  connect do
    info('Client ? queueing published message to ?: ?', @client_id, channel, data)

    send({
      'channel'  => channel,
      'data'     => data,
      'clientId' => @client_id
    }) do |response|
      if response['successful']
        publication.set_deferred_status(:succeeded)
      else
        publication.set_deferred_status(:failed, Error.parse(response['error']))
      end
    end
  end

  publication
end
#receive_message(message) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/faye/protocol/client.rb', line 289
#
# Handles one incoming message from the transport.
#
# When the message carries a 'successful' key, the callback stored under its
# 'id' is removed from @response_callbacks and invoked after the message has
# passed through the :incoming extensions. 'transport:up' is triggered unless
# @transport_up is already true.
def receive_message(message)
  id = message['id']
  callback = @response_callbacks.delete(id) if message.has_key?('successful')

  pipe_through_extensions(:incoming, message, nil) do |piped|
    # The extension pipeline may yield nothing; in that case skip the message.
    next unless piped

    handle_advice(piped['advice']) if piped['advice']
    # NOTE(review): the callee name on this line was lost in the extracted
    # listing; `deliver_message` is a reconstruction — confirm against
    # lib/faye/protocol/client.rb.
    deliver_message(piped)

    callback.call(piped) if callback
  end

  return if @transport_up == true
  @transport_up = true
  trigger('transport:up')
end
#set_header(name, value) ⇒ Object
60 61 62 |
# File 'lib/faye/protocol/client.rb', line 60
#
# Stores a header in @headers; both name and value are converted with to_s.
def set_header(name, value)
  @headers[name.to_s] = value.to_s
end
#subscribe(channel, force = false, &block) ⇒ Object
Request
  MUST include: channel, clientId, subscription
  MAY include:  ext, id
Response
  MUST include: channel, successful, clientId, subscription
  MAY include:  error, advice, ext, id, timestamp
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/faye/protocol/client.rb', line 190
#
# Subscribes +block+ to +channel+.
#
# channel - a channel name, or an Array of names (each subscribed in turn, in
#           which case an Array of Subscriptions is returned).
# force   - when true, always send a subscribe message and skip registering
#           the block in @channels.
# block   - listener stored in @channels for the channel.
#
# Returns a Subscription. It succeeds immediately when the channel is already
# subscribed and force is false; otherwise its status follows the response.
def subscribe(channel, force = false, &block)
  if channel.is_a?(Array)
    return channel.map { |name| subscribe(name, force, &block) }
  end

  subscription  = Subscription.new(self, channel, block)
  has_subscribe = @channels.has_subscription?(channel)

  # Already subscribed: just add the listener, no message needs to be sent.
  if has_subscribe && !force
    @channels.subscribe([channel], block)
    subscription.set_deferred_status(:succeeded)
    return subscription
  end

  connect do
    info('Client ? attempting to subscribe to ?', @client_id, channel)
    @channels.subscribe([channel], block) unless force

    send({
      'channel'      => Channel::SUBSCRIBE,
      'clientId'     => @client_id,
      'subscription' => channel
    }) do |response|
      if response['successful']
        channels = [response['subscription']].flatten
        info('Subscription acknowledged for ? to ?', @client_id, channels)
        subscription.set_deferred_status(:succeeded)
      else
        subscription.set_deferred_status(:failed, Error.parse(response['error']))
        # Roll back the listener that was registered before sending.
        @channels.unsubscribe(channel, block)
      end
    end
  end

  subscription
end
#unsubscribe(channel, &block) ⇒ Object
Request
  MUST include: channel, clientId, subscription
  MAY include:  ext, id
Response
  MUST include: channel, successful, clientId, subscription
  MAY include:  error, advice, ext, id, timestamp
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/faye/protocol/client.rb', line 237
#
# Removes +block+ from +channel+ and, when @channels.unsubscribe returns a
# truthy value, sends an unsubscribe message.
#
# channel - a channel name, or an Array of names (each handled in turn).
# block   - the listener to remove.
#
# Returns nil when no message needs to be sent, otherwise whatever #connect
# returns.
def unsubscribe(channel, &block)
  if channel.is_a?(Array)
    return channel.map { |name| unsubscribe(name, &block) }
  end

  # NOTE(review): `dead` presumably means the channel has no listeners left —
  # confirm against Channel::Set#unsubscribe.
  dead = @channels.unsubscribe(channel, block)
  return unless dead

  connect do
    info('Client ? attempting to unsubscribe from ?', @client_id, channel)

    send({
      'channel'      => Channel::UNSUBSCRIBE,
      'clientId'     => @client_id,
      'subscription' => channel
    }) do |response|
      next unless response['successful']

      channels = [response['subscription']].flatten
      info('Unsubscription acknowledged for ? from ?', @client_id, channels)
    end
  end
end