Burner
This library serves as the skeleton for a processing engine. It allows you to organize your code into Jobs, then stitch those jobs together as steps.
Installation
To install through RubyGems:
gem install burner
You can also add it to your Gemfile using Bundler:
bundle add burner
Examples
The purpose of this library is to provide a framework for creating highly de-coupled functions (known as jobs) and then stitching them back together in any arbitrary order (known as steps). Although our example will be somewhat specific and contrived, the jobs you write and the order you run them in are limited only by your imagination.
JSON-to-YAML File Converter
All the jobs for this example are shipped with this library. In this example, we will write a pipeline that can read a JSON file and convert it to YAML. Pipelines are data-first so we can represent a pipeline using a hash:
pipeline = {
jobs: [
{
name: :read,
type: 'b/io/read',
path: '{input_file}'
},
{
name: :output_id,
type: 'b/echo',
message: 'The job id is: {__id}'
},
{
name: :output_value,
type: 'b/echo',
message: 'The current value is: {__default_register}'
},
{
name: :parse,
type: 'b/deserialize/json'
},
{
name: :convert,
type: 'b/serialize/yaml'
},
{
name: :write,
type: 'b/io/write',
path: '{output_file}'
}
],
steps: %i[
read
output_id
output_value
parse
convert
output_value
write
]
}
params = {
input_file: 'input.json',
output_file: 'output.yaml'
}
payload = Burner::Payload.new(params: params)
Assuming we are running this script from a directory where an input.json file exists, we can then programmatically process the pipeline:
Burner::Pipeline.make(pipeline).execute(payload: payload)
We should now see an output.yaml file created.
Some notes:
- Some values can be string-interpolated using the provided Payload#params. This allows for passing runtime configuration/data into pipelines/jobs.
- The job's ID can be accessed using the __id key.
- The current payload registers' values can be accessed using the __<register_name>_register key.
- Jobs can be re-used (just like the output_id and output_value jobs).
- If steps is nil then all jobs will execute in their declared order.
Omitting Job Names and Steps
Job names are optional, but steps can only correspond to named jobs. This means if steps is declared then anonymous jobs will have no way to be executed. Here is the same pipeline as above, but without job names and steps:
pipeline = {
jobs: [
{
type: 'b/io/read',
path: '{input_file}'
},
{
type: 'b/echo',
message: 'The job id is: {__id}'
},
{
type: 'b/echo',
message: 'The current value is: {__default_register}'
},
{
type: 'b/deserialize/json'
},
{
type: 'b/serialize/yaml'
},
{
type: 'b/echo',
message: 'The current value is: {__default_register}'
},
{
type: 'b/io/write',
path: '{output_file}'
}
]
}
params = {
input_file: 'input.json',
output_file: 'output.yaml'
}
payload = Burner::Payload.new(params: params)
Burner::Pipeline.make(pipeline).execute(payload: payload)
Like everything in software, there are trade-offs between the two equivalent pipelines above. The former (with steps and job names) has fewer jobs but is more verbose. The latter (without steps and job names) has more jobs but reads more tersely. Names can also aid in self-documenting your code/configuration, so it may be a good idea to require that at least names are used.
Capturing Feedback / Output
By default, output will be emitted to $stdout. You can add or change listeners by passing optional values into Pipeline#execute. For example, say we wanted to capture the output from our JSON-to-YAML example:
io = StringIO.new
output = Burner::Output.new(outs: io)
payload = Burner::Payload.new(params: params)
Burner::Pipeline.make(pipeline).execute(output: output, payload: payload)
log = io.string
The value of log should now look similar to:
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] Pipeline started with 7 step(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] Parameters:
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - input_file: input.json
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - output_file: output.yaml
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] --------------------------------------------------------------------------------
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [1] Burner::Library::IO::Read::read
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Reading: spec/fixtures/input.json
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [2] Burner::Library::Echo::output_id
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - The job id is:
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [3] Burner::Library::Echo::output_value
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - The current value is:
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [4] Burner::Library::Deserialize::Json::parse
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [5] Burner::Library::Serialize::Yaml::convert
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [6] Burner::Library::Echo::output_value
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - The current value is:
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] [7] Burner::Library::IO::Write::write
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Writing: output.yaml
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] - Completed in: 0.0 second(s)
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] --------------------------------------------------------------------------------
[8bdc394e-7047-4a1a-87ed-6c54ed690ed5 | 2020-10-14 13:49:59 UTC] Pipeline ended, took 0.001 second(s) to complete
Notes:
- The Job ID is specified as the leading UUID in each line.
- outs can be provided an array of listeners, as long as each listener responds to puts(msg).
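To build on the note above, here is a minimal sketch of a custom listener. The ArrayListener class is hypothetical and only for illustration; any object that responds to puts(msg) should work the same way:
class ArrayListener
  attr_reader :lines

  def initialize
    @lines = []
  end

  # Burner::Output calls puts(msg) on each listener for every log line.
  def puts(msg)
    @lines << msg
  end
end

listener = ArrayListener.new
output   = Burner::Output.new(outs: [$stdout, listener])
payload  = Burner::Payload.new(params: params)

Burner::Pipeline.make(pipeline).execute(output: output, payload: payload)

# listener.lines should now contain the same log lines shown above,
# while $stdout continues to receive them as usual.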
Command Line Pipeline Processing
This library also ships with a built-in script, burner, that illustrates using the Burner::Cli API. This class can take in an array of arguments (similar to a command line) and execute a pipeline. The first argument is the path to a YAML file with the pipeline's configuration and each subsequent argument is a param in key=value form. Here is how the JSON-to-YAML example can utilize this interface:
Create YAML Pipeline Configuration File
Write the following json_to_yaml_pipeline.yaml file to disk:
jobs:
- name: read
type: b/io/read
path: '{input_file}'
- name: output_id
type: b/echo
message: 'The job id is: {__id}'
- name: output_value
type: b/echo
message: 'The current value is: {__default_register}'
- name: parse
type: b/deserialize/json
- name: convert
type: b/serialize/yaml
- name: write
type: b/io/write
path: '{output_file}'
steps:
- read
- output_id
- output_value
- parse
- convert
- output_value
- write
Run Using Script
From the command-line, run:
bundle exec burner json_to_yaml_pipeline.yaml input_file=input.json output_file=output.yaml
The pipeline should be processed and output.yaml should be created.
Run Using Programmatic API
Instead of the script, you can invoke it using code:
args = %w[
json_to_yaml_pipeline.yaml
input_file=input.json
output_file=output.yaml
]
Burner::Cli.new(args).invoke
Core Job Library
This library ships with only basic, rudimentary jobs that are meant to serve as a baseline:
Collection
- b/collection/arrays_to_objects [mappings, register]: Convert an array of arrays to an array of objects.
- b/collection/coalesce [grouped_register, insensitive, key_mappings, keys, register, separator]: Merge two datasets together based on the key values of one dataset (array) with a grouped dataset (hash). If insensitive (false by default) is true then each key's value will be converted/coerced to a lowercase string.
- b/collection/concatenate [from_registers, to_register]: Concatenate each from_register's value and place the newly concatenated array into the to_register. Note: this does not do any deep copying and should be assumed it is shallow copying all objects.
- b/collection/flat_file_parse [keys_register, register, separator, key_mappings]: Map an array of arrays to an array of hashes. The keys can be realized at run-time as they are pulled from the first entry in the array. The keys_register will also be set to the keys used for mapping. If key_mappings are defined, only keys that are mapped will be included in the keys_register array; otherwise, all keys pulled from the first entry in the register will be included in the keys_register.
- b/collection/graph [config, key, register]: Use Hashematics to turn a flat array of objects into a deeply nested object tree.
- b/collection/group [insensitive, keys, register, separator]: Take a register's value (an array of objects) and group the objects by the specified keys. If insensitive (false by default) is true then each key's value will be converted/coerced to a lowercase string.
- b/collection/nested_aggregate [register, key_mappings, key, separator]: Traverse a set of objects, resolving key's value for each object, optionally copying down key_mappings to the child records, then merging all the inner records together.
- b/collection/number [key, register, separator, start_at]: This job can iterate over a set of records and sequence them (set the specified key to a sequential index value.)
- b/collection/objects_to_arrays [mappings, register]: Convert an array of objects to an array of arrays.
- b/collection/only_keys [keys_register, register, separator]: Limit an array of objects' keys to a specified set of keys. These keys can be realized at run-time as they are pulled from another register (keys_register), thus making it dynamic.
- b/collection/pivot [unique_keys, insensitive, other_keys, pivot_key, pivot_value_key, register, separator]: Take an array of objects and pivot a key into multiple keys. It essentially takes all the values for a key and creates N number of keys (one per value.) Under the hood it uses HashMath's Record and Table classes.
- b/collection/prepend [from_registers, to_register]: Alias for b/collection/unshift.
- b/collection/shift [amount, register]: Remove the first N number of elements from an array.
- b/collection/transform [attributes, exclusive, separator, register]: Iterate over all objects and transform each key per the attribute transformer specifications. If exclusive is set to false then the current object will be overridden/merged. Separator can also be set for key path support. This job uses Realize, which provides its own extendable value-transformation pipeline. If an attribute is not set with explicit: true then it will automatically start from the key's value on the record. If explicit: true is set, then it will start from the record itself.
- b/collection/unpivot [pivot_set, register]: Take an array of objects and unpivot specific sets of keys into rows. Under the hood it uses HashMath's Unpivot class.
- b/collection/unshift [from_registers, register]: Add the values of the from_registers to the register array. All existing elements in the register array will be shifted upwards.
- b/collection/validate [invalid_register, join_char, message_key, register, separator, validations]: Take an array of objects, run it through each declared validator, and split the objects into two registers. The valid objects will be split into the current register while the invalid ones will go into the invalid_register as declared. Optional arguments, join_char and message_key, help determine the compiled error messages. The separator option can be utilized to use dot-notation for validating keys. See each validation's options by viewing their classes within the lib/modeling/validations directory.
- b/collection/values [include_keys, register]: Take an array of objects and call #values on each object. If include_keys is true (it is false by default), then call #keys on the first object and inject that as a "header" object.
- b/collection/zip [base_register, register, with_register]: Combine the base_register's and with_register's data to form one single array in register. It will combine the elements positionally in each array to form the final array. For example: ['hello', 'bugs'] + ['world', 'bunny'] => [['hello', 'world'], ['bugs', 'bunny']] (see the sketch after this list).
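As an illustration only, here is a minimal, hedged sketch (not taken from the library's documentation) wiring a few of the jobs above together. The register names (:base, :with, :zipped) and job names are arbitrary, and the b/value/static job described further below is used just to seed data:
pipeline = {
  jobs: [
    { name: :left,  type: 'b/value/static', register: :base, value: %w[hello bugs] },
    { name: :right, type: 'b/value/static', register: :with, value: %w[world bunny] },
    {
      name: :zip,
      type: 'b/collection/zip',
      base_register: :base,
      with_register: :with,
      register: :zipped
    },
    # Echo the zipped register using the __<register_name>_register key described earlier.
    { name: :show, type: 'b/echo', message: 'Zipped: {__zipped_register}' }
  ],
  steps: %i[left right zip show]
}

Burner::Pipeline.make(pipeline).execute(payload: Burner::Payload.new)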
Compression
- b/compress/row_reader [data_key, ignore_blank_path, ignore_blank_data, path_key, register, separator]: Iterates over an array of objects, extracts a path and data in each object, and creates a zip file.
De-serialization
- b/deserialize/csv [register]: Take a CSV string and de-serialize into object(s). Currently it will return an array of arrays, with each nested array representing one row.
- b/deserialize/json [register]: Treat input as a string and de-serialize it to JSON.
- b/deserialize/yaml [register, safe]: Treat input as a string and de-serialize it to YAML. By default it will try to safely de-serialize it (only using core classes). If you wish to de-serialize it to any class type, pass in safe: false.
IO
By default, all jobs will use the Burner::Disks::Local disk for persistence, but this is configurable by implementing and registering custom disk-based classes in the Burner::Disks factory. For example, a consumer application that wants to interact with cloud-based storage providers could register a custom disk and reuse this job library instead of implementing custom jobs.
- b/io/exist [disk, path, short_circuit]: Check to see if a file exists. The path parameter can be interpolated using Payload#params. If short_circuit is set to true (defaults to false) and the file does not exist, then the pipeline will be short-circuited (see the sketch after this list).
- b/io/read [binary, disk, path, register]: Read in a local file. The path parameter can be interpolated using Payload#params. If the contents are binary, pass in binary: true to open it up in binary+read mode.
- b/io/row_reader [data_key, disk, ignore_blank_path, ignore_file_not_found, path_key, register, separator]: Iterates over an array of objects, extracts a filepath from a key in each object, and attempts to load the file's content for each record. The file's content will be stored at the specified data_key. By default, missing paths or files will be treated as hard errors. If you wish to ignore these, pass in true for ignore_blank_path and/or ignore_file_not_found.
- b/io/write [binary, disk, path, register, supress_side_effect]: Write to a local file. The path parameter can be interpolated using Payload#params. If the contents are binary, pass in binary: true to open it up in binary+write mode. By default, written files are also logged as WrittenFile instances to the Payload#side_effects array. You can pass in supress_side_effect: true to disable this behavior.
Parameters
- b/param/from_register [param_key, register]: Copy the value of a register to a param key.
- b/param/to_register [param_key, register]: Copy the value of a param key to a register.
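For illustration, here is a hedged sketch that copies a runtime param into a register so a downstream job can read it; the param key (:greeting), register name (:message), and job names are arbitrary:
pipeline = {
  jobs: [
    # Copy the :greeting param into the :message register.
    { name: :expose, type: 'b/param/to_register', param_key: :greeting, register: :message },
    { name: :show,   type: 'b/echo', message: 'The message register holds: {__message_register}' }
  ],
  steps: %i[expose show]
}

payload = Burner::Payload.new(params: { greeting: 'hello world' })
Burner::Pipeline.make(pipeline).execute(payload: payload)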
Serialization
- b/serialize/csv [byte_order_mark, register]: Take an array of arrays and create a CSV. You can optionally pre-pend a byte order mark, see Burner::Modeling::ByteOrderMark for acceptable options.
- b/serialize/json [register]: Convert value to JSON.
- b/serialize/yaml [register]: Convert value to YAML.
Value
- b/value/copy [from_register, to_register]: Copy from_register's value into the to_register. Note: this does not do any deep copying and should be assumed it is shallow copying all objects.
- b/value/nest [key, register]: This job will nest the current value within a new outer hash. The specified key passed in will be the corresponding new hash key entry for the existing value.
- b/value/static [register, value]: Set the value to any arbitrary value.
- b/value/transform [register, separator, transformers]: Transform the current value of the register through a Realize::Pipeline. This will transform the entire value, as opposed to the b/collection/transform job, which will iterate over each row/record in a dataset and transform each row/record.
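As a quick, hedged illustration of the value jobs above (not taken from the library's documentation; the key and job names are arbitrary):
pipeline = {
  jobs: [
    # Seed the default register with a static array.
    { name: :seed, type: 'b/value/static', value: %w[a b c] },
    # Wrap the default register's value in a new outer hash under the letters key.
    { name: :wrap, type: 'b/value/nest', key: :letters },
    { name: :show, type: 'b/echo', message: 'Nested value: {__default_register}' }
  ],
  steps: %i[seed wrap show]
}

Burner::Pipeline.make(pipeline).execute(payload: Burner::Payload.new)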
General
- b/echo [message]: Write a message to the output. The message parameter can be interpolated using Payload#params.
- b/nothing []: Do nothing.
- b/sleep [seconds]: Sleep the thread for X number of seconds.
Notes:
- If you see that a job accepts a 'register' attribute/argument, that indicates a job will access and/or mutate the payload. The register indicates which part of the payload the job will interact with. This allows jobs to be placed into 'lanes'. If register is not specified, then the default register is used.
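To illustrate the register note above, here is a hedged sketch that uses two registers as separate 'lanes' and then merges them; the file names, register names, and job names are illustrative only:
pipeline = {
  jobs: [
    # Each lane reads and parses its own file into its own register.
    { name: :read_a,  type: 'b/io/read', path: 'a.json', register: :a },
    { name: :read_b,  type: 'b/io/read', path: 'b.json', register: :b },
    { name: :parse_a, type: 'b/deserialize/json', register: :a },
    { name: :parse_b, type: 'b/deserialize/json', register: :b },
    # Merge both lanes into a single register.
    {
      name: :combine,
      type: 'b/collection/concatenate',
      from_registers: %i[a b],
      to_register: :combined
    }
  ],
  steps: %i[read_a read_b parse_a parse_b combine]
}

Burner::Pipeline.make(pipeline).execute(payload: Burner::Payload.new)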
Adding & Registering Jobs
Note: Jobs have to be registered with a type in the Burner::Jobs factory. All jobs that ship with this library are prefixed with b/ in their type in order to provide a namespace for 'burner-specific' jobs vs. externally provided jobs.
Where this library shines is when additional jobs are plugged in. Burner uses its Burner::Jobs class as its class-level registry, built with acts_as_hashable's acts_as_hashable_factory directive.
Let's say we would like to register a job to parse a CSV:
require 'csv'

# Subclassing Burner::JobWithRegister gives the job access to the register it reads from and writes to.
class ParseCsv < Burner::JobWithRegister
  def perform(output, payload)
    # Replace the register's raw CSV string with an array of hashes (one per row).
    payload[register] = CSV.parse(payload[register], headers: true).map(&:to_h)

    nil
  end
end
Burner::Jobs.register('parse_csv', ParseCsv)
parse_csv is now recognized as a valid job and we can use it:
pipeline = {
jobs: [
{
name: :read,
type: 'b/io/read',
path: '{input_file}'
},
{
name: :output_id,
type: 'b/echo',
message: 'The job id is: {__id}'
},
{
name: :output_value,
type: 'b/echo',
message: 'The current value is: {__default_register}'
},
{
name: :parse,
type: :parse_csv
},
{
name: :convert,
type: 'b/serialize/yaml'
},
{
name: :write,
type: 'b/io/write',
path: '{output_file}'
}
],
steps: %i[
read
output_id
output_value
parse
convert
output_value
write
]
}
params = {
input_file: File.join('spec', 'fixtures', 'cars.csv'),
output_file: File.join(TEMP_DIR, "#{SecureRandom.uuid}.yaml")
}
payload = Burner::Payload.new(params: params)
Burner::Pipeline.make(pipeline).execute(output: output, payload: payload)
Contributing
Development Environment Configuration
Basic steps to take to get this repository compiling:
- Install Ruby (check burner.gemspec for versions supported)
- Install bundler (gem install bundler)
- Clone the repository (git clone [email protected]:bluemarblepayroll/burner.git)
- Navigate to the root folder (cd burner)
- Install dependencies (bundle)
Running Tests
To execute the test suite run:
bundle exec rspec spec --format documentation
Alternatively, you can have Guard watch for changes:
bundle exec guard
Also, do not forget to run Rubocop:
bundle exec rubocop
Publishing
Note: ensure you have proper authorization before trying to publish new versions.
After code changes have successfully gone through the Pull Request review process then the following steps should be followed for publishing new versions:
- Merge Pull Request into master
- Update lib/burner/version.rb using semantic versioning
- Install dependencies: bundle
- Update CHANGELOG.md with release notes
- Commit & push master to remote and ensure CI builds master successfully
- Run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Code of Conduct
Everyone interacting in this codebase, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.
License
This project is MIT Licensed.