FlowNodes
A Ruby port of the minimalist LLM framework, PocketFlow.
FlowNodes is a Ruby gem that brings the lightweight, expressive power of PocketFlow to the Ruby ecosystem. It provides a minimal, graph-based core for building powerful LLM applications like Agents, Workflows, and RAG, without the bloat of larger frameworks.
Design Philosophy
FlowNodes is inspired by and based on PocketFlow, a Python framework created by The Pocket. We've adapted PocketFlow's elegant, minimalist approach to LLM application development for the Ruby ecosystem.
Core principles:
- Minimalist Design: Core functionality in under 500 lines of code
- Graph-based Architecture: Connect nodes to create complex workflows
- LLM-First: Built specifically for Large Language Model applications
- Extensible: Easy to extend with custom nodes and flows
FlowNodes maintains the same philosophy and API patterns as PocketFlow while providing a native Ruby experience. This ensures Ruby developers can leverage the proven design patterns that make PocketFlow so effective.
Installation
Add this line to your application's Gemfile:
gem 'flow_nodes'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install flow_nodes
Features
- Minimal & Lightweight: Core framework in under 500 lines of code
- Graph-based: Build complex workflows with simple node connections
- Async Support: Built-in async and parallel processing capabilities
- Batch Processing: Sequential and parallel batch operations
- Retry Logic: Built-in retry mechanisms with customizable fallbacks
- Thread Safety: Proper isolation and deep copying for concurrent execution
- Extensible: Easy to extend with custom nodes and flows
- Production Ready: Comprehensive test coverage and clean architecture
Quick Start
Basic Node and Flow
require 'flow_nodes'
class GreetingNode < FlowNodes::Node
def exec(params)
puts "Hello, #{params[:name]}!"
"greeted"
end
end
class FarewellNode < FlowNodes::Node
def exec(params)
puts "Goodbye, #{params[:name]}!"
nil # End the flow
end
end
# Create nodes
greeting = GreetingNode.new
farewell = FarewellNode.new
# Connect nodes: greeting -> farewell
greeting - :greeted >> farewell
# Create and run flow
flow = FlowNodes::Flow.new(start: greeting)
flow.set_params(name: "World")
flow.run(nil)
Conditional Flows
class ValidationNode < FlowNodes::Node
def exec(params)
params[:email]&.include?("@") ? "valid" : "invalid"
end
end
class ProcessNode < FlowNodes::Node
def exec(params)
puts "Processing #{params[:email]}"
nil
end
end
class ErrorNode < FlowNodes::Node
def exec(params)
puts "Error: Invalid email #{params[:email]}"
nil
end
end
validator = ValidationNode.new
processor = ProcessNode.new
error_handler = ErrorNode.new
# Branch based on validation result
validator - :valid >> processor
validator - :invalid >> error_handler
flow = FlowNodes::Flow.new(start: validator)
flow.set_params(email: "[email protected]")
flow.run(nil)
Batch Processing
class DataProcessor < FlowNodes::BatchNode
def exec(item)
puts "Processing: #{item}"
item.upcase
end
end
processor = DataProcessor.new
processor.set_params(["hello", "world", "ruby"])
results = processor.run(nil)
# => ["HELLO", "WORLD", "RUBY"]
Async and Parallel Processing
class AsyncProcessor < FlowNodes::AsyncParallelBatchNode
def exec_async(item)
puts "Processing #{item} on thread #{Thread.current.object_id}"
sleep(0.1) # Simulate I/O
item.upcase
end
end
processor = AsyncProcessor.new
processor.set_params(["hello", "world", "ruby"])
results = processor.run_async(nil)
# Processes all items in parallel
Lifecycle Hooks
class LoggingNode < FlowNodes::Node
def prep(state)
puts "Preparing to process"
{ prepared_at: Time.now }
end
def exec(params)
puts "Processing: #{params}"
"success"
end
def post(state, params, result)
puts "Completed with result: #{result}"
end
end
Error Handling and Retries
class RetryNode < FlowNodes::Node
def initialize
super(max_retries: 3, wait: 1)
end
def exec(params)
# Simulate unreliable operation
raise "Network error" if rand < 0.7
"success"
end
def exec_fallback(params, exception)
puts "All retries failed: #{exception.message}"
"failed"
end
end
Architecture
FlowNodes is built around several core classes:
BaseNode: Foundation class with parameter management and connectionsNode: Adds retry logic and lifecycle hooksFlow: Orchestrates node execution and state managementBatchNode: Processes arrays of items sequentiallyAsyncNode: Enables asynchronous executionAsyncBatchNode: Async sequential batch processingAsyncParallelBatchNode: Async parallel batch processingAsyncFlow: Orchestrates async and sync nodes together
Examples
See the examples/ directory for complete examples:
examples/chatbot.rb- Interactive chatbot with self-loopsexamples/workflow.rb- Data validation and processing workflowexamples/batch_processing.rb- Sequential and parallel batch operations
Development
After checking out the repo, run bundle install to install dependencies.
Running Tests
bundle exec rspec
Code Quality
# Run RuboCop for style checking
bundle exec rubocop
# Auto-fix issues
bundle exec rubocop -a
# Generate documentation
bundle exec yard doc
CI/CD Setup
The project uses GitHub Actions for CI/CD with the following secrets required:
QLTY_COVERAGE_TOKEN: Token for Qlty code coverage reportingRUBYGEMS_AUTH_TOKEN: Token for publishing to RubyGems (optional)
Running Examples
# Run the chatbot example
ruby examples/chatbot.rb
# Run the workflow example
ruby examples/workflow.rb
# Run the batch processing example
ruby examples/batch_processing.rb
API Reference
For detailed API documentation, visit the YARD documentation or generate it locally:
bundle exec yard doc
open doc/index.html
Performance Considerations
- Async Operations: Use Ruby threads, subject to the GVL. Best for I/O-bound tasks.
- Parallel Processing:
AsyncParallelBatchNodeprovides concurrency for I/O operations. - Memory: Deep copying ensures thread safety but uses more memory.
- Batch Size: Consider batch sizes for memory usage vs. processing efficiency.
Thread Safety
FlowNodes is designed to be thread-safe:
- Parameters are deep-copied using
Marshalto prevent state bleed - Each flow execution operates on isolated node instances
- Shared state objects should be managed carefully in multi-threaded environments
Contributing
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Make your changes
- Add tests for your changes
- Ensure all tests pass (
bundle exec rspec) - Run RuboCop (
bundle exec rubocop) - Commit your changes (
git commit -am 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Code of Conduct
This project follows the Contributor Covenant code of conduct.
Acknowledgments
This project is a Ruby port of the excellent PocketFlow Python framework created by The Pocket.
Special thanks to:
- The original PocketFlow team for pioneering the minimalist LLM framework approach
- The Python community for inspiring clean, expressive API design
- The Ruby community for providing excellent tools and libraries that made this port possible
FlowNodes would not exist without the groundbreaking work of the PocketFlow team. We encourage users to also check out the original Python PocketFlow and its excellent documentation.
License
The gem is available as open source under the terms of the MIT License.