Class: GraphQL::Dataloader
Inherits: Object
Defined in:
  lib/graphql/dataloader.rb,
  lib/graphql/dataloader/source.rb,
  lib/graphql/dataloader/request.rb,
  lib/graphql/dataloader/request_all.rb,
  lib/graphql/dataloader/null_dataloader.rb
Overview
This plugin supports Fiber-based concurrency, along with Source, which batches up loads.
Defined Under Namespace
Classes: NullDataloader, Request, RequestAll, Source
Constant Summary
- AsyncDataloader = Class.new(self) { self.default_nonblocking = true }
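The `AsyncDataloader` constant shows a common Ruby pattern: `Class.new(self)` builds an anonymous subclass whose class-level attribute is flipped inside the block. A minimal stand-alone sketch of that pattern (the `Loader`/`AsyncLoader` names are invented for illustration; this is not the gem's code):

```ruby
class Loader
  class << self
    # Class-level attribute, analogous to Dataloader.default_nonblocking
    attr_accessor :default_nonblocking
  end

  # Mirrors `AsyncDataloader = Class.new(self) { self.default_nonblocking = true }`:
  # an anonymous subclass that overrides the class-level default.
  AsyncLoader = Class.new(self) { self.default_nonblocking = true }

  def initialize(nonblocking: self.class.default_nonblocking)
    @nonblocking = nonblocking
  end

  def nonblocking?
    !!@nonblocking
  end
end
```

Because the attribute lives on each class object, the parent's default stays `nil` (falsey) while the subclass reports `true`.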
Class Attribute Summary
- .default_nonblocking ⇒ Object
  Returns the value of attribute default_nonblocking.
Class Method Summary
- .use(schema, nonblocking: nil) ⇒ Object
- .with_dataloading(&block) ⇒ Object
  Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.
Instance Method Summary
- #append_job(&job) ⇒ Object
- #clear_cache ⇒ void
  Clear any already-loaded objects from Source caches.
- #get_fiber_variables ⇒ Hash<Symbol, Object>
  This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).
- #initialize(nonblocking: self.class.default_nonblocking) ⇒ Dataloader constructor
  A new instance of Dataloader.
- #join_queues(previous_queue, next_queue) ⇒ Object
- #nonblocking? ⇒ Boolean
- #run ⇒ Object
- #run_isolated ⇒ Object
  Use a self-contained queue for the work in the block.
- #set_fiber_variables(vars) ⇒ void
  Set up the fiber variables in a new fiber.
- #with(source_class, *batch_args, **batch_kwargs) ⇒ Object
  Return the Source instance cached for the given batch parameters, creating and setting it up if necessary.
- #yield ⇒ void
  Tell the dataloader that this fiber is waiting for data.
Constructor Details
#initialize(nonblocking: self.class.default_nonblocking) ⇒ Dataloader
Returns a new instance of Dataloader.
# File 'lib/graphql/dataloader.rb', line 52

def initialize(nonblocking: self.class.default_nonblocking)
  @source_cache = Hash.new { |h, k| h[k] = {} }
  @pending_jobs = []
  if !nonblocking.nil?
    @nonblocking = nonblocking
  end
end
Class Attribute Details
.default_nonblocking ⇒ Object
Returns the value of attribute default_nonblocking.
# File 'lib/graphql/dataloader.rb', line 27

def default_nonblocking
  @default_nonblocking
end
Class Method Details
.use(schema, nonblocking: nil) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 32

def self.use(schema, nonblocking: nil)
  schema.dataloader_class = if nonblocking
    AsyncDataloader
  else
    self
  end
end
.with_dataloading(&block) ⇒ Object
Call the block with a Dataloader instance, then run all enqueued jobs and return the result of the block.
# File 'lib/graphql/dataloader.rb', line 42

def self.with_dataloading(&block)
  dataloader = self.new
  result = nil
  dataloader.append_job {
    result = block.call(dataloader)
  }
  dataloader.run
  result
end
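The pattern above (enqueue the caller's block as a job, run the queue, then return the block's result) can be shown with a stand-alone analogue in plain Ruby. `MiniLoader` and `with_loading` are invented names for this sketch; it omits all the fiber machinery and only demonstrates the result-capture shape:

```ruby
class MiniLoader
  def initialize
    @pending_jobs = []
  end

  # Queue a block to run later, like Dataloader#append_job
  def append_job(&job)
    @pending_jobs.push(job)
    nil
  end

  # Drain the queue (the real #run also manages Fibers and Sources)
  def run
    while (job = @pending_jobs.shift)
      job.call
    end
  end

  # Like .with_dataloading: capture the block's result via a local,
  # since the job runs later inside #run, not at enqueue time.
  def self.with_loading(&block)
    loader = new
    result = nil
    loader.append_job { result = block.call(loader) }
    loader.run
    result
  end
end
```

The local `result` is captured by the job's closure, which is how the value escapes the deferred job back to the caller.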
Instance Method Details
#append_job(&job) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 126

def append_job(&job)
  # Given a block, queue it up to be worked through when `#run` is called.
  # (If the dataloader is already running, then a Fiber will pick this up later.)
  @pending_jobs.push(job)
  nil
end
#clear_cache ⇒ void
This method returns an undefined value.
Clear any already-loaded objects from Source caches.
# File 'lib/graphql/dataloader.rb', line 135

def clear_cache
  @source_cache.each do |_source_class, batched_sources|
    batched_sources.each_value(&:clear_cache)
  end
  nil
end
#get_fiber_variables ⇒ Hash<Symbol, Object>
This is called before the fiber is spawned, from the parent context (i.e. from the thread or fiber that it is scheduled from).
# File 'lib/graphql/dataloader.rb', line 68

def get_fiber_variables
  fiber_vars = {}
  Thread.current.keys.each do |fiber_var_key|
    # This variable should be fresh in each new fiber
    if fiber_var_key != :__graphql_runtime_info
      fiber_vars[fiber_var_key] = Thread.current[fiber_var_key]
    end
  end
  fiber_vars
end
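This copying step exists because `Thread.current[...]` is actually *fiber*-local in Ruby: a newly spawned Fiber starts with none of the parent's variables unless they are copied across explicitly. A small demonstration of the problem and the copy-then-restore fix (variable names here are invented):

```ruby
# Set a fiber-local variable in the parent context
Thread.current[:request_id] = "abc123"

# Capture the parent's fiber-locals, as #get_fiber_variables does
parent_vars = {}
Thread.current.keys.each { |k| parent_vars[k] = Thread.current[k] }

seen_without_copy = :unset
seen_with_copy = :unset

# A fresh Fiber does NOT inherit fiber-local variables:
Fiber.new {
  seen_without_copy = Thread.current[:request_id]
}.resume

# Restoring inside the new fiber, as #set_fiber_variables does:
Fiber.new {
  parent_vars.each { |k, v| Thread.current[k] = v }
  seen_with_copy = Thread.current[:request_id]
}.resume
```

After both fibers run, `seen_without_copy` is `nil` while `seen_with_copy` holds the parent's value, which is exactly the gap the get/set pair bridges around `spawn_fiber`.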
#join_queues(previous_queue, next_queue) ⇒ Object
# File 'lib/graphql/dataloader.rb', line 269

def join_queues(previous_queue, next_queue)
  if @nonblocking
    Fiber.scheduler.run
    next_queue.select!(&:alive?)
  end
  previous_queue.concat(next_queue)
end
#nonblocking? ⇒ Boolean
# File 'lib/graphql/dataloader.rb', line 60

def nonblocking?
  @nonblocking
end
#run ⇒ Object
# File 'lib/graphql/dataloader.rb', line 171

def run
  if @nonblocking && !Fiber.scheduler
    raise "`nonblocking: true` requires `Fiber.scheduler`, assign one with `Fiber.set_scheduler(...)` before executing GraphQL."
  end
  # At a high level, the algorithm is:
  #
  #  A) Inside Fibers, run jobs from the queue one-by-one
  #    - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
  #    - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
  #    - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
  #  B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
  #    - Similarly, create a Fiber to consume pending sources and tell them to load their data.
  #    - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
  #    - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
  #  C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
  #    - Those Fibers assume that source caches will have been populated with the data they were waiting for.
  #    - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
  #  D) Once all pending fibers have been resumed once, return to `A` above.
  #
  # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
  # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
  pending_fibers = []
  next_fibers = []
  pending_source_fibers = []
  next_source_fibers = []
  first_pass = true

  while first_pass || (f = pending_fibers.shift)
    if first_pass
      first_pass = false
    else
      # These fibers were previously waiting for sources to load data,
      # resume them. (They might wait again, in which case, re-enqueue them.)
      resume(f)
      if f.alive?
        next_fibers << f
      end
    end

    while @pending_jobs.any?
      # Create a Fiber to consume jobs until one of the jobs yields
      # or jobs run out
      f = spawn_fiber {
        while (job = @pending_jobs.shift)
          job.call
        end
      }
      resume(f)
      # In this case, the job yielded. Queue it up to run again after
      # we load whatever it's waiting for.
      if f.alive?
        next_fibers << f
      end
    end

    if pending_fibers.empty?
      # Now, run all Sources which have become pending _before_ resuming GraphQL execution.
      # Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
      #
      # This is where an evented approach would be even better -- can we tell which
      # fibers are ready to continue, and continue execution there?
      #
      if (first_source_fiber = create_source_fiber)
        pending_source_fibers << first_source_fiber
      end

      while pending_source_fibers.any?
        while (outer_source_fiber = pending_source_fibers.pop)
          resume(outer_source_fiber)
          if outer_source_fiber.alive?
            next_source_fibers << outer_source_fiber
          end
          if (next_source_fiber = create_source_fiber)
            pending_source_fibers << next_source_fiber
          end
        end
        join_queues(pending_source_fibers, next_source_fibers)
        next_source_fibers.clear
      end
      # Move newly-enqueued Fibers on to the list to be resumed.
      # Clear out the list of next-round Fibers, so that
      # any Fibers that pause can be put on it.
      join_queues(pending_fibers, next_fibers)
      next_fibers.clear
    end
  end

  if @pending_jobs.any?
    raise "Invariant: #{@pending_jobs.size} pending jobs"
  elsif pending_fibers.any?
    raise "Invariant: #{pending_fibers.size} pending fibers"
  elsif next_fibers.any?
    raise "Invariant: #{next_fibers.size} next fibers"
  end
  nil
end
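The A–D loop above can be illustrated with a drastically simplified, stand-alone sketch in plain Ruby. `TinyRunner` and its methods are invented for this example; it has a single implicit "source", no nested source dependencies, and only one round of resuming, but it shows the core rhythm: jobs pause with `Fiber.yield` when they need data, the batch load runs once everything has started, and then the paused fibers are resumed:

```ruby
class TinyRunner
  def initialize
    @jobs = []
    @requested = []   # keys waiting to be batch-loaded
    @results = {}     # key => loaded value
  end

  def append_job(&job)
    @jobs << job
  end

  # A job calls this to request a key; the fiber pauses here (step A)
  # and reads its result after being resumed (step C).
  def load(key)
    @requested << key
    Fiber.yield
    @results[key]
  end

  def run
    paused = []
    # Step A: start every job in its own Fiber; collect the ones
    # that yielded while waiting for data.
    while (job = @jobs.shift)
      f = Fiber.new { job.call }
      f.resume
      paused << f if f.alive?
    end
    # Step B: load everything that was requested, in one batch.
    # (A stand-in load: uppercase the key's name.)
    @requested.uniq.each { |k| @results[k] = k.to_s.upcase }
    # Step C: resume the fibers that were waiting for that data.
    paused.each { |f| f.resume if f.alive? }
  end
end

out = []
runner = TinyRunner.new
runner.append_job { out << runner.load(:a) }
runner.append_job { out << runner.load(:b) }
runner.run
```

Both jobs request their keys before any loading happens, so the batch sees `[:a, :b]` together; that deferral is the whole point of the fiber dance in the real `#run`.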
#run_isolated ⇒ Object
Use a self-contained queue for the work in the block.
# File 'lib/graphql/dataloader.rb', line 143

def run_isolated
  prev_queue = @pending_jobs
  prev_pending_keys = {}
  @source_cache.each do |source_class, batched_sources|
    batched_sources.each do |batch_args, batched_source_instance|
      if batched_source_instance.pending?
        prev_pending_keys[batched_source_instance] = batched_source_instance.pending.dup
        batched_source_instance.pending.clear
      end
    end
  end

  @pending_jobs = []
  res = nil
  # Make sure the block is inside a Fiber, so it can `Fiber.yield`
  append_job {
    res = yield
  }
  run
  res
ensure
  @pending_jobs = prev_queue
  prev_pending_keys.each do |source_instance, pending|
    source_instance.pending.merge!(pending)
  end
end
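The save-and-restore shape of `run_isolated` (swap in a fresh queue, run the block, put the old queue back in `ensure` even if the block raises) can be sketched on its own. `QueueHolder` is an invented name and the seed jobs are placeholders:

```ruby
class QueueHolder
  attr_reader :pending_jobs

  def initialize
    # Pretend some outer work is already enqueued
    @pending_jobs = [:left, :over]
  end

  def run_isolated
    prev_queue = @pending_jobs
    @pending_jobs = []          # self-contained queue for this block
    yield
  ensure
    @pending_jobs = prev_queue  # outer work is untouched afterwards
  end
end

holder = QueueHolder.new
inside = nil
holder.run_isolated { inside = holder.pending_jobs.dup }
```

Inside the block the queue is empty, and after the block the original jobs are back, so isolated work can't accidentally drain or reorder the surrounding run.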
#set_fiber_variables(vars) ⇒ void
This method returns an undefined value.
Set up the fiber variables in a new fiber.
This is called within the fiber, right after it is spawned.
# File 'lib/graphql/dataloader.rb', line 85

def set_fiber_variables(vars)
  vars.each { |k, v| Thread.current[k] = v }
  nil
end
#with(source_class, *batch_args, **batch_kwargs) ⇒ Object
Return the Source instance cached for the given batch parameters, creating and setting it up if necessary.
# File 'lib/graphql/dataloader.rb', line 97

def with(source_class, *batch_args)
  batch_key = source_class.batch_key_for(*batch_args)
  @source_cache[source_class][batch_key] ||= begin
    source = source_class.new(*batch_args)
    source.setup(self)
    source
  end
end
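The caching here relies on an auto-vivifying nested Hash (`Hash.new { |h, k| h[k] = {} }`), keyed first by source class and then by batch key, so repeated calls with the same arguments return the same instance. A stand-alone sketch of that memoization with an invented `FakeSource` (real sources subclass GraphQL::Dataloader::Source):

```ruby
class FakeSource
  def self.batch_key_for(*batch_args)
    batch_args
  end

  def initialize(*batch_args)
    @batch_args = batch_args
  end

  def setup(owner)
    @owner = owner
  end
end

class CacheDemo
  def initialize
    # Nested cache: missing source-class keys get a fresh inner Hash
    @source_cache = Hash.new { |h, k| h[k] = {} }
  end

  def with(source_class, *batch_args)
    batch_key = source_class.batch_key_for(*batch_args)
    @source_cache[source_class][batch_key] ||= begin
      source = source_class.new(*batch_args)
      source.setup(self)
      source
    end
  end
end

demo = CacheDemo.new
a = demo.with(FakeSource, :users)
b = demo.with(FakeSource, :users)  # same batch key: cached instance
c = demo.with(FakeSource, :posts)  # different batch key: new instance
```

Sharing one instance per batch key is what lets separate fields accumulate their pending loads into a single batched fetch.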
#yield ⇒ void
This method returns an undefined value.
Tell the dataloader that this fiber is waiting for data.
Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
# File 'lib/graphql/dataloader.rb', line 120

def yield
  Fiber.yield
  nil
end
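Under the hood this is just `Fiber.yield`: the current fiber pauses at that point and continues when something resumes it. A minimal plain-Ruby demonstration of that pause-and-resume ordering (names invented):

```ruby
steps = []

worker = Fiber.new do
  steps << :before_wait
  Fiber.yield          # "waiting for data", like Dataloader#yield
  steps << :after_wait
end

worker.resume          # runs the fiber up to Fiber.yield, then returns here
steps << :loading      # meanwhile, the "dataloader" would load the data
worker.resume          # resumes the fiber just after the yield
```

The recorded order is `:before_wait`, `:loading`, `:after_wait`, showing that work scheduled between the two resumes (the batch load, in the real dataloader) happens while the fiber is paused.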