Class: ActionCable::Server::Worker
- Defined in:
- actioncable/lib/action_cable/server/worker.rb,
actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
Overview
Worker used by Server.send_async to do connection work in threads.
Defined Under Namespace
Modules: ActiveRecordConnectionManagement
Constant Summary
Constants included from ActiveSupport::Callbacks
ActiveSupport::Callbacks::CALLBACK_FILTER_TYPES
Instance Attribute Summary collapse
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
Instance Method Summary collapse
- #async_exec(receiver, *args, connection:, &block) ⇒ Object
- #async_invoke(receiver, method, *args, connection: receiver, &block) ⇒ Object
-
#halt ⇒ Object
Stop processing work: any work that has not already started running will be discarded from the queue.
-
#initialize(max_size: 5) ⇒ Worker
constructor
A new instance of Worker.
- #invoke(receiver, method, *args, connection:, &block) ⇒ Object
- #stopping? ⇒ Boolean
- #work(connection, &block) ⇒ Object
Methods included from ActiveRecordConnectionManagement
Methods included from ActiveSupport::Concern
#append_features, #class_methods, extended, #included, #prepend_features, #prepended
Methods included from ActiveSupport::Callbacks
Constructor Details
#initialize(max_size: 5) ⇒ Worker
Returns a new instance of Worker.
19 20 21 22 23 24 25 26 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 19 def initialize(max_size: 5) @executor = Concurrent::ThreadPoolExecutor.new( name: "ActionCable", min_threads: 1, max_threads: max_size, max_queue: 0, ) end |
Instance Attribute Details
#executor ⇒ Object (readonly)
Returns the value of attribute executor
17 18 19 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 17 def executor @executor end |
Instance Method Details
#async_exec(receiver, *args, connection:, &block) ⇒ Object
46 47 48 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 46 def async_exec(receiver, *args, connection:, &block) async_invoke receiver, :instance_exec, *args, connection: connection, &block end |
#async_invoke(receiver, method, *args, connection: receiver, &block) ⇒ Object
50 51 52 53 54 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 50 def async_invoke(receiver, method, *args, connection: receiver, &block) @executor.post do invoke(receiver, method, *args, connection: connection, &block) end end |
#halt ⇒ Object
Stop processing work: any work that has not already started running will be discarded from the queue
30 31 32 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 30 def halt @executor.shutdown end |
#invoke(receiver, method, *args, connection:, &block) ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 56 def invoke(receiver, method, *args, connection:, &block) work(connection) do receiver.send method, *args, &block rescue Exception => e logger.error "There was an exception - #{e.class}(#{e.})" logger.error e.backtrace.join("\n") receiver.handle_exception if receiver.respond_to?(:handle_exception) end end |
#stopping? ⇒ Boolean
34 35 36 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 34 def stopping? @executor.shuttingdown? end |
#work(connection, &block) ⇒ Object
38 39 40 41 42 43 44 |
# File 'actioncable/lib/action_cable/server/worker.rb', line 38 def work(connection, &block) self.connection = connection run_callbacks :work, &block ensure self.connection = nil end |