Module: Langis::Sinks

Defined in:
lib/langis/sinks.rb

Overview

Predefined sinks: destinations for messages pumped through the Langis engines.

Constant Summary

DELAYED_JOB_RESULT_KEY =

The header key whose value is the DelayedJob enqueue result.

'langis.sink.delayed_job.result'

REDIS_RESULT_KEY =

The header key whose value is the Redis rpush result.

'langis.sink.redis.result'

RESQUE_RESULT_KEY =

The header key whose value is the Resque enqueue result.

'langis.sink.resque.result'

Class Method Summary

Class Method Details

.delayed_job(job_class, options = {}) ⇒ Array<Integer,Hash,#each>

Module function that creates an endpoint Rackish app. The app enqueues a new instance of a DelayedJob job class, with constructor arguments taken from the Rackish input environment.

Parameters:

  • job_class (Class)

    The DelayedJob job class to enqueue.

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :env_key (String) — default: Langis::MESSAGE_KEY

    The Rackish input environment key whose value is passed to the given job_class constructor. If the value of this key is an array, then the elements of that array are passed as though they were individually specified.

  • :priority (Integer) — default: 0

    DelayedJob priority to be used for all jobs enqueued with this sink.

  • :run_at (Time) — default: nil

    DelayedJob run_at to be used for all jobs enqueued with this sink.

  • :transform (Symbol) — default: nil

    Method to call on the object found at :env_key, if that object responds to it. If the call returns an Array, its elements become the initializer argument(s) of the delayed job. Any other return value is passed on as a single argument; even an explicit nil is passed to job_class.new. Note that DelayedJob serializes these parameters as YAML.

  • :transform_args (Array) — default: []

    Arguments to pass to the transform call. A non-Array value is treated as a single argument.

Returns:

  • (Array<Integer,Hash,#each>)

    A simple OK response whose header hash contains the DelayedJob enqueue result under DELAYED_JOB_RESULT_KEY.



# File 'lib/langis/sinks.rb', line 35

def delayed_job(job_class, options={})
  priority = options[:priority] || 0
  run_at = options[:run_at]
  env_key = options[:env_key] || MESSAGE_KEY
  xform = options[:transform]
  xform_args = options[:transform_args] || []
  xform_args = [xform_args] unless xform_args.is_a? Array
  lambda { |env|
    message = env[env_key]
    if xform.is_a? Symbol and message.respond_to? xform
      args = message.send(xform, *xform_args)
    else
      args = message
    end
    args = [args] unless args.is_a? Array
    result = Delayed::Job.enqueue job_class.new(*args), priority, run_at
    return [OK, { DELAYED_JOB_RESULT_KEY => result }, ['']]
  }
end
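
The argument handling in the listing above can be sketched in isolation. This is a minimal illustration, not part of Langis: `resolve_args` is a hypothetical helper name that mirrors how the sink turns a message into the constructor arguments, including the optional transform and the final Array wrapping.

```ruby
# Hypothetical helper (not part of Langis) mirroring the sink's argument
# handling: apply the optional transform, then wrap non-Array results.
def resolve_args(message, transform = nil, transform_args = [])
  # A single non-Array transform argument is wrapped, as in the sink.
  transform_args = [transform_args] unless transform_args.is_a?(Array)

  args =
    if transform.is_a?(Symbol) && message.respond_to?(transform)
      message.send(transform, *transform_args)
    else
      # No usable transform: the message itself becomes the argument(s).
      message
    end

  # Arrays are passed through to be splatted; anything else, including
  # nil, becomes a single argument.
  args.is_a?(Array) ? args : [args]
end

resolve_args('a,b', :split, ',') # => ["a", "b"]
resolve_args('hello')            # => ["hello"]
resolve_args(nil)                # => [nil]
resolve_args([1, 2])             # => [1, 2]
resolve_args(42, :missing)       # => [42]
```

The last call shows that a transform the message does not respond to is silently skipped rather than raising.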

.redis(connection, key, options = {}) ⇒ Array<Integer,Hash,#each>

Module function that creates an endpoint Rackish app. The app rpushes a value from the input environment onto a list stored in a Redis database.

Parameters:

  • connection (Object)

    The Redis database connection.

  • key (String)

    The index key of the list in the Redis database.

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :env_key (String) — default: Langis::MESSAGE_KEY

    The Rackish input environment key whose value is pushed onto the end of the Redis key’s list.

  • :transform (Symbol) — default: nil

    Method to call on the object found at :env_key, if that object responds to it. The returned value is saved to the Redis database (after a #to_s by the Redis client).

  • :transform_args (Array) — default: []

    Arguments to pass to the transform call. A non-Array value is treated as a single argument.

Returns:

  • (Array<Integer,Hash,#each>)

    A simple OK response whose header hash contains the Redis#rpush result under REDIS_RESULT_KEY.



# File 'lib/langis/sinks.rb', line 77

def redis(connection, key, options={})
  env_key = options[:env_key] || MESSAGE_KEY
  xform = options[:transform]
  xform_args = options[:transform_args] || []
  xform_args = [xform_args] unless xform_args.is_a? Array
  lambda { |env|
    message = env[env_key]
    if xform.is_a? Symbol and message.respond_to? xform
      message = message.send(xform, *xform_args)
    end
    result = connection.rpush key, message
    return [OK, { REDIS_RESULT_KEY  => result }, ['']]
  }
end
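
The following sketch shows how such a sink might be exercised without a Redis server. Everything outside the sink logic is an assumption made for illustration: `FakeRedis` is a hypothetical stand-in that models only `#rpush`, `redis_sink` is a local copy of the listing above, and the values given to `MESSAGE_KEY` and `OK` are placeholders because the real Langis constants are not shown in this section.

```ruby
require 'json'

# Placeholder values (assumptions): the real Langis::MESSAGE_KEY and OK
# constants are defined elsewhere in Langis.
MESSAGE_KEY = 'langis.message'
OK = 200
REDIS_RESULT_KEY = 'langis.sink.redis.result'

# Hypothetical stand-in for a Redis client; only #rpush is modeled.
class FakeRedis
  attr_reader :lists

  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Appends the stringified value and returns the new list length,
  # as the Redis RPUSH command does.
  def rpush(key, value)
    @lists[key] << value.to_s
    @lists[key].length
  end
end

# Local copy of the sink logic from the listing above.
def redis_sink(connection, key, options = {})
  env_key = options[:env_key] || MESSAGE_KEY
  xform = options[:transform]
  xform_args = options[:transform_args] || []
  xform_args = [xform_args] unless xform_args.is_a?(Array)
  lambda do |env|
    message = env[env_key]
    if xform.is_a?(Symbol) && message.respond_to?(xform)
      message = message.send(xform, *xform_args)
    end
    [OK, { REDIS_RESULT_KEY => connection.rpush(key, message) }, ['']]
  end
end

connection = FakeRedis.new
sink = redis_sink(connection, 'events', { transform: :to_json })
status, headers, _body = sink.call({ MESSAGE_KEY => { 'id' => 7 } })

status                    # => 200
headers[REDIS_RESULT_KEY] # => 1
connection.lists['events'] # => ["{\"id\":7}"]
```

Using `:to_json` as the transform keeps the stored value parseable; without a transform the client's `#to_s` output of a Hash is what ends up in the list.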

.resque(job_class, options = {}) ⇒ Array<Integer,Hash,#each>

Module function that creates an endpoint Rackish app. The app enqueues a Resque job of the given job class, with perform arguments taken from the Rackish input environment.

Parameters:

  • job_class (Class)

    The Resque job class for which we want to enqueue the message.

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :env_key (String) — default: Langis::MESSAGE_KEY

    The Rackish input environment key whose value is passed as the arguments to the Resque job's perform call. If the value is an Array, its elements are used as the individual arguments of the job.

  • :transform (Symbol) — default: nil

    Method to call on the object found at :env_key, if that object responds to it. If the call returns an Array, its elements become the perform argument(s) of the Resque job. Any other return value is passed on as a single argument; even an explicit nil is passed as the perform argument. Note that these Resque arguments are serialized via #to_json.

  • :transform_args (Array) — default: []

    Arguments to pass to the transform call. A non-Array value is treated as a single argument.

Returns:

  • (Array<Integer,Hash,#each>)

    A simple OK response whose header hash contains the Resque enqueue result under RESQUE_RESULT_KEY.



# File 'lib/langis/sinks.rb', line 119

def resque(job_class, options={})
  env_key = options[:env_key] || MESSAGE_KEY
  xform = options[:transform]
  xform_args = options[:transform_args] || []
  xform_args = [xform_args] unless xform_args.is_a? Array
  lambda { |env|
    message = env[env_key]
    if xform.is_a? Symbol and message.respond_to? xform
      args = message.send(xform, *xform_args)
    else
      args = message
    end
    args = [args] unless args.is_a? Array
    result = Resque.enqueue job_class, *args
    return [OK, { RESQUE_RESULT_KEY => result }, ['']]
  }
end
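
Because the perform arguments are serialized as JSON, what the job receives can differ from what the sink was given. The sketch below illustrates this under stated assumptions: `FakeResque` is a hypothetical stand-in that round-trips the arguments through JSON and runs the job immediately, its `true` return value is arbitrary, and `AuditJob` is an invented job class. None of these names come from Langis or Resque.

```ruby
require 'json'

# Hypothetical stand-in for Resque: round-trips the arguments through JSON,
# as a queued payload would be, then runs the job right away.
module FakeResque
  def self.enqueue(job_class, *args)
    payload = JSON.generate({ 'class' => job_class.to_s, 'args' => args })
    job_class.perform(*JSON.parse(payload)['args'])
    true # arbitrary return value for this sketch
  end
end

# Hypothetical job class that records what perform received.
class AuditJob
  @received = []

  class << self
    attr_reader :received

    def perform(*args)
      @received << args
    end
  end
end

# An Array message is splatted into separate perform arguments.
array_message = ['signup', { user_id: 42 }]
FakeResque.enqueue(AuditJob, *array_message)

# A non-Array message is wrapped first, so perform sees one argument.
scalar_message = 'ping'
FakeResque.enqueue(AuditJob, *[scalar_message])

first_call, second_call = AuditJob.received
first_call.length  # => 2
second_call        # => ["ping"]
# The Symbol key :user_id comes back as the String key "user_id".
first_call[1].keys # => ["user_id"]
```

A transform that produces only JSON-friendly values (Strings, numbers, Arrays, Hashes with String keys) avoids surprises of this kind on the worker side.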