SwineHerd
Swineherd is for running scripts and workflows on filesystems.
Outline
A workflow is built with script objects and run on a filesystem.
Script:
A script has the following attributes (a short sketch follows this list):
- source – The source file used. These can be Apache Pig scripts, Wukong scripts, even R scripts. You can add your own script types by subclassing the Script class.
- input – An array of input paths.
- output – An array of output paths.
- options – A ruby hash of options used as command line args, eg. {:foo => 'bar'}. How these options are mapped to command line arguments is up to the particular script class.
- attributes – A ruby hash of parameters used for variable substitution. Every script is assumed to be (but not required to be) an eruby template.
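For instance, configuring a Pig script might look like this (a minimal sketch; the paths, option names, and attribute names are invented for illustration):
require 'swineherd/script' ; include Swineherd::Script
script = PigScript.new('scripts/my_script.pig')
script.input  << '/data/graph/edges.tsv'   # input paths (an array)
script.output << '/data/graph/degrees'     # output paths (an array)
script.options    = {:foo => 'bar'}        # mapped to command line args by the script class
script.attributes = {:min_degree => 2}     # substituted into the eruby template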
Workflow:
A workflow is built using rake task objects that do nothing more than run scripts. A workflow:
- can be described with a directed dependency graph
- has an id which is used to run its tasks idempotently. At the moment it is the responsibility of the running process (or human being) to choose a suitable id.
- manages intermediate outputs by using the next_output and latest_output methods. See the examples dir for usage, and the sketch after this list.
- has a working directory in which all intermediate outputs go; these are named according to the rake task that created them
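Here is a minimal sketch of how the id, working directory, and next_output/latest_output fit together (the task, script, and paths are invented; see the full example below for real usage):
require 'swineherd' ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script
require 'swineherd/filesystem'
flow = Workflow.new('my_flow_id') do
  hdfs   = Swineherd::FileSystem.get(:hdfs)
  script = WukongScript.new('scripts/my_script.rb')
  task :my_task do
    script.input  << '/tmp/my_flow_example/input.tsv'
    # next_output returns a fresh path inside the working directory, named after
    # the flow id and the task; latest_output returns the most recently
    # generated one, so re-running the flow with the same id skips finished work
    script.output << next_output(:my_task)
    script.run(:hadoop) unless hdfs.exists? latest_output(:my_task)
  end
end
flow.workdir = '/tmp/my_flow_example'
flow.run(:my_task)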
FileSystem
Workflows are intended to run on filesystems. At the moment, the implemented filesystems are:
- file – Local file system. Only thoroughly tested on Ubuntu Linux.
- hdfs – Hadoop Distributed File System. Uses JRuby and the Apache Hadoop 0.20 API.
- s3 – Uses the right_aws gem for interacting with Amazon Simple Storage Service (S3).
Using the filesystem:
Paths should be absolute.
# get a new instance of local filesystem and write to it
localfs = FileSystem.get(:file)
localfs.open("mylocalfile", 'w') do |f|
f.write("Writing a string to a local file")
end
# get a new instance of hadoop filesystem and write to it
hadoopfs = FileSystem.get(:hdfs)
hadoopfs.open("myhadoopfile", 'w') do |f|
f.write("Writing a string to an hdfs file")
end
# get a new instance of s3 filesystem and write to it
access_key_id = '1234abcd'
secret_access_key = 'foobar1234'
s3fs = FileSystem.get(:s3, access_key_id, secret_access_key)
s3fs.mkpath 'mys3bucket' # bucket must exist
s3fs.open("mys3bucket/mys3file", 'w') do |f|
f.write("Writing a string to an s3 file")
end
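Besides open, the filesystem objects expose other operations the workflow below leans on, such as exists?, mkpath, merge, and copy_to_local. A small sketch of bridging HDFS and the local filesystem (the paths are invented and a reachable HDFS cluster is assumed):
# pull job output down from hdfs if we don't already have it locally
hdfs    = FileSystem.get(:hdfs)
localfs = FileSystem.get(:file)
if hdfs.exists?('/tmp/job_output') && !localfs.exists?('/tmp/job_output')
  hdfs.copy_to_local('/tmp/job_output', '/tmp/job_output')
end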
Working Example
For the most up to date working example see the examples directory. Here’s a simple example for running pagerank:
#!/usr/bin/env ruby
$LOAD_PATH << '../../lib'
require 'swineherd' ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script
require 'swineherd/filesystem'
Settings.define :flow_id, :required => true, :description => "Flow id required to make run of workflow unique"
Settings.define :iterations, :type => Integer, :default => 10, :description => "Number of pagerank iterations to run"
Settings.define :hadoop_home, :default => '/usr/local/share/hadoop', :description => "Path to hadoop config"
Settings.resolve!
flow = Workflow.new(Settings.flow_id) do
# The filesystems we're going to be working with
hdfs = Swineherd::FileSystem.get(:hdfs)
localfs = Swineherd::FileSystem.get(:file)
# The scripts we're going to use
initializer = PigScript.new('scripts/pagerank_initialize.pig')
iterator = PigScript.new('scripts/pagerank.pig')
finisher = WukongScript.new('scripts/cut_off_list.rb')
plotter = RScript.new('scripts/histogram.R')
#
# Runs simple pig script to initialize pagerank. We must specify the input
# here as this is the first step in the workflow. The output attribute is to
# ensure idempotency and the options attribute is the hash that will be
# converted into command-line args for the pig interpreter.
#
task :pagerank_initialize do
initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => next_output(:pagerank_initialize)}
initializer.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_initialize)
end
#
# Runs multiple iterations of pagerank with another pig script and manages all
# the intermediate outputs.
#
task :pagerank_iterate => [:pagerank_initialize] do
iterator.options[:damp] = '0.85f'
iterator.options[:curr_iter_file] = latest_output(:pagerank_initialize)
Settings.iterations.times do
iterator.options[:next_iter_file] = next_output(:pagerank_iterate)
iterator.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_iterate)
iterator.refresh!
iterator.options[:curr_iter_file] = latest_output(:pagerank_iterate)
end
end
#
# Here we use a wukong script to cut off the last field (a big pig bag of
# links). Notice how every wukong script MUST have an input but pig scripts do
# not.
#
task :cut_off_adjacency_list => [:pagerank_iterate] do
finisher.input << latest_output(:pagerank_iterate)
finisher.output << next_output(:cut_off_adjacency_list)
finisher.run :hadoop unless hdfs.exists? latest_output(:cut_off_adjacency_list)
end
#
# We want to pull down one result file, merge the part-000.. files into one file
#
task :merge_results => [:cut_off_adjacency_list] do
merged_results = next_output(:merge_results)
hdfs.merge(latest_output(:cut_off_adjacency_list), merged_results) unless hdfs.exists? merged_results
end
#
# Cat results into a local directory with the same structure
# eg. #{work_dir}/#{flow_id}/pull_down_results-0.
#
# FIXME: Bridging filesystems is kludgey.
#
task :pull_down_results => [:merge_results] do
local_results = next_output(:pull_down_results)
hdfs.copy_to_local(latest_output(:merge_results), local_results) unless localfs.exists? local_results
end
#
# Plot 2nd column of the result as a histogram (requires R and
# ggplot2). Note that the output here is a png file but doesn't have that
# extension. Ensmarten me as to the right way to handle that?
#
task :plot_results => [:pull_down_results] do
plotter.attributes = {
:pagerank_data => latest_output(:pull_down_results),
:plot_file => next_output(:plot_results), # <-- this will be a png...
:raw_rank => "aes(x=d$V2)"
}
plotter.run(:local) unless localfs.exists? latest_output(:plot_results)
end
end
flow.workdir = "/tmp/pagerank_example"
flow.describe
flow.run(:plot_results)
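Settings here behaves like configliere's Settings (the same Settings used by Wukong), so the required flow_id and the optional iterations can be passed as command line flags when invoking the script, something like the following (the script filename is hypothetical):
$: ruby pagerank_workflow.rb --flow_id=test_run --iterations=5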
Utils
There’s a fun little program called ‘hdp-tree’ that shows off the ease of using the filesystem abstraction:
$: bin/hdp-tree /tmp/my_hdfs_directory
---
/tmp/my_hdfs_directory:
- my_hdfs_directory:
- sub_dir_a: leaf_file_1
- sub_dir_a: leaf_file_2
- sub_dir_a: leaf_file_3
- my_hdfs_directory:
- sub_dir_b: leaf_file_1
- sub_dir_b: leaf_file_2
- sub_dir_b: leaf_file_3
- my_hdfs_directory:
- sub_dir_c: leaf_file_1
- sub_dir_c: leaf_file_2
- sub_dir_c: leaf_file_3
- sub_dir_c:
- sub_sub_dir_a: yet_another_leaf_file
- sub_dir_c: sub_sub_dir_b
- sub_dir_c: sub_sub_dir_c
I know, it’s not as pretty as unix tree, but this IS github…
TODO
- next task in a workflow should NOT run if the previous step failed
- this is made difficult by the fact that, sometimes?, when a pig script fails it still returns a 0 exit status
- same for wukong scripts
- add a job object that implements a not_if function. This way a workflow will be constructed of job objects
- a job will do nothing more than execute the ruby code in its (run?) block, unless not_if is true
- this way we can put script objects inside a job and only run them under certain conditions that the user specifies when they create the job
- implement ftp filesystem interfaces