Class: Fiber
- Inherits:
-
Object
- Object
- Fiber
- Defined in:
- lib/rage/fiber.rb,
lib/rage/ext/setup.rb
Overview
Rage provides a simple and efficient API to wait on several instances of IO at the same time - Fiber.await.
Let's say we have the following controller:
class UsersController < RageController::API
def show
user = Net::HTTP.get(URI("http://users.service/users/#{params[:id]}"))
bookings = Net::HTTP.get(URI("http://bookings.service/bookings?user_id=#{params[:id]}"))
render json: { user: user, bookings: bookings }
end
end
This code will fire two consecutive HTTP requests. If each request takes 1 second to execute, the total execution time will be 2 seconds.
With Fiber.await, we can significantly decrease the overall execution time by changing the code to fire the requests concurrently.
To do this, we will need to:
- Wrap every request in a separate fiber using Fiber.schedule;
- Pass newly created fibers into Fiber.await;
class UsersController < RageController::API
def show
user, bookings = Fiber.await([
Fiber.schedule { Net::HTTP.get(URI("http://users.service/users/#{params[:id]}")) },
Fiber.schedule { Net::HTTP.get(URI("http://bookings.service/bookings?user_id=#{params[:id]}")) }
])
render json: { user: user, bookings: bookings }
end
end
With this change, if each request takes 1 second to execute, the total execution time will still be 1 second.
Creating fibers
Many developers see fibers as "lightweight threads" that should be used in conjunction with fiber pools, the same way we use thread pools for threads.
Instead, it makes sense to think of fibers as regular Ruby objects. We don't use a pool of arrays when we need to create an array - we create a new object and let Ruby and the GC do their job.
Same applies to fibers. Feel free to create as many fibers as you need on demand.
Active Record Connections
Let's consider the following controller, where we update a record in the database:
class UsersController < RageController::API
def update
User.update!(params[:id], email: params[:email])
render status: :ok
end
end
The User.update!
call here checks out an Active Record connection, and Rage will automatically check it back in once the action is completed. So far so good!
Let's consider another example:
require "net/http"
class UsersController < RageController::API
def update
User.update!(params[:id], email: params[:email]) # takes 5ms
Net::HTTP.post_form(URI("https://mailing.service/update"), { user_id: params[:id] }) # takes 50ms
render status: :ok
end
end
Here, we've added another step: once the record is updated, we will send a request to update the user's data in the mailing list service.
However, in this case, we want to release the Active Record connection before the action is completed. You can see that we need the connection only for the User.update!
call.
The next 50ms the code will spend waiting for the HTTP request to finish, and if we don't release the Active Record connection right away, other fibers won't be able to use it.
Active Record 7.2 handles this case by using #with_connection internally. With older Active Record versions, Rage handles this case on its own by keeping track of blocking calls and releasing Active Record connections between them.
Class Method Summary collapse
-
.await(fibers) ⇒ Object
Wait on several fibers at the same time.
-
.schedule(&block) ⇒ Object
Create a non-blocking fiber.
Class Method Details
.await(fibers) ⇒ Object
This method should only be used when multiple fibers have to be processed in parallel. There's no need to use Fiber.await
for single IO calls.
Wait on several fibers at the same time. Calling this method will automatically pause the current fiber, allowing the server to process other requests. Once all fibers have completed, the current fiber will be automatically resumed.
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/rage/fiber.rb', line 149 def self.await(fibers) f, fibers = Fiber.current, Array(fibers) # check which fibers are alive (i.e. have yielded) and which have errored out i, err, num_wait_for = 0, nil, 0 while i < fibers.length if fibers[i].alive? num_wait_for += 1 else err = fibers[i].__get_err break if err end i += 1 end # raise if one of the fibers has errored out or return the result if none have yielded if err raise err elsif num_wait_for == 0 return fibers.map!(&:__get_result) end # wait on async fibers; resume right away if one of the fibers errors out Iodine.subscribe("await:#{f.object_id}") do |_, err| if err == AWAIT_ERROR_MESSAGE f.resume else num_wait_for -= 1 f.resume if num_wait_for == 0 end end Fiber.defer(-1) Iodine.defer { Iodine.unsubscribe("await:#{f.object_id}") } # if num_wait_for is not 0 means we exited prematurely because of an error if num_wait_for > 0 raise fibers.find(&:__get_err).__get_err else fibers.map!(&:__get_result) end end |
.schedule(&block) ⇒ Object
Create a non-blocking fiber. Should mostly be used in conjunction with Fiber.await
.
|
# File 'lib/rage/fiber.rb', line 192
|