Class: Faye::EventSource

Inherits:
Object
  • Object
show all
Includes:
WebSocket::API::EventTarget
Defined in:
lib/faye/eventsource.rb

Defined Under Namespace

Classes: Stream

Constant Summary collapse

DEFAULT_RETRY =
5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from WebSocket::API::EventTarget

#add_event_listener, #add_listener, #dispatch_event, #remove_event_listener

Constructor Details

#initialize(env, options = {}) ⇒ EventSource

Returns a new instance of EventSource.


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/faye/eventsource.rb', line 21

# Builds the event stream for the request described by +env+ and writes the
# HTTP response head (status line, headers and the initial "retry:" field)
# to it straight away.
#
# @param env [Hash] request environment; read here for 'async.callback' and
#   handed to EventSource.determine_url
# @param options [Hash] optional settings
#   :ping    - interval passed to EventMachine.add_periodic_timer for calling
#              #ping; no timer is created when it is absent
#   :retry   - multiplied by 1000 and floored for the initial "retry:" field;
#              DEFAULT_RETRY is used when it is absent
#   :headers - extra key/value pairs appended to the response head
def initialize(env, options = {})
  WebSocket.ensure_reactor_running
  super()

  @env         = env
  @ping        = options[:ping]
  @retry       = (options[:retry] || DEFAULT_RETRY).to_f
  @url         = EventSource.determine_url(env)
  @stream      = Stream.new(self)
  @ready_state = WebSocket::API::CONNECTING

  extra_headers  = ::WebSocket::Driver::Headers.new
  custom_headers = options[:headers]
  custom_headers.each { |name, value| extra_headers[name] = value } if custom_headers

  # The stream is handed to the async callback (when the env provides one)
  # before anything is written to it.
  async_callback = @env['async.callback']
  async_callback.call([101, {}, @stream]) if async_callback

  response_head = [
    "HTTP/1.1 200 OK\r\n",
    "Content-Type: text/event-stream\r\n",
    "Cache-Control: no-cache, no-store\r\n",
    "Connection: close\r\n",
    extra_headers.to_s,
    "\r\n",
    "retry: #{ (@retry * 1000).floor }\r\n\r\n"
  ].join
  @stream.write(response_head)

  # #open is deferred to the next reactor tick rather than run inline.
  EventMachine.next_tick { open }

  @ping_timer = EventMachine.add_periodic_timer(@ping) { ping } if @ping
end

Instance Attribute Details

#env ⇒ Object (readonly)

Returns the value of attribute env


7
8
9
# File 'lib/faye/eventsource.rb', line 7

# Read-only accessor for the request environment stored by the constructor.
#
# @return [Object] the current value of @env
def env; @env; end

#ready_state ⇒ Object (readonly)

Returns the value of attribute ready_state


7
8
9
# File 'lib/faye/eventsource.rb', line 7

# Read-only accessor for the connection state. The constructor sets it to
# WebSocket::API::CONNECTING and #close sets it to WebSocket::API::CLOSED.
#
# @return [Object] the current value of @ready_state
def ready_state; @ready_state; end

#url ⇒ Object (readonly)

Returns the value of attribute url


7
8
9
# File 'lib/faye/eventsource.rb', line 7

# Read-only accessor for the URL the constructor obtained from
# EventSource.determine_url(env).
#
# @return [Object] the current value of @url
def url; @url; end

Class Method Details

.determine_url(env) ⇒ Object


17
18
19
# File 'lib/faye/eventsource.rb', line 17

# Resolves the URL for +env+ by delegating to WebSocket.determine_url with
# the scheme pair ['https', 'http'].
#
# @param env [Hash] request environment, passed through unchanged
# @return [Object] whatever WebSocket.determine_url returns
def self.determine_url(env)
  schemes = ['https', 'http']
  WebSocket.determine_url(env, schemes)
end

.eventsource?(env) ⇒ Boolean

Returns:

  • (Boolean)

11
12
13
14
15
# File 'lib/faye/eventsource.rb', line 11

# Reports whether +env+ describes an EventSource request: a GET whose Accept
# header lists the text/event-stream media type.
#
# Each comma-separated Accept entry is compared by media type only, so
# surrounding whitespace, letter case and media-type parameters (for example
# "text/event-stream; q=0.9") do not prevent a match. An entry carrying a
# zero quality value ("q=0") is not counted as a match.
#
# @param env [Hash] request environment; reads 'REQUEST_METHOD' and
#   'HTTP_ACCEPT'
# @return [Boolean]
def self.eventsource?(env)
  return false unless env['REQUEST_METHOD'] == 'GET'

  (env['HTTP_ACCEPT'] || '').split(',').any? do |entry|
    media_type, *params = entry.split(';').map(&:strip)
    next false unless media_type && media_type.downcase == 'text/event-stream'

    # q=0 (also 0.0, 0.00, 0.000) marks the type as explicitly not acceptable.
    params.none? { |param| param =~ /\Aq\s*=\s*0(?:\.0{0,3})?\z/i }
  end
end

Instance Method Details

#close ⇒ Object


100
101
102
103
104
105
106
107
108
109
110
# File 'lib/faye/eventsource.rb', line 100

# Shuts the event source down: marks it CLOSED, cancels the ping timer,
# closes the stream once pending writes are flushed, and dispatches a
# 'close' event to listeners. Does nothing when the source is already
# CLOSING or CLOSED.
#
# @return [Object, nil] nil when already closing/closed; otherwise the
#   result of dispatch_event
def close
  terminal_states = [WebSocket::API::CLOSING, WebSocket::API::CLOSED]
  return if terminal_states.include?(@ready_state)

  # The state is flipped first, before the timer and stream are touched.
  @ready_state = WebSocket::API::CLOSED
  EventMachine.cancel_timer(@ping_timer)
  @stream.close_connection_after_writing

  close_event = WebSocket::API::Event.create('close')
  close_event.init_event('close', false, false)
  dispatch_event(close_event)
end

#last_event_id ⇒ Object


57
58
59
# File 'lib/faye/eventsource.rb', line 57

# Looks up the 'HTTP_LAST_EVENT_ID' entry of the request environment.
#
# @return [Object] the stored value, or an empty string when the entry is
#   missing (or otherwise falsy)
def last_event_id
  header_value = @env['HTTP_LAST_EVENT_ID']
  header_value ? header_value : ''
end

#ping(message = nil) ⇒ Object


94
95
96
97
98
# File 'lib/faye/eventsource.rb', line 94

# Writes the keep-alive frame ":\r\n\r\n" to the stream, unless the ready
# state is already past OPEN.
#
# @param message [Object] accepted but not used by this method
# @return [Boolean] true when the frame was written, false otherwise
def ping(message = nil)
  if @ready_state > WebSocket::API::OPEN
    false
  else
    @stream.write(":\r\n\r\n")
    true
  end
end

#rack_response ⇒ Object


61
62
63
# File 'lib/faye/eventsource.rb', line 61

# Builds the fixed response triple [-1, {}, []].
# NOTE(review): the -1 status presumably tells the server the response is
# produced asynchronously — confirm against the server adapter.
#
# @return [Array] status -1, an empty headers hash and an empty body array
def rack_response
  status  = -1
  headers = {}
  body    = []
  [status, headers, body]
end

#send(message, options = {}) ⇒ Object


79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/faye/eventsource.rb', line 79

# Writes one event to the stream, unless the ready state is already past
# OPEN.
#
# The message is converted with #to_s, passed through
# ::WebSocket::Driver.encode, and every line break in it is followed by a
# new "data: " prefix so that multi-line payloads stay inside the same
# event.
#
# @param message [#to_s] payload of the event
# @param options [Hash] optional fields
#   :event - value for the "event:" line
#   :id    - value for the "id:" line
# @return [Boolean] true when the frame was written, false otherwise
def send(message, options = {})
  return false if @ready_state > WebSocket::API::OPEN

  payload = ::WebSocket::Driver.encode(message.to_s)
  payload = payload.gsub(/(\r\n|\r|\n)/, '\1data: ')

  # Optional fields come first; the data line ends the event with a blank line.
  parts = []
  parts << "event: #{options[:event]}\r\n" if options[:event]
  parts << "id: #{options[:id]}\r\n" if options[:id]
  parts << "data: #{payload}\r\n\r\n"

  @stream.write(parts.join)
  true
end