Class: Langis::Engine::EventMachineRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/langis/engine.rb

Overview

Instances of this class are executed when a message is pushed to the intake’s EventMachine::Channel that it subscribed to. Its primary function is to safely execute the Langis sink (Rackish application) that it has been tasked to manage. To do this safely with performance considerations, it enqueues a Proc to be handled by the EvenMachine’s deferred thread pool and protects the thread pool by wrapping the sink call with a rescue block. Any caught errors will result in an error message with the caught exception pushed into the given EventMachine error channel. All successful completions will push the returned Rackish result to the EventMachine success channel.

Instance Method Summary collapse

Constructor Details

#initialize(app, options = {}) ⇒ EventMachineRunner

Returns a new instance of EventMachineRunner.

Parameters:

  • app (#call)

    The Rackish app to execute.

  • options (Hash) (defaults to: {})

    ({})

Options Hash (options):

  • :success_channel (EventMachine::Channel) — default: nil

    The EventMachine::Channel instance to push the Rackish app’s return results to. This happens when there are no errors raised.

  • :error_channel (EventMachine::Channel) — default: nil

    The EventMachine::Channel instance to push error messages to when the runner catches an exception during the execution of the Rackish app.

  • :evm (Object) — default: EventMachine

    Specify a different class/module to use when executing deferred calls. Mainly useful for unit testing, or if you want to run the app directly in the main thread instead of the deferred thread pool.



32
33
34
35
36
37
38
# File 'lib/langis/engine.rb', line 32

def initialize(app, options={})
  @app = app
  @success_channel = options[:success_channel]
  @error_channel = options[:error_channel]
  @evm = options[:evm] || EventMachine
  @intake_name = options[:intake_name]
end

Instance Method Details

#call(message) ⇒ Object

The method that is called in the EventMachine’s main reactor thread whose job is to enqueue the main app code to be run by EventMachine’s deferred thread pool.

This method sets up the Rackish environment hash that is passed as the main input to the Rackish app; it sets up the Proc that will be actually executed in by the deferred thread pool. The proc protects the thread pool from exceptions, and pushes respective error and success results to given EventMachine channels.

Parameters:

  • message (Object)

    The message that is getting pushed through the Langis pipes to their eventual handlers.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/langis/engine.rb', line 53

def call(message)
  # Assign local variables to the proper apps, etc for readability.
  app = @app
  success_channel = @success_channel
  error_channel = @error_channel
  intake_name = @intake_name
  # Enqueue the proc to be run in by the deferred thread pool.
  @evm.defer(proc do
    # Create the base environment that is understood by the Rackish apps.
    env = {}
    env[MESSAGE_TYPE_KEY] = message.message_type.to_s if(
      message.respond_to? :message_type)
    env[MESSAGE_KEY] = message
    env[INTAKE_KEY] = intake_name
    # Actually run the Rackish app, protected by a rescue block.
    # Push the results to their respective channels when finished.
    begin
      results = app.call env
      success_channel.push(results) if success_channel
    rescue => e
      # It was an error, so we have to create a Rackish response array.
      # We push a SERVER_ERROR status along with an enhanced
      # headers section: the exception and original message.
      error_channel.push([
        SERVER_ERROR,
        env.merge({ X_EXCEPTION => e}),
        ['']]) if error_channel
    end
  end)
end