Class: HTTTee::Server::Api
- Inherits:
-
Object
- Object
- HTTTee::Server::Api
- Defined in:
- lib/htttee/server/api.rb
Constant Summary collapse
- AsyncResponse =
[-1, {}, []].freeze
Instance Attribute Summary collapse
-
#redis ⇒ Object
Returns the value of attribute redis.
Instance Method Summary collapse
- #async_callback(rack_env) ⇒ Object
- #async_close(rack_env) ⇒ Object
- #call(env) ⇒ Object
- #channel(uuid) ⇒ Object
- #closed_stream_response(env, uuid, body) ⇒ Object
- #data_key(uuid) ⇒ Object
- #encode(*parts) ⇒ Object
- #finish(channel) ⇒ Object
- #four_oh_four_response(env, body) ⇒ Object
- #get(env, uuid, body) ⇒ Object
-
#initialize(host, port) ⇒ Api
constructor
A new instance of Api.
- #open_stream_response(env, uuid, body) ⇒ Object
- #post(env, uuid, body) ⇒ Object
- #publish(channel, data) ⇒ Object
- #pubsub ⇒ Object
- #rack_input(rack_env) ⇒ Object
- #respond_with(env, body, data) ⇒ Object
- #set_input_callback(env, uuid, body) ⇒ Object
- #set_input_each(env, uuid, body) ⇒ Object
- #set_input_errback(env, uuid, body) ⇒ Object
- #start_response(env, body, data = nil) ⇒ Object
- #state_key(uuid) ⇒ Object
- #stream_data_to(body, key, offset = 0, chunk_size = 1024, &block) ⇒ Object
- #subscribe(channel, &block) ⇒ Object
- #subscribe_and_stream(env, uuid, body) ⇒ Object
- #with_state_and_data_for(uuid, &block) ⇒ Object
- #with_state_for(uuid, &block) ⇒ Object
Constructor Details
#initialize(host, port) ⇒ Api
Returns a new instance of Api.
11 12 13 |
# File 'lib/htttee/server/api.rb', line 11 def initialize(host, port) @host, @port = host, port end |
Instance Attribute Details
#redis ⇒ Object
Returns the value of attribute redis.
9 10 11 |
# File 'lib/htttee/server/api.rb', line 9 def redis @redis end |
Instance Method Details
#async_callback(rack_env) ⇒ Object
166 167 168 |
# File 'lib/htttee/server/api.rb', line 166 def async_callback(rack_env) rack_env['async.callback'] end |
#async_close(rack_env) ⇒ Object
170 171 172 |
# File 'lib/htttee/server/api.rb', line 170 def async_close(rack_env) rack_env['async.close'] end |
#call(env) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/htttee/server/api.rb', line 15 def call(env) uuid = env['PATH_INFO'].sub(/^\//, '') body = env['rack.response_body'] case env['REQUEST_METHOD'] when 'POST' then post(env, uuid, body) when 'GET' then get(env, uuid, body) end AsyncResponse end |
#channel(uuid) ⇒ Object
158 159 160 |
# File 'lib/htttee/server/api.rb', line 158 def channel(uuid) uuid end |
#closed_stream_response(env, uuid, body) ⇒ Object
99 100 101 102 103 104 |
# File 'lib/htttee/server/api.rb', line 99 def closed_stream_response(env, uuid, body) start_response(env, body) stream_data_to(body, data_key(uuid)) do body.succeed end end |
#data_key(uuid) ⇒ Object
150 151 152 |
# File 'lib/htttee/server/api.rb', line 150 def data_key(uuid) "#{uuid}:data" end |
#encode(*parts) ⇒ Object
212 213 214 |
# File 'lib/htttee/server/api.rb', line 212 def encode(*parts) Yajl::Encoder.encode(parts) end |
#finish(channel) ⇒ Object
178 179 180 |
# File 'lib/htttee/server/api.rb', line 178 def finish(channel) redis.publish channel, encode(FIN) end |
#four_oh_four_response(env, body) ⇒ Object
80 81 82 83 84 |
# File 'lib/htttee/server/api.rb', line 80 def four_oh_four_response(env, body) env['async.callback'].call [404, {'Transfer-Encoding' => 'chunked'}, body] body.succeed end |
#get(env, uuid, body) ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/htttee/server/api.rb', line 35 def get(env, uuid, body) with_state_for(uuid) do |state| case state when NilClass then four_oh_four_response(env, body) when STREAMING then open_stream_response(env, uuid, body) when FIN then closed_stream_response(env, uuid, body) end end end |
#open_stream_response(env, uuid, body) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/htttee/server/api.rb', line 86 def open_stream_response(env, uuid, body) start_response(env, body) stream_data_to(body, data_key(uuid)) do with_state_for(uuid) do |state| if state == FIN body.succeed else subscribe_and_stream(env, uuid, body) end end end end |
#post(env, uuid, body) ⇒ Object
27 28 29 30 31 32 33 |
# File 'lib/htttee/server/api.rb', line 27 def post(env, uuid, body) redis.set(state_key(uuid), STREAMING) set_input_callback(env, uuid, body) set_input_errback(env, uuid, body) set_input_each(env, uuid, body) end |
#publish(channel, data) ⇒ Object
174 175 176 |
# File 'lib/htttee/server/api.rb', line 174 def publish(channel, data) redis.publish channel, encode(STREAMING, data) end |
#pubsub ⇒ Object
204 205 206 |
# File 'lib/htttee/server/api.rb', line 204 def pubsub EM::Protocols::PubSubRedis.connect(@host, @port) end |
#rack_input(rack_env) ⇒ Object
162 163 164 |
# File 'lib/htttee/server/api.rb', line 162 def rack_input(rack_env) rack_env['rack.input'] end |
#respond_with(env, body, data) ⇒ Object
117 118 119 120 |
# File 'lib/htttee/server/api.rb', line 117 def respond_with(env, body, data) start_response(env, body, data) body.succeed end |
#set_input_callback(env, uuid, body) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/htttee/server/api.rb', line 45 def set_input_callback(env, uuid, body) rack_input(env).callback do async_callback(env).call [204, {}, body] redis.set(state_key(uuid), FIN) do finish(channel(uuid)) body.succeed end end async_close(env).callback do redis.set(state_key(uuid), FIN) do finish(channel(uuid)) end end end |
#set_input_each(env, uuid, body) ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/htttee/server/api.rb', line 71 def set_input_each(env, uuid, body) rack_input(env).each do |chunk| unless chunk.empty? redis.pipeline ['append', data_key(uuid), chunk], ['publish', channel(uuid), encode(STREAMING, chunk)] end end end |
#set_input_errback(env, uuid, body) ⇒ Object
62 63 64 65 66 67 68 69 |
# File 'lib/htttee/server/api.rb', line 62 def set_input_errback(env, uuid, body) rack_input(env).errback do |error| async_callback(env).call [500, {}, body] body.call [error.inspect] body.succeed end end |
#start_response(env, body, data = nil) ⇒ Object
122 123 124 125 126 |
# File 'lib/htttee/server/api.rb', line 122 def start_response(env, body, data = nil) env['async.callback'].call [200, {'Transfer-Encoding' => 'chunked', 'Content-Type' => 'text/plain'}, body] body.call [data] unless data.nil? || data.empty? end |
#state_key(uuid) ⇒ Object
154 155 156 |
# File 'lib/htttee/server/api.rb', line 154 def state_key(uuid) "#{uuid}:state" end |
#stream_data_to(body, key, offset = 0, chunk_size = 1024, &block) ⇒ Object
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/htttee/server/api.rb', line 106 def stream_data_to(body, key, offset = 0, chunk_size = 1024, &block) redis.substr(key, offset, offset + chunk_size) do |chunk| if chunk.nil? || chunk.empty? yield else body.call [chunk] stream_data_to(body, key, offset + chunk.size, chunk_size, &block) end end end |
#subscribe(channel, &block) ⇒ Object
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/htttee/server/api.rb', line 182 def subscribe(channel, &block) conn = pubsub conn.subscribe channel do |type, chan, data| case type.upcase when SUBSCRIBE then block.call(SUBSCRIBE, chan) when MESSAGE state, data = Yajl::Parser.parse(data) case state when STREAMING then block.call(STREAMING, data) when FIN conn.unsubscribe channel block.call(FIN, data) end else '' end end conn end |
#subscribe_and_stream(env, uuid, body) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/htttee/server/api.rb', line 128 def subscribe_and_stream(env, uuid, body) conn = subscribe channel(uuid) do |type, , *extra| case type #when SUBSCRIBE then start_response(env, body, data) when FIN then body.succeed when STREAMING then body.call [] end end async_close(env).callback do conn.close_connection end end |
#with_state_and_data_for(uuid, &block) ⇒ Object
146 147 148 |
# File 'lib/htttee/server/api.rb', line 146 def with_state_and_data_for(uuid, &block) redis.multi_get(state_key(uuid), data_key(uuid), &block) end |
#with_state_for(uuid, &block) ⇒ Object
142 143 144 |
# File 'lib/htttee/server/api.rb', line 142 def with_state_for(uuid, &block) redis.get(state_key(uuid), &block) end |