Class: HTTTee::Server::Api

Inherits:
Object
  • Object
show all
Defined in:
lib/htttee/server/api.rb

Constant Summary collapse

AsyncResponse =
[-1, {}, []].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ Api

Returns a new instance of Api.



11
12
13
# File 'lib/htttee/server/api.rb', line 11

# Remembers the host and port given to the constructor.
#
# Within this file the pair is read back by #pubsub, which passes both
# values to PubSubRedis.connect.
#
# @param host [Object] host later handed to PubSubRedis.connect
# @param port [Object] port later handed to PubSubRedis.connect
def initialize(host, port)
  @host = host
  @port = port
end

Instance Attribute Details

#redis ⇒ Object

Returns the value of attribute redis.



9
10
11
# File 'lib/htttee/server/api.rb', line 9

# Reader for the +@redis+ instance variable.
#
# The other methods shown here use it as the receiver for their Redis
# commands (set, get, publish, pipeline, substr, multi_get).
#
# @return [Object, nil] the object stored in +@redis+, +nil+ when unset
def redis
  @redis
end

Instance Method Details

#async_callback(rack_env) ⇒ Object



166
167
168
# File 'lib/htttee/server/api.rb', line 166

# Looks up the asynchronous response callback in a Rack environment.
#
# Callers in this file invoke +call+ on the result with a
# [status, headers, body] triple.
#
# @param rack_env [Hash] Rack environment of the current request
# @return [Object, nil] the value stored under 'async.callback'
def async_callback(rack_env)
  rack_env['async.callback']
end

#async_close(rack_env) ⇒ Object



170
171
172
# File 'lib/htttee/server/api.rb', line 170

# Looks up the connection-close notifier in a Rack environment.
#
# Callers in this file register a +callback+ block on the result.
#
# @param rack_env [Hash] Rack environment of the current request
# @return [Object, nil] the value stored under 'async.close'
def async_close(rack_env)
  rack_env['async.close']
end

#call(env) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/htttee/server/api.rb', line 15

# Rack entry point.
#
# The stream identifier is the request path without its leading slash.
# POST and GET requests are handed to #post / #get, which answer later
# through the environment's async callback, so the async marker response is
# returned for them. Any other request method is rejected right away with a
# 405: nothing would ever answer it asynchronously, so returning the async
# marker would leave the client waiting indefinitely.
#
# @param env [Hash] Rack environment
# @return [Array] AsyncResponse for GET/POST, a 405 Rack triple otherwise
def call(env)
  # \A anchors at the very start of the string; ^ would also match right
  # after an embedded newline.
  uuid = env['PATH_INFO'].sub(%r{\A/}, '')
  body = env['rack.response_body']

  case env['REQUEST_METHOD']
  when 'POST' then post(env, uuid, body)
  when 'GET'  then get(env, uuid, body)
  else
    # Answer synchronously; no async handler was set up for this request.
    return [405, { 'Allow' => 'GET, POST', 'Content-Length' => '0' }, []]
  end

  AsyncResponse
end

#channel(uuid) ⇒ Object



158
159
160
# File 'lib/htttee/server/api.rb', line 158

# Name of the pub/sub channel used for a stream.
#
# The identifier is returned unchanged, so the channel name equals the uuid.
#
# @param uuid [Object] stream identifier
# @return [Object] the same object that was passed in
def channel(uuid)
  uuid
end

#closed_stream_response(env, uuid, body) ⇒ Object



99
100
101
102
103
104
# File 'lib/htttee/server/api.rb', line 99

# Answers a GET for a stream whose state is FIN (see #get).
#
# Starts the response, replays the stored data for the stream into the
# body and completes the body once #stream_data_to reports that no more
# data is left.
#
# @param env [Hash] Rack environment of the GET request
# @param uuid [Object] stream identifier
# @param body [#succeed] response body, completed after the replay
def closed_stream_response(env, uuid, body)
  start_response(env, body)
  stream_data_to(body, data_key(uuid)) { body.succeed }
end

#data_key(uuid) ⇒ Object



150
151
152
# File 'lib/htttee/server/api.rb', line 150

# Builds the Redis key under which a stream's data is stored.
#
# @param uuid [#to_s] stream identifier
# @return [String] the identifier followed by ":data"
def data_key(uuid)
  format('%s:data', uuid)
end

#encode(*parts) ⇒ Object



212
213
214
# File 'lib/htttee/server/api.rb', line 212

# Serializes the given arguments, collected into one array, with
# Yajl::Encoder.
#
# @param parts [Array] values to encode together
# @return [Object] the result of Yajl::Encoder.encode for the array
def encode(*parts)
  Yajl::Encoder.encode(parts)
end

#finish(channel) ⇒ Object



178
179
180
# File 'lib/htttee/server/api.rb', line 178

# Publishes the encoded FIN marker on a channel.
#
# @param channel [Object] channel name to publish on
# @return [Object] whatever +redis.publish+ returns
def finish(channel)
  redis.publish(channel, encode(FIN))
end

#four_oh_four_response(env, body) ⇒ Object



80
81
82
83
84
# File 'lib/htttee/server/api.rb', line 80

# Answers a request for a stream that has no stored state (see #get).
#
# Sends a 404 through the async callback and completes the body at once,
# without writing anything to it.
#
# @param env [Hash] Rack environment carrying 'async.callback'
# @param body [#succeed] response body
def four_oh_four_response(env, body)
  not_found = [404, { 'Transfer-Encoding' => 'chunked' }, body]
  async_callback(env).call(not_found)

  body.succeed
end

#get(env, uuid, body) ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/htttee/server/api.rb', line 35

# Handles a GET by looking up the stream's state and choosing a response.
#
# * no state stored -> 404 (#four_oh_four_response)
# * STREAMING       -> replay, then follow live data (#open_stream_response)
# * FIN             -> replay and finish (#closed_stream_response)
#
# Any other state value matches no branch, so nothing is sent for it.
#
# @param env [Hash] Rack environment of the GET request
# @param uuid [Object] stream identifier
# @param body [Object] response body handed to the chosen responder
def get(env, uuid, body)
  with_state_for(uuid) do |state|
    case state
    when nil
      four_oh_four_response(env, body)
    when STREAMING
      open_stream_response(env, uuid, body)
    when FIN
      closed_stream_response(env, uuid, body)
    end
  end
end

#open_stream_response(env, uuid, body) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/htttee/server/api.rb', line 86

# Answers a GET for a stream whose state is STREAMING (see #get).
#
# Starts the response and replays the data stored so far. Once the replay
# is done the state is read again: if the stream has reached FIN in the
# meantime the body is completed, otherwise the request is attached to the
# stream's pub/sub channel via #subscribe_and_stream.
#
# @param env [Hash] Rack environment of the GET request
# @param uuid [Object] stream identifier
# @param body [#succeed] response body
def open_stream_response(env, uuid, body)
  start_response(env, body)

  stream_data_to(body, data_key(uuid)) do
    with_state_for(uuid) do |state|
      next body.succeed if state == FIN

      subscribe_and_stream(env, uuid, body)
    end
  end
end

#post(env, uuid, body) ⇒ Object



27
28
29
30
31
32
33
# File 'lib/htttee/server/api.rb', line 27

# Handles a POST: marks the stream as STREAMING and wires up the handlers
# for the request input.
#
# @param env [Hash] Rack environment of the POST request
# @param uuid [Object] stream identifier
# @param body [Object] response body passed on to the input handlers
# @return [Object] the result of #set_input_each
def post(env, uuid, body)
  stream_state_key = state_key(uuid)
  redis.set(stream_state_key, STREAMING)

  # Completion/close handling, failure handling, then per-chunk handling.
  set_input_callback(env, uuid, body)
  set_input_errback(env, uuid, body)
  set_input_each(env, uuid, body)
end

#publish(channel, data) ⇒ Object



174
175
176
# File 'lib/htttee/server/api.rb', line 174

# Publishes a piece of stream data on a channel.
#
# The message is the encoded pair of the STREAMING marker and the data.
#
# @param channel [Object] channel name to publish on
# @param data [Object] payload to send
# @return [Object] whatever +redis.publish+ returns
def publish(channel, data)
  redis.publish(channel, encode(STREAMING, data))
end

#pubsub ⇒ Object



204
205
206
# File 'lib/htttee/server/api.rb', line 204

# Opens a pub/sub connection using the host and port given to the
# constructor.
#
# Each call performs a new PubSubRedis.connect; nothing is cached here.
#
# @return [Object] the result of EM::Protocols::PubSubRedis.connect
def pubsub
  EM::Protocols::PubSubRedis.connect(@host, @port)
end

#rack_input(rack_env) ⇒ Object



162
163
164
# File 'lib/htttee/server/api.rb', line 162

# Looks up the request input object in a Rack environment.
#
# Callers in this file use +callback+, +errback+ and +each+ on the result.
#
# @param rack_env [Hash] Rack environment of the current request
# @return [Object, nil] the value stored under 'rack.input'
def rack_input(rack_env)
  rack_env['rack.input']
end

#respond_with(env, body, data) ⇒ Object



117
118
119
120
# File 'lib/htttee/server/api.rb', line 117

# Sends a complete response in one go: starts the response with the given
# data (see #start_response) and then completes the body.
#
# @param env [Hash] Rack environment of the request
# @param body [#succeed] response body
# @param data [String, nil] payload forwarded to #start_response
# @return [Object] whatever +body.succeed+ returns
def respond_with(env, body, data)
  start_response(env, body, data)

  body.succeed
end

#set_input_callback(env, uuid, body) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/htttee/server/api.rb', line 45

# Registers what happens when the upload finishes or its connection closes.
#
# * When the request input succeeds, the client is answered with a 204, the
#   stream state is set to FIN, FIN is published on the stream's channel and
#   the response body is completed.
# * When the close notifier fires without the input having succeeded first,
#   the stream is still marked FIN and FIN is published.
#
# The close handler does nothing once the upload has completed. The stream
# has been finalized by then; running the handler anyway would publish FIN a
# second time and rewrite the state key at an arbitrary later moment.
#
# @param env [Hash] Rack environment of the POST request
# @param uuid [Object] stream identifier
# @param body [#succeed] response body
# @return [Object] whatever registering the close callback returns
def set_input_callback(env, uuid, body)
  # Flipped by the input callback so the close handler can tell a finished
  # upload from an interrupted one.
  completed = false

  rack_input(env).callback do
    completed = true
    async_callback(env).call [204, {}, body]

    redis.set(state_key(uuid), FIN) do
      finish(channel(uuid))
      body.succeed
    end
  end

  async_close(env).callback do
    next if completed

    redis.set(state_key(uuid), FIN) do
      finish(channel(uuid))
    end
  end
end

#set_input_each(env, uuid, body) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/htttee/server/api.rb', line 71

# Registers the per-chunk handler for the request input.
#
# Every non-empty chunk is sent to Redis as one pipeline of two commands:
# an append to the stream's data key and a publish of the encoded
# STREAMING/chunk pair on the stream's channel. Empty chunks are skipped.
#
# @param env [Hash] Rack environment of the POST request
# @param uuid [Object] stream identifier
# @param body [Object] unused here
# @return [Object] whatever +each+ on the request input returns
def set_input_each(env, uuid, body)
  rack_input(env).each do |chunk|
    next if chunk.empty?

    append_command  = ['append', data_key(uuid), chunk]
    publish_command = ['publish', channel(uuid), encode(STREAMING, chunk)]
    redis.pipeline(append_command, publish_command)
  end
end

#set_input_errback(env, uuid, body) ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/htttee/server/api.rb', line 62

# Registers the failure handler for the request input.
#
# On failure the client is answered with a 500, the inspected error is
# written to the body and the body is completed.
#
# @param env [Hash] Rack environment of the POST request
# @param uuid [Object] unused here
# @param body [#call, #succeed] response body
# @return [Object] whatever registering the errback returns
def set_input_errback(env, uuid, body)
  rack_input(env).errback do |error|
    server_error = [500, {}, body]
    async_callback(env).call(server_error)

    body.call([error.inspect])
    body.succeed
  end
end

#start_response(env, body, data = nil) ⇒ Object



122
123
124
125
126
# File 'lib/htttee/server/api.rb', line 122

# Sends the 200 status line and headers through the async callback and,
# when initial data is given, writes it to the body as the first chunk.
#
# @param env [Hash] Rack environment carrying 'async.callback'
# @param body [#call] response body
# @param data [String, nil] optional first chunk; +nil+ and empty values
#   are not written
# @return [Object, nil] the result of +body.call+, or +nil+ when no data
#   was written
def start_response(env, body, data = nil)
  headers = { 'Transfer-Encoding' => 'chunked', 'Content-Type' => 'text/plain' }
  async_callback(env).call([200, headers, body])

  return if data.nil? || data.empty?

  body.call([data])
end

#state_key(uuid) ⇒ Object



154
155
156
# File 'lib/htttee/server/api.rb', line 154

# Builds the Redis key under which a stream's state is stored.
#
# @param uuid [#to_s] stream identifier
# @return [String] the identifier followed by ":state"
def state_key(uuid)
  format('%s:state', uuid)
end

#stream_data_to(body, key, offset = 0, chunk_size = 1024, &block) ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/htttee/server/api.rb', line 106

# Replays the value stored under +key+ into +body+, one range read at a
# time, and yields once a read comes back empty.
#
# Each read asks for at most +chunk_size+ bytes starting at +offset+. A
# non-empty result is written to the body and the method calls itself with
# the offset moved past the bytes just received.
#
# @param body [#call] response body; receives each chunk wrapped in an array
# @param key [String] Redis key to read from
# @param offset [Integer] byte position to start reading at
# @param chunk_size [Integer] maximum number of bytes per read, expected to
#   be at least 1
# @yield called with no arguments when a read returns +nil+ or an empty
#   string
def stream_data_to(body, key, offset = 0, chunk_size = 1024, &block)
  # The end index of a Redis range read is inclusive, so the last byte of a
  # chunk_size-byte window is at offset + chunk_size - 1.
  redis.substr(key, offset, offset + chunk_size - 1) do |chunk|
    if chunk.nil? || chunk.empty?
      yield
    else
      body.call [chunk]
      # Range offsets count bytes; String#size counts characters and would
      # fall short of the bytes consumed for multibyte text.
      stream_data_to(body, key, offset + chunk.bytesize, chunk_size, &block)
    end
  end
end

#subscribe(channel, &block) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/htttee/server/api.rb', line 182

# Opens a new pub/sub connection (see #pubsub), subscribes it to +channel+
# and translates the events it delivers into calls of the given block:
#
# * a SUBSCRIBE event             -> block.call(SUBSCRIBE, channel_name)
# * a MESSAGE carrying STREAMING  -> block.call(STREAMING, payload)
# * a MESSAGE carrying FIN        -> unsubscribe, then block.call(FIN, payload)
#
# The event type is upcased before it is compared. Message data is parsed
# with Yajl::Parser into a state/payload pair. Any other event type is
# ignored; the handler evaluates to an empty string for it.
#
# @param channel [Object] channel name to subscribe to
# @param block [Proc] receiver of the translated events
# @return [Object] the pub/sub connection
def subscribe(channel, &block)
  connection = pubsub

  connection.subscribe(channel) do |type, chan, data|
    case type.upcase
    when SUBSCRIBE
      block.call(SUBSCRIBE, chan)
    when MESSAGE
      state, payload = Yajl::Parser.parse(data)

      case state
      when STREAMING
        block.call(STREAMING, payload)
      when FIN
        connection.unsubscribe(channel)
        block.call(FIN, payload)
      end
    else
      ''
    end
  end

  connection
end

#subscribe_and_stream(env, uuid, body) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/htttee/server/api.rb', line 128

# Attaches a response body to a stream's pub/sub channel.
#
# STREAMING events are written to the body as chunks, a FIN event completes
# the body; other event types (including SUBSCRIBE) are ignored. When the
# request's close notifier fires, the pub/sub connection is closed.
#
# @param env [Hash] Rack environment of the GET request
# @param uuid [Object] stream identifier
# @param body [#call, #succeed] response body
# @return [Object] whatever registering the close callback returns
def subscribe_and_stream(env, uuid, body)
  connection = subscribe(channel(uuid)) do |type, message, *_extra|
    case type
    when FIN
      body.succeed
    when STREAMING
      body.call([message])
    end
  end

  async_close(env).callback { connection.close_connection }
end

#with_state_and_data_for(uuid, &block) ⇒ Object



146
147
148
# File 'lib/htttee/server/api.rb', line 146

# Fetches a stream's state and data in a single +multi_get+ and hands the
# result to the block.
#
# @param uuid [Object] stream identifier
# @param block [Proc] forwarded to +redis.multi_get+
# @return [Object] whatever +redis.multi_get+ returns
def with_state_and_data_for(uuid, &block)
  keys = [state_key(uuid), data_key(uuid)]
  redis.multi_get(*keys, &block)
end

#with_state_for(uuid, &block) ⇒ Object



142
143
144
# File 'lib/htttee/server/api.rb', line 142

# Fetches a stream's state and hands it to the block.
#
# @param uuid [Object] stream identifier
# @param block [Proc] forwarded to +redis.get+
# @return [Object] whatever +redis.get+ returns
def with_state_for(uuid, &block)
  redis.get(state_key(uuid), &block)
end