Class: Libuv::Reactor

Inherits:
Object show all
Extended by:
Accessors, ClassMethods
Includes:
Assertions, Resource
Defined in:
lib/libuv/reactor.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

# A single Mutex instance held in a class-level constant.
# NOTE(review): its users are not visible in this section.
CRITICAL =
::Mutex.new
# Thread pool sizing, overridable through environment variables.
# ENV values are Strings, so every setting is coerced with #to_i; the
# constants are therefore Integers whether or not the variable is set.
LIBUV_MIN_POOL =
(ENV['LIBUV_MIN_POOL'] || 8).to_i
LIBUV_MAX_POOL =
(ENV['LIBUV_MAX_POOL'] || 40).to_i
LIBUV_MAX_QUEUE =
(ENV['LIBUV_MAX_QUEUE'] || 50000).to_i
# Executor built once from the LIBUV_MIN_POOL / LIBUV_MAX_POOL /
# LIBUV_MAX_QUEUE settings; #work posts its blocks to this pool.
THREAD_POOL =
::Concurrent::ThreadPoolExecutor.new(
    min_threads: LIBUV_MIN_POOL,
    max_threads: LIBUV_MAX_POOL,
    max_queue: LIBUV_MAX_QUEUE
)

Constants included from Accessors

Accessors::Functions

Constants included from Assertions

Assertions::MSG_NO_PROC

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Accessors

reactor

Methods included from ClassMethods

create, current, default, new

Methods included from Resource

#check_result, #check_result!, #resolve, #to_ptr

Methods included from Assertions

#assert_block, #assert_boolean, #assert_type

Constructor Details

#initialize(pointer) ⇒ Reactor

Initialize a reactor using an FFI::Pointer to a libuv reactor



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/libuv/reactor.rb', line 73

# Sets up a reactor around an existing loop pointer: stores the pointer,
# creates the fiber pool, the cross-thread work queue, the next-tick timer,
# the stop handle, a prepare handle, default signal handlers and the default
# error notifier.
#
# @param pointer the pointer to the underlying libuv loop (kept in @pointer)
def initialize(pointer) # :notnew:
    @pointer = pointer
    @reactor = self
    @run_count = 0
    @ref_count = 0
    @fiber_pool = FiberPool.new(self)

    # Create an async call for scheduling work from other threads
    @run_queue = Queue.new
    @process_queue = @reactor.async { process_queue_cb }
    # NOTE(review): unref presumably stops this internal handle from keeping
    # the loop alive on its own -- confirm against the handle's #unref
    @process_queue.unref

    # Create a next tick timer
    @next_tick = @reactor.timer { next_tick_cb }
    @next_tick.unref

    # Create an async call for ending the reactor
    @stop_reactor = @reactor.async { stop_cb }
    @stop_reactor.unref

    # Libuv can prevent the application shutting down once the main thread has ended
    # The addition of a prepare function prevents this from happening.
    @reactor_prep = prepare {}
    @reactor_prep.unref
    @reactor_prep.start

    # LibUV ignores program interrupt by default.
    # We provide normal behaviour and allow this to be overridden
    # (callbacks registered through #on_program_interrupt are stored in @on_signal)
    @on_signal = []
    # INT, HUP and TERM all share the same callback
    sig_callback = proc { signal_cb }
    self.signal(:INT, &sig_callback).unref
    self.signal(:HUP, &sig_callback).unref
    self.signal(:TERM, &sig_callback).unref

    # Notify of errors
    # The default notifier only remembers the error; #run raises it once the
    # loop has returned. #notifier can swap in a different callable.
    @throw_on_exit = nil
    @reactor_notify_default = @reactor_notify = proc { |error|
        @throw_on_exit = error
    }
    @fiber_pool.on_error &@reactor_notify
end

Instance Attribute Details

#fiber_poolObject (readonly)

Returns the value of attribute fiber_pool.



115
116
117
# File 'lib/libuv/reactor.rb', line 115

# Reader for the FiberPool that #initialize creates for this reactor.
def fiber_pool
  @fiber_pool
end

#run_countObject (readonly)

Returns the value of attribute run_count.



115
116
117
# File 'lib/libuv/reactor.rb', line 115

# Reader for the counter that #run increments each time it starts the loop.
def run_count
  @run_count
end

Instance Method Details

#active_handlesObject

Return the number of active handles in the event loop



235
236
237
238
# File 'lib/libuv/reactor.rb', line 235

# Wraps this reactor's loop pointer in an Ext::UvLoop struct and reads its
# +active_handles+ field.
#
# @return the value stored in the struct's :active_handles member
def active_handles
    Ext::UvLoop.new(@pointer)[:active_handles]
end

#all(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved. (thread safe)

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each value corresponding to the promise at the same index in the `promises` array. If any of the promises is resolved with a rejection, this resulting promise will be resolved with the same rejection.



269
270
271
# File 'lib/libuv/reactor.rb', line 269

# Delegates to Q.all, passing this reactor followed by the given promises.
def all(*promises)
    Q.all(@reactor, *promises)
end

#any(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when any of the input promises are resolved.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:



279
280
281
# File 'lib/libuv/reactor.rb', line 279

# Delegates to Q.any, passing this reactor followed by the given promises.
def any(*promises)
    Q.any(@reactor, *promises)
end

#async::Libuv::Async

Get a new Async handle

Returns:



418
419
420
421
422
# File 'lib/libuv/reactor.rb', line 418

# Builds a new Async handle bound to this reactor.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield optional callback, registered through the handle's #progress
# @return [::Libuv::Async] the new handle
def async(&block)
    handle = Async.new(@reactor)
    handle.progress(&block) if block
    handle
end

#check::Libuv::Check

Get a new Check handle

Returns:



399
400
401
402
403
# File 'lib/libuv/reactor.rb', line 399

# Builds a new Check handle bound to this reactor.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield optional callback, registered through the handle's #progress
# @return [::Libuv::Check] the new handle
def check(&block)
    handle = Check.new(@reactor)
    handle.progress(&block) if block
    handle
end

#defer::Libuv::Q::Deferred

Creates a deferred result object for where the result of an operation may only be returned at some point in the future or is being processed on a different thread (thread safe)



257
258
259
# File 'lib/libuv/reactor.rb', line 257

# Delegates to Q.defer with this reactor; #work uses the result's #resolve,
# #reject and #promise.
def defer
    Q.defer(@reactor)
end

#execObject

Execute the provided block of code in a fiber from the pool



214
215
216
# File 'lib/libuv/reactor.rb', line 214

# Passes a block to the fiber pool's #exec; that block yields to the block
# given to this method.
def exec
    @fiber_pool.exec { yield }
end

#file(path, flags = 0, mode: 0, **opts, &blk) ⇒ ::Libuv::File

Opens a file and returns an object that can be used to manipulate it

Parameters:

  • path (String)

    the path to the file to open

  • flags (Integer) (defaults to: 0)

    see ruby File::Constants

  • mode (Integer) (defaults to: 0)

Returns:



492
493
494
495
496
497
# File 'lib/libuv/reactor.rb', line 492

# Checks the argument types with assert_type, then builds a File bound to
# this reactor. Extra keyword options and the block are forwarded unchanged.
# NOTE(review): `File` presumably resolves to ::Libuv::File here (per the
# documented return type) rather than Ruby's core File -- confirm.
def file(path, flags = 0, mode: 0, **opts, &blk)
    assert_type(String, path, "path must be a String")
    assert_type(Integer, flags, "flags must be an Integer")
    assert_type(Integer, mode, "mode must be an Integer")
    File.new(@reactor, path, flags, mode: mode, **opts, &blk)
end

#filesystem::Libuv::Filesystem

Returns an object for manipulating the filesystem

Returns:



502
503
504
# File 'lib/libuv/reactor.rb', line 502

# Builds a new Filesystem object bound to this reactor on every call.
def filesystem
    Filesystem.new(@reactor)
end

#finally(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved or rejected.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each [result, wasResolved] value pair corresponding to a promise at the same index in the `promises` array.



290
291
292
# File 'lib/libuv/reactor.rb', line 290

# Delegates to Q.finally, passing this reactor followed by the given promises.
def finally(*promises)
    Q.finally(@reactor, *promises)
end

#fs_event(path) ⇒ ::Libuv::FSEvent

Get a new FSEvent instance

Parameters:

  • path (String)

    the path to the file or folder for watching

Returns:

Raises:

  • (ArgumentError)

    if path is not a string



481
482
483
484
# File 'lib/libuv/reactor.rb', line 481

# Checks that +path+ is a String via assert_type, then builds an FSEvent for
# it bound to this reactor.
def fs_event(path)
    assert_type(String, path)
    FSEvent.new(@reactor, path)
end

#handleObject



172
# File 'lib/libuv/reactor.rb', line 172

# Returns the loop pointer this reactor was initialised with.
def handle; @pointer; end

#idle::Libuv::Idle

Get a new Idle handle

Parameters:

  • callback (Proc)

    the callback to be called on idle trigger

Returns:



409
410
411
412
413
# File 'lib/libuv/reactor.rb', line 409

# Builds a new Idle handle bound to this reactor.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield optional callback, registered through the handle's #progress
# @return [::Libuv::Idle] the new handle
def idle(&block)
    handle = Idle.new(@reactor)
    handle.progress(&block) if block
    handle
end

#inspectObject

Overridden because, under JRuby, errors can hang the VM while inspecting, since a great many classes reference this class



167
168
169
# File 'lib/libuv/reactor.rb', line 167

# Short, fixed-size description: class name, object id in hex and the number
# of entries currently held in the run queue (shown as NT).
def inspect
    hex_id = __id__.to_s(16)
    pending = @run_queue.length
    "#<#{self.class}:0x#{hex_id} NT=#{pending}>"
end

#log(error, msg = nil, trace = nil) ⇒ Object

Notifies the reactor there was an event that should be logged

Parameters:

  • error (Exception)

    the error

  • msg (String|nil) (defaults to: nil)

    optional context on the error

  • trace (Array<String>) (defaults to: nil)

    optional additional trace of caller if async



547
548
549
# File 'lib/libuv/reactor.rb', line 547

# Calls the current notifier with the error, message and trace. The notifier
# is the default proc from #initialize (which stores the error for #run to
# raise on exit) unless #notifier has replaced it.
def log(error, msg = nil, trace = nil)
    @reactor_notify.call(error, msg, trace)
end

#lookup(hostname, hint = :IPv4, port = 9, wait: true) ⇒ ::Libuv::Dns

Lookup a hostname

Parameters:

  • hostname (String)

    the domain name to lookup

  • port (Integer, String) (defaults to: 9)

    the service being connected to

  • callback (Proc)

    the callback to be called on success

Returns:



466
467
468
469
470
471
472
473
474
# File 'lib/libuv/reactor.rb', line 466

# Starts a DNS lookup for +hostname+.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @param hostname the name to resolve
# @param hint passed through to Dns.new (defaults to :IPv4)
# @param port passed through to Dns.new (defaults to 9)
# @param wait when true the call returns +dns.results+; when false the Dns
#   object itself is returned
# @yield only used when +wait+ is false: attached to the Dns object via #then
# @return the lookup results when +wait+ is true, otherwise the Dns object
def lookup(hostname, hint = :IPv4, port = 9, wait: true, &block)
    dns = Dns.new(@reactor, hostname, port, hint, wait: wait)    # Work is a promise object
    return dns.results if wait

    dns.then(&block) if block
    dns
end

#lookup_error(err) ⇒ ::Libuv::Error

Lookup an error code and return it as an error object

Parameters:

  • err (Integer)

    The error code to look up.

Returns:



320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/libuv/reactor.rb', line 320

# Translates a libuv error code into an error object.
#
# The name returned by Ext.err_name selects a constant under ::Libuv::Error,
# which is instantiated with a "message, NAME:code" string. Failures during
# the lookup are logged through the reactor and returned instead of raised.
#
# Only StandardError is rescued, so Interrupt, SystemExit and similar
# process-level exceptions propagate rather than being returned as values.
#
# @param err the error code to look up
# @return the matching ::Libuv::Error instance, or the StandardError raised
#   while looking it up
def lookup_error(err)
    name = ::Libuv::Ext.err_name(err)

    if name
        msg  = ::Libuv::Ext.strerror(err)
        ::Libuv::Error.const_get(name.to_sym).new("#{msg}, #{name}:#{err}")
    else
        # We want a back-trace in this case
        raise "error lookup failed for code #{err}"
    end
rescue StandardError => e
    @reactor.log e, 'performing error lookup'
    e
end

#next_tick(&block) ⇒ Object

Queue some work to be processed in the next iteration of the event reactor (thread safe)

Parameters:

  • callback (Proc)

    the callback to be called on the reactor thread



526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# File 'lib/libuv/reactor.rb', line 526

# Queues +block+ on the run queue and makes sure it gets processed.
#
# Off the reactor thread the process-queue handle is signalled. On the
# reactor thread the next-tick timer is started and ref'd, but only when a
# tick has not already been scheduled.
#
# @return [self]
def next_tick(&block)
    @run_queue << block

    if !reactor_thread?
        @process_queue.call
    elsif !@next_tick_scheduled
        # Create a next tick timer
        @next_tick.start(0)
        @next_tick_scheduled = true
        @next_tick.ref
    end

    self
end

#notifier::Libuv::Q::Promise

Provides a promise notifier for receiving un-handled exceptions

Returns:



244
245
246
247
248
249
250
251
# File 'lib/libuv/reactor.rb', line 244

# Installs the callable that #log invokes, or restores the default one.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield the new notifier; when omitted the default notifier created in
#   #initialize is reinstated
# @return [self]
def notifier(&block)
    @reactor_notify = block || @reactor_notify_default
    self
end

#nowInteger

Get current time in milliseconds

Returns:

  • (Integer)


312
313
314
# File 'lib/libuv/reactor.rb', line 312

# Returns whatever ::Libuv::Ext.now reports for this reactor's loop pointer.
def now
    ::Libuv::Ext.now(@pointer)
end

#on_program_interrupt(&callback) ⇒ Object

Allows user defined behaviour when sig int is received



435
436
437
438
# File 'lib/libuv/reactor.rb', line 435

# Appends +callback+ to the list of signal callbacks kept in @on_signal.
#
# @return [self]
def on_program_interrupt(&callback)
    @on_signal.push(callback)
    self
end

#pipe(ipc = false) ⇒ ::Libuv::Pipe

Get a new Pipe instance

Parameters:

  • ipc (true, false) (defaults to: false)

    indicate if a handle will be used for ipc, useful for sharing tcp socket between processes

Returns:



373
374
375
# File 'lib/libuv/reactor.rb', line 373

# Builds a new Pipe bound to this reactor, forwarding the +ipc+ flag.
def pipe(ipc = false)
    Pipe.new(@reactor, ipc)
end

#prepare::Libuv::Prepare

Get a new Prepare handle

Returns:



390
391
392
393
394
# File 'lib/libuv/reactor.rb', line 390

# Builds a new Prepare handle bound to this reactor.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield optional callback, registered through the handle's #progress
# @return [::Libuv::Prepare] the new handle
def prepare(&block)
    handle = Prepare.new(@reactor)
    handle.progress(&block) if block
    handle
end

#reactor_running?Boolean Also known as: running?

Tells you whether the Libuv reactor is currently running.

Returns:

  • (Boolean)


566
567
568
# File 'lib/libuv/reactor.rb', line 566

# Returns the flag that #run sets to true while the loop is executing and
# resets to false afterwards. It is nil until #run has been called, because
# #initialize does not assign it.
def reactor_running?
    @reactor_running
end

#reactor_thread?Boolean

True if the calling thread is the same thread as the reactor.

Returns:

  • (Boolean)


559
560
561
# File 'lib/libuv/reactor.rb', line 559

# Compares this reactor with the :reactor thread variable of the calling
# thread; #run sets that variable for the duration of the loop.
def reactor_thread?
    self == Thread.current.thread_variable_get(:reactor)
end

#refObject

Prevents the reactor loop from stopping



219
220
221
222
223
224
# File 'lib/libuv/reactor.rb', line 219

# Adds one to the reference count, ref'ing the process-queue handle on the
# transition from zero. Does nothing unless called on the reactor thread
# while the reactor is running.
#
# @return the new reference count, or nil when nothing was done
def ref
    return unless reactor_thread? && reactor_running?

    @process_queue.ref if @ref_count.zero?
    @ref_count += 1
end

#reject(reason) ⇒ Object

Creates a promise that is resolved as rejected with the specified reason. This api should be used to forward rejection in a chain of promises. If you are dealing with the last promise in a promise chain, you don’t need to worry about it.



297
298
299
# File 'lib/libuv/reactor.rb', line 297

# Delegates to Q.reject, passing this reactor and the given reason.
def reject(reason)
    Q.reject(@reactor, reason)
end

#run(run_type = :UV_RUN_DEFAULT) {|promise| ... } ⇒ Object

Run the actual event reactor. This method will block until the reactor is stopped.

Parameters:

  • run_type (:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT) (defaults to: :UV_RUN_DEFAULT)

Yield Parameters:

  • promise (::Libuv::Q::Promise)

    Yields a promise that can be used for logging unhandled exceptions on the reactor.



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/libuv/reactor.rb', line 179

# Runs the event loop, blocking until it returns.
#
# When the reactor is not yet running, the calling thread is marked as the
# reactor thread, the optional block is executed inside the fiber pool with
# the reactor as its argument, and the libuv loop is run. Afterwards the
# thread marker is cleared, the run queue is emptied and the last error
# captured by the default notifier (if any) is raised.
#
# When the reactor is already running, a block given on the reactor thread
# is simply yielded to; on any other thread a RuntimeError is raised.
#
# @param run_type passed straight to ::Libuv::Ext.run (defaults to :UV_RUN_DEFAULT)
# @yield [reactor] optional start-up block
# @return this reactor
# @raise [RuntimeError] if another reactor already owns the calling thread,
#   or if a block is given while the reactor runs on another thread
def run(run_type = :UV_RUN_DEFAULT)
    if not @reactor_running
        # Checked before any state is touched: failing here must not reach the
        # ensure below, which would clear the thread variable that belongs to
        # the reactor already running on this thread.
        raise 'only one reactor allowed per-thread' if Thread.current.thread_variable_get(:reactor)

        begin
            @reactor_running = true
            Thread.current.thread_variable_set(:reactor, @reactor)
            @throw_on_exit = nil
            update_time
            @fiber_pool.reset
            @fiber_pool.exec { yield @reactor } if block_given?
            @run_count += 1
            ::Libuv::Ext.run(@pointer, run_type)  # This is blocking
        ensure
            Thread.current.thread_variable_set(:reactor, nil)
            @reactor_running = false
            @run_queue.clear
        end

        # Raise the last unhandled error to occur on the reactor thread
        raise @throw_on_exit if @throw_on_exit

    elsif block_given?
        if reactor_thread?
            update_time
            yield @reactor
        else
            raise 'reactor already running on another thread'
        end
    end

    @reactor
end

#schedule { ... } ⇒ Object

Schedule some work to be processed on the event reactor as soon as possible (thread safe)

Yields:

  • the callback to be called on the reactor thread



513
514
515
516
517
518
519
520
521
# File 'lib/libuv/reactor.rb', line 513

# Runs the block on the reactor thread as soon as possible.
#
# On the reactor thread the block is yielded to immediately. On any other
# thread it is pushed onto the run queue and the process-queue handle is
# signalled.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield the work to perform on the reactor thread
# @return [self]
# @raise [ArgumentError] when called from another thread without a block
def schedule(&block)
    if reactor_thread?
        yield
    else
        # Mirrors what the bare Proc.new used to raise on Ruby 2.x when no
        # block was supplied, instead of queueing nil.
        raise ArgumentError, 'tried to create Proc object without a block' unless block

        @run_queue << block
        @process_queue.call
    end
    self
end

#signal(signum = nil) ⇒ ::Libuv::Signal

Get a new signal handler

Returns:



427
428
429
430
431
432
# File 'lib/libuv/reactor.rb', line 427

# Builds a new Signal handle bound to this reactor.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @param signum when given, the handle is started for this signal right away
# @yield optional callback, registered through the handle's #progress
# @return [::Libuv::Signal] the new handle
def signal(signum = nil, &block)
    handle = Signal.new(@reactor)
    handle.progress(&block) if block
    handle.start(signum) if signum
    handle
end

#sleep(msecs) ⇒ Object



335
336
337
338
339
340
341
342
# File 'lib/libuv/reactor.rb', line 335

# Suspends the current fiber until a timer started with +msecs+ fires; the
# timer callback closes the timer and resumes the fiber.
#
# NOTE(review): +time+ is assigned the return value of #start, so the
# callback's +time.close+ assumes Timer#start returns the timer -- confirm.
def sleep(msecs)
    fiber = Fiber.current
    time = timer {
        time.close
        fiber.resume
    }.start(msecs)
    # Control returns here when the timer callback resumes the fiber
    Fiber.yield
end

#spawn(cmd, **args) ⇒ Object



506
507
508
# File 'lib/libuv/reactor.rb', line 506

# Builds a new Spawn bound to this reactor for +cmd+, forwarding all keyword
# arguments unchanged.
def spawn(cmd, **args)
    Spawn.new(@reactor, cmd, **args)
end

#stopObject

Closes handles opened by the reactor class and completes the current reactor iteration (thread safe)



552
553
554
# File 'lib/libuv/reactor.rb', line 552

# Signals the async handle created in #initialize whose callback is stop_cb.
def stop
    @stop_reactor.call
end

#tcp(**opts, &callback) ⇒ ::Libuv::TCP

Get a new TCP instance

Returns:



347
348
349
# File 'lib/libuv/reactor.rb', line 347

# Builds a new TCP object bound to this reactor. The block is passed as the
# :progress option, followed by the caller's keyword options.
def tcp(**opts, &callback)
    TCP.new(@reactor, progress: callback, **opts)
end

#timer::Libuv::Timer

Get a new timer instance

Parameters:

  • callback (Proc)

    the callback to be called on timer trigger

Returns:



381
382
383
384
385
# File 'lib/libuv/reactor.rb', line 381

# Builds a new Timer handle bound to this reactor.
#
# The block is captured explicitly: a bare +Proc.new+ can no longer pick up
# the calling method's block on Ruby 3.0 and later.
#
# @yield optional callback, registered through the handle's #progress
# @return [::Libuv::Timer] the new handle
def timer(&block)
    handle = Timer.new(@reactor)
    handle.progress(&block) if block
    handle
end

#tty(fileno, readable = false) ⇒ ::Libuv::TTY

Get a new TTY instance

Parameters:

  • fileno (Integer)

    Integer file descriptor of a tty device

  • readable (true, false) (defaults to: false)

    Boolean indicating if TTY is readable

Returns:



363
364
365
366
367
# File 'lib/libuv/reactor.rb', line 363

# Checks that +fileno+ is an Integer via assert_type, then builds a TTY bound
# to this reactor with the given descriptor and readable flag.
def tty(fileno, readable = false)
    assert_type(Integer, fileno, "io#fileno must return an integer file descriptor, #{fileno.inspect} given")

    TTY.new(@reactor, fileno, readable)
end

#udp(**opts, &callback) ⇒ ::Libuv::UDP

Get a new UDP instance

Returns:



354
355
356
# File 'lib/libuv/reactor.rb', line 354

# Builds a new UDP object bound to this reactor. The block is passed as the
# :progress option, followed by the caller's keyword options.
def udp(**opts, &callback)
    UDP.new(@reactor, progress: callback, **opts)
end

#unrefObject

Allows the reactor loop to stop



227
228
229
230
231
232
# File 'lib/libuv/reactor.rb', line 227

# Removes one reference, unref'ing the process-queue handle when the count
# reaches zero. Does nothing unless called on the reactor thread while the
# reactor is running with a positive count.
#
# @return the result of the handle's #unref when the count reaches zero,
#   otherwise nil
def unref
    return unless reactor_thread? && reactor_running? && @ref_count.positive?

    @ref_count -= 1
    @process_queue.unref if @ref_count.zero?
end

#update_timeObject

forces reactor time update, useful for getting more granular times

Returns:

  • (Libuv::Reactor) self



304
305
306
307
# File 'lib/libuv/reactor.rb', line 304

# Calls ::Libuv::Ext.update_time for this reactor's loop pointer.
#
# @return [self]
def update_time
    ::Libuv::Ext.update_time(@pointer)
    self
end

#work::Libuv::Work

Queue some work for processing in the libuv thread pool

Parameters:

  • callback (Proc)

    the callback to be called in the thread pool

Returns:

Raises:

  • (ArgumentError)

    if block is not given



445
446
447
448
449
450
451
452
453
454
455
456
457
458
# File 'lib/libuv/reactor.rb', line 445

# Runs the given block on THREAD_POOL and returns a promise for its outcome:
# resolved with the block's return value, or rejected with whatever the block
# raised.
#
# NOTE(review): no explicit block check happens here; a missing block would
# surface as a rejection caused by the `yield` below rather than as an
# ArgumentError -- confirm against the documented contract.
def work
    # Take a reference for the duration of the job (see #ref; it only counts
    # on the reactor thread while the reactor is running)
    ref
    d = defer
    THREAD_POOL.post do
        begin
            d.resolve(yield)
        # Deliberately broad: any exception raised by the block is turned into
        # a rejection of the deferred
        rescue Exception => e
            d.reject(e)
        end
    end
    promise = d.promise
    # Give the reference back once the promise settles either way
    promise.finally { unref }
    promise
end