Module: Attr::Gather::Workflow::DSL

Included in:
Attr::Gather::Workflow
Defined in:
lib/attr/gather/workflow/dsl.rb

Overview

DSL for configuring a workflow

Constant Summary

Undefined = Object.new.freeze

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Instance Method Details

#aggregator(agg = nil, opts = EMPTY_HASH) ⇒ Object

Configures the result aggregator

Aggregators make it possible to build custom logic for how results should be “merged” together. For example, you could build an aggregator that prioritizes the values of some tasks over others.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  aggregator :deep_merge
end

class EnhanceUserProfile
  include Attr::Gather::Workflow

  aggregator MyCustomAggregator
end

Parameters:

  • agg (#call) (defaults to: nil)

    the aggregator to use



# File 'lib/attr/gather/workflow/dsl.rb', line 147

def aggregator(agg = nil, opts = EMPTY_HASH)
  @aggregator = if agg.nil? && !defined?(@aggregator)
                  Aggregators.default
                elsif agg.respond_to?(:new)
                  agg.new(filter: filter, **opts)
                elsif agg
                  Aggregators.resolve(agg, filter: filter, **opts)
                else
                  @aggregator
                end
end
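
To illustrate the kind of aggregator described above, here is a minimal sketch of one that prioritizes some tasks over others. Only the `new(filter:, **opts)` construction, the `filter=` writer, and `#call` come from this page. The `PriorityAggregator` name, the `order:` option, and the shape of the arguments to `call` (a list of `[task_name, hash]` pairs) are assumptions made for the example, not the library's documented interface.

```ruby
# Hypothetical aggregator: tasks later in `order` override earlier ones.
class PriorityAggregator
  # The DSL assigns the filter after construction, so expose a writer.
  attr_accessor :filter

  def initialize(filter: nil, order: [])
    @filter = filter
    @order = order
  end

  # results: array of [task_name, hash] pairs (assumed shape).
  # Tasks missing from `order` are merged first, so they have lowest priority.
  def call(input, results)
    ranked = results.sort_by { |name, _| @order.index(name) || -1 }
    ranked.reduce(input.dup) { |memo, (_, value)| memo.merge(value) }
  end
end

agg = PriorityAggregator.new(order: %i[spreadsheet database])
merged = agg.call(
  { id: 1 },
  [[:database, { email: 'db@example.com' }],
   [:spreadsheet, { email: 'sheet@example.com', name: 'Ada' }]]
)
```

Because `:database` comes last in `order`, its `email` wins, while `name` survives from the lower-priority spreadsheet task. A class like this could be passed as `aggregator PriorityAggregator, order: [...]`, since the listing above instantiates anything that responds to `new`.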

#container(cont = nil) ⇒ Object

Note:

For more information, check out https://dry-rb.org/gems/dry-container

Defines a container for task dependencies

Using a container makes it easy to reuse workflows with different data sources. For example, if one workflow must use a legacy DB and another must use a new DB, each workflow can be given its own container that configures that dependency.

Examples:

LegacySystem = Dry::Container.new.tap do |c|
  c.register(:database) { Sequel.connect('sqlite://legacy.db') }
end

class EnhanceUserProfile
  include Attr::Gather::Workflow

  container LegacySystem
end

Parameters:

  • cont (Dry::Container) (defaults to: nil)

    the Dry::Container to use



# File 'lib/attr/gather/workflow/dsl.rb', line 118

def container(cont = nil)
  @container = cont if cont
  @container
end
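
The getter/setter behaviour of `container` can be sketched without any dependencies. In the sketch below the `container` method body is copied from the listing above, while the `ContainerDSL` module, the use of `extend`, the class names, and the plain hashes standing in for `Dry::Container` instances are all assumptions made for illustration.

```ruby
# Stand-in for the DSL: same body as the `container` listing above.
module ContainerDSL
  def container(cont = nil)
    @container = cont if cont # set only when an argument is given
    @container                # always return the current container
  end
end

# Plain hashes stand in for Dry::Container instances here (hypothetical URLs).
LEGACY_SYSTEM = { database: 'sqlite://legacy.db' }.freeze
MODERN_SYSTEM = { database: 'postgres://new-db' }.freeze

class LegacyProfile
  extend ContainerDSL
  container LEGACY_SYSTEM
end

class ModernProfile
  extend ContainerDSL
  container MODERN_SYSTEM
end

legacy_db = LegacyProfile.container[:database]
modern_db = ModernProfile.container[:database]
```

Each class keeps its own container, so the same workflow definition can be pointed at a different data source simply by swapping the container it is given.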

#fetch(task_name, opts = EMPTY_HASH) {|Attr::Gather::Workflow::Task| ... } ⇒ Object

Defines a task with name and options

Calling `fetch` will yield a task object which you can configure like a PORO. Tasks will be registered for execution in the workflow.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # ...

  fetch :user_info do |t|
    t.depends_on = [:fetch_gravatar_info]
  end
end

Parameters:

  • task_name (Symbol)

    the name of the task

Yields:

  • (Attr::Gather::Workflow::Task)



# File 'lib/attr/gather/workflow/dsl.rb', line 66

def fetch(task_name, opts = EMPTY_HASH)
  task(task_name, opts)
end

#filter(filt = nil, *args, **opts) ⇒ Object

Defines a filter for filtering out invalid values

When aggregating data from many sources, it is hard to reason about all the ways invalid data will be returned. For example, if you are pulling data from a spreadsheet, there will often be typos, etc.

Defining a filter allows you to declaratively state what is valid. attr-gather will use this definition to automatically filter out invalid values, so they never make it into your system.

Filtering happens during each step of the workflow, which means that every Task will receive validated input that you can rely on.

Examples:

class UserContract < Dry::Validation::Contract
  params do
    optional(:id).filled(:integer)
    optional(:email).filled(:str?, format?: /@/)
  end
end

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # Any of the key/value pairs that had validation errors will be
  # filtered from the output.
  filter :contract, UserContract.new
end

Parameters:

  • filt (Symbol) (defaults to: nil)

    the name of the filter to use

  • args (Array<Object>)

    arguments for initializing the filter



# File 'lib/attr/gather/workflow/dsl.rb', line 192

def filter(filt = nil, *args, **opts)
  @filter = if filt.nil? && !defined?(@filter)
              Filters.default
            elsif filt
              Filters.resolve(filt, *args, **opts)
            else
              @filter
            end

  aggregator.filter = @filter

  @filter
end
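
The idea of dropping invalid key/value pairs can be sketched in plain Ruby. The `RuleFilter` class below is hypothetical and is not one of the library's filters: it assumes a filter is an object whose `call` takes a hash and returns only the pairs that pass validation, and it uses lambdas where the real `:contract` filter would use a `Dry::Validation::Contract`.

```ruby
# Hypothetical filter: keeps only the pairs whose value passes its rule.
class RuleFilter
  def initialize(rules)
    @rules = rules
  end

  # Returns a new hash without the invalid pairs; keys with no rule pass.
  def call(input)
    input.select do |key, value|
      rule = @rules[key]
      rule.nil? || rule.call(value)
    end
  end
end

user_filter = RuleFilter.new({
  id: ->(v) { v.is_a?(Integer) },
  email: ->(v) { v.is_a?(String) && v.include?('@') }
})

# The malformed email is dropped; the valid id and the unchecked name remain.
clean = user_filter.call({ id: 42, email: 'not-an-email', name: 'Ada' })
```

Running a step's output through such a filter before it is merged is what lets later tasks rely on their input being valid.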

#filter_with_contract(arg = nil, &blk) ⇒ Dry::Validation::Contract, NilClass

Defines a filter for filtering invalid values with an inline contract

This serves as a convenience method for defining a contract filter.

Examples:


class EnhanceUserProfile
  include Attr::Gather::Workflow

  # Any of the key/value pairs that had validation errors will be
  # filtered from the output.
  filter_with_contract do
     params do
       required(:name).filled(:string)
       required(:age).value(:integer)
     end

     rule(:age) do
       key.failure('must be at least 18') if value < 18
     end
  end
end

Returns:

  • (Dry::Validation::Contract, NilClass)



# File 'lib/attr/gather/workflow/dsl.rb', line 233

def filter_with_contract(arg = nil, &blk)
  contract = blk ? build_inline_contract_filter(&blk) : arg
  filter(:contract, contract)
end

#step(task_name, opts = EMPTY_HASH) {|Attr::Gather::Workflow::Task| ... } ⇒ Object

Defines a task with name and options

Calling `step` will yield a task object which you can configure like a PORO. Tasks will be registered for execution in the workflow.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # ...

  step :fetch_user_info do |t|
    t.depends_on = [:email_info]
  end
end

Parameters:

  • task_name (Symbol)

    the name of the task

Yields:

  • (Attr::Gather::Workflow::Task)



# File 'lib/attr/gather/workflow/dsl.rb', line 91

def step(task_name, opts = EMPTY_HASH)
  task(task_name, opts)
end

#task(task_name, opts = EMPTY_HASH) {|Attr::Gather::Workflow::Task| ... } ⇒ Object

Defines a task with name and options

Calling `task` will yield a task object which you can configure like a PORO. Tasks will be registered for execution in the workflow.

Examples:

class EnhanceUserProfile
  include Attr::Gather::Workflow

  # ...

  task :fetch_database_info do |t|
    t.depends_on = []
  end

  task :fetch_avatar_info do |t|
    t.depends_on = [:fetch_gravatar_info]
  end
end

Parameters:

  • task_name (Symbol)

    the name of the task

Yields:

  • (Attr::Gather::Workflow::Task)



# File 'lib/attr/gather/workflow/dsl.rb', line 38

def task(task_name, opts = EMPTY_HASH)
  conf = OpenStruct.new
  yield conf
  tasks << ({ name: task_name, **opts, **conf.to_h })
  self
end
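
To see what a `task` call actually registers, the listing above can be exercised in isolation. In this sketch the `task` body follows the listing, while the `TaskDSL` module, the `tasks` reader backed by a plain array, the use of `extend`, and the `retries:` option are assumptions made for illustration only.

```ruby
require 'ostruct'

# Stand-in for the DSL: `task` follows the listing above, while the
# `tasks` reader is an assumed plain array.
module TaskDSL
  def tasks
    @tasks ||= []
  end

  def task(task_name, opts = {})
    conf = OpenStruct.new
    yield conf # the block sets attributes such as depends_on
    tasks << { name: task_name, **opts, **conf.to_h }
    self
  end
end

class EnhanceUserProfile
  extend TaskDSL

  task :fetch_database_info do |t|
    t.depends_on = []
  end

  # `retries:` is a hypothetical option, included to show how opts are merged.
  task :fetch_avatar_info, retries: 2 do |t|
    t.depends_on = [:fetch_database_info]
  end
end

registered = EnhanceUserProfile.tasks
```

Each entry in `registered` is a hash combining the task name, the options passed to `task`, and whatever the block assigned, with block assignments taking precedence over options of the same name.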