NatsWork::Server
Job processing server for the NatsWork distributed job processing system.
Installation
Add to your application's Gemfile:
gem 'natswork-server'
And execute:
$ bundle install
Configuration
NatsWork::Server.configure do |config|
  config.nats_url = ENV['NATS_URL'] || 'nats://localhost:4222'
  config.concurrency = 25
  config.queues = ['critical', 'default', 'low']
  config.logger = Logger.new(STDOUT)
end
Starting a Worker
Command Line
# Start with default settings
natswork
# Specify queues and concurrency
natswork -q critical,default -c 10
# With config file
natswork -C config/natswork.yml
Programmatically
require 'natswork/server'
worker = NatsWork::Worker.new(
  queues: ['critical', 'default'],
  concurrency: 10,
  logger: Logger.new(STDOUT)
)
# Handle signals gracefully
trap('TERM') { worker.stop }
trap('INT') { worker.stop }
worker.start
Worker Features
Concurrency
Workers process jobs concurrently using a thread pool:
worker = NatsWork::Worker.new(
  concurrency: 25 # Number of concurrent jobs
)
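To make the thread-pool idea concrete, here is a minimal standalone sketch of a fixed-size pool draining a shared queue. `TinyPool` and its methods are hypothetical names used for illustration only; this is not NatsWork's implementation, and the real worker may schedule jobs differently.

```ruby
# Illustrative sketch only: TinyPool is a hypothetical class, not part of NatsWork.
class TinyPool
  def initialize(concurrency:)
    @jobs = Queue.new
    @threads = Array.new(concurrency) do
      Thread.new do
        # Queue#pop blocks until a job arrives and returns nil once the
        # queue has been closed and drained, which ends the loop.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  # Enqueue a block to be run by whichever thread becomes free first.
  def submit(&block)
    @jobs << block
  end

  # Stop accepting work, let queued jobs finish, then wait for the threads.
  def shutdown
    @jobs.close
    @threads.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(concurrency: 4)
10.times { |i| pool.submit { results << i * i } }
pool.shutdown

squares = Array.new(results.size) { results.pop }.sort
```

With a pool like this, `concurrency` caps how many jobs run at the same time; any additional jobs wait in the queue until a thread is free.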
Queue Priority
Process queues in order of priority:
worker = NatsWork::Worker.new(
  queues: ['critical', 'default', 'low'],
  strict: true # Strict priority ordering
)
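One common reading of strict priority is that a lower-priority queue is only consulted when every higher-priority queue is empty. The sketch below illustrates that reading with plain arrays; `next_job` is a hypothetical helper, and NatsWork's actual `strict` semantics may differ.

```ruby
# Illustrative sketch only: next_job is a hypothetical helper, not a NatsWork API.
# Returns [queue_name, job] from the highest-priority non-empty queue, or nil.
def next_job(queues, order)
  order.each do |name|
    job = queues.fetch(name).shift
    return [name, job] if job
  end
  nil
end

queues = {
  'critical' => ['c1'],
  'default'  => ['d1', 'd2'],
  'low'      => ['l1']
}
order = %w[critical default low]

processed = []
while (pair = next_job(queues, order))
  processed << pair
end
```

Under this policy a busy `critical` queue can starve `low` entirely, which is the usual trade-off of strict ordering compared with weighted polling.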
Middleware
Add server-side middleware:
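class MetricsMiddleware
  # `message` is an assumed name for the second argument (the raw job payload).
  def call(job, message)
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    # Report elapsed time in milliseconds, the unit StatsD timers conventionally use.
    elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1000
    StatsD.timing("job.duration", elapsed_ms, tags: ["job:#{job.class.name}"])
  end
end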
NatsWork::Server.configure do |config|
  config.server_middleware do |chain|
    chain.add MetricsMiddleware
  end
end
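For readers unfamiliar with the pattern, the following standalone sketch shows how a chain of middleware classes that each `yield` can wrap a job. `TinyChain`, `Outer`, `Inner`, and the second argument name `message` are all assumptions made for illustration; NatsWork's own chain may be built differently.

```ruby
# Illustrative sketch only: TinyChain is hypothetical, not NatsWork's chain class.
class TinyChain
  def initialize
    @entries = []
  end

  def add(klass)
    @entries << klass
  end

  # Nest the middleware so the first one added becomes the outermost wrapper.
  def invoke(job, message, &final)
    runner = @entries.reverse.reduce(final) do |inner, klass|
      -> { klass.new.call(job, message, &inner) }
    end
    runner.call
  end
end

class Outer
  def call(_job, message)
    message[:trace] << :outer_before
    yield
  ensure
    message[:trace] << :outer_after
  end
end

class Inner
  def call(_job, message)
    message[:trace] << :inner_before
    yield
  ensure
    message[:trace] << :inner_after
  end
end

chain = TinyChain.new
chain.add Outer
chain.add Inner

message = { trace: [] }
chain.invoke(:some_job, message) { message[:trace] << :perform }
trace = message[:trace]
```

Because each middleware runs code both before and after `yield`, the order in which classes are added decides which one sees the job first and which one sees the result last.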
Error Handling
Configure error handling and retries:
NatsWork::Server.configure do |config|
  # Retry configuration
  config.max_retries = 5
  config.retry_backoff = :exponential
  # Dead letter queue
  config.enable_dead_letter = true
  config.dead_letter_queue = 'failed_jobs'
  # Error callbacks (`message` is an assumed name for the raw job payload)
  config.error_handler = ->(error, job, message) {
    Bugsnag.notify(error, {
      job_class: job.class.name,
      job_id: message['job_id']
    })
  }
end
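The interaction between retries, exponential backoff, and a dead letter queue can be sketched as follows. The base delay, growth factor, cap, and the helper names `backoff_delay` and `run_with_retries` are assumptions chosen for illustration; the source does not state which values or algorithm NatsWork uses.

```ruby
# Illustrative sketch only: both helpers are hypothetical, not NatsWork APIs.

# Delay in seconds before retry number `attempt` (1-based), doubling each time
# and capped at `max`.
def backoff_delay(attempt, base: 1.0, factor: 2.0, max: 300.0)
  [base * (factor**(attempt - 1)), max].min
end

# Runs the block, retrying up to `max_retries` times after the first failure.
# Once retries are exhausted, the error message is appended to `dead_letters`.
def run_with_retries(max_retries:, dead_letters:, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError => e
    if attempt <= max_retries
      sleeper.call(backoff_delay(attempt))
      retry
    end
    dead_letters << e.message
    nil
  end
end

delays = (1..5).map { |n| backoff_delay(n) }

slept = []
dead = []
calls = 0
# Inject a fake sleeper so the example finishes instantly.
run_with_retries(max_retries: 2, dead_letters: dead, sleeper: ->(s) { slept << s }) do
  calls += 1
  raise 'boom'
end
```

In this sketch `max_retries: 2` means three attempts in total: the original run plus two retries, after which the job is handed to the dead letter list.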
Monitoring
Health Checks
Register custom checks with the built-in health checker and read the aggregated report:
# Add health checks
NatsWork::HealthChecker.global.add_check('database') do
  ActiveRecord::Base.connection.active?
end
# Check health
health = NatsWork::HealthChecker.global.report
# => { status: :healthy, checks: { ... } }
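As a rough picture of what such a checker does, the sketch below aggregates named checks into a single report and treats a raised exception as a failed check. `TinyHealth` is a hypothetical stand-in, and the `:unhealthy` status value and per-check result format are assumptions; the exact report shape of `NatsWork::HealthChecker` is not specified in this document.

```ruby
# Illustrative sketch only: TinyHealth is hypothetical, not NatsWork::HealthChecker.
class TinyHealth
  def initialize
    @checks = {}
  end

  # Register a named check; the block should return a truthy value when healthy.
  def add_check(name, &block)
    @checks[name] = block
  end

  # Run every check. A check that raises counts as unhealthy rather than
  # crashing the report.
  def report
    results = @checks.map do |name, check|
      ok = begin
        check.call ? true : false
      rescue StandardError
        false
      end
      [name, ok ? :healthy : :unhealthy]
    end.to_h
    overall = results.values.all? { |v| v == :healthy } ? :healthy : :unhealthy
    { status: overall, checks: results }
  end
end

health = TinyHealth.new
health.add_check('cache') { true }
health.add_check('database') { raise 'connection refused' }
report = health.report
```

Rescuing inside each check keeps one failing dependency from hiding the state of the others, which is usually what a readiness or liveness probe wants.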
Metrics
Track worker metrics:
metrics = worker.stats
# => {
# jobs_processed: 1000,
# jobs_failed: 5,
# active_jobs: 3,
# queue_latency: { default: 0.5 }
# }
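Counters like these are updated from many job threads at once, so they need to be thread-safe. The following sketch shows one simple way to do that with a mutex; `TinyStats` is a hypothetical class for illustration, and how `worker.stats` is implemented internally is not described in this document.

```ruby
# Illustrative sketch only: TinyStats is hypothetical, not the NatsWork stats object.
class TinyStats
  def initialize
    @lock = Mutex.new
    @counts = Hash.new(0)
  end

  # Atomically add to a named counter.
  def increment(key, by = 1)
    @lock.synchronize { @counts[key] += by }
  end

  # Return a copy so callers cannot mutate the live counters.
  def snapshot
    @lock.synchronize { @counts.dup }
  end
end

stats = TinyStats.new
threads = Array.new(4) do
  Thread.new { 250.times { stats.increment(:jobs_processed) } }
end
threads.each(&:join)
stats.increment(:jobs_failed, 5)

snapshot = stats.snapshot
```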
Logging
Structured logging with job context:
NatsWork::Server.configure do |config|
  config.logger = Logger.new(STDOUT)
  config.log_level = :info
  config.log_format = :json # JSON structured logs
end
Deployment
Systemd
[Unit]
Description=NatsWork Worker
After=network.target
[Service]
Type=simple
User=app
WorkingDirectory=/app
ExecStart=/usr/local/bin/bundle exec natswork
Restart=always
RestartSec=3
[Install]
WantedBy=multi-user.target
Docker
FROM ruby:3.1
WORKDIR /app
COPY Gemfile* ./
RUN bundle install
COPY . .
CMD ["bundle", "exec", "natswork"]
Kubernetes
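apiVersion: apps/v1
kind: Deployment
metadata:
  name: natswork-worker
spec:
  replicas: 3
  # apps/v1 requires a selector matching the pod template labels
  # (the label value here is illustrative)
  selector:
    matchLabels:
      app: natswork-worker
  template:
    metadata:
      labels:
        app: natswork-worker
    spec:
      containers:
        - name: worker
          image: myapp/worker:latest
          env:
            - name: NATS_URL
              value: nats://nats:4222
            - name: CONCURRENCY
              value: "25"
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"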
Scaling
Horizontal Scaling
Deploy multiple workers:
# Start multiple workers on different machines
natswork -q default -c 25 # Machine 1
natswork -q default -c 25 # Machine 2
natswork -q default -c 25 # Machine 3
Auto-scaling
Scale the number of workers based on queue depth:
# Monitor queue depth
monitor = NatsWork::QueueMonitor.new
if monitor.queue_depth('default') > 1000
  # Scale up workers
end
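The "scale up" step usually comes down to turning a queue depth into a target worker count. A minimal sketch of that calculation follows; `desired_workers`, the `jobs_per_worker` target, and the min/max bounds are assumptions for illustration, and how the result is applied (for example by changing a replica count) depends on your platform.

```ruby
# Illustrative sketch only: desired_workers is a hypothetical helper.
# Aim for roughly `jobs_per_worker` queued jobs per worker, within fixed bounds.
def desired_workers(depth, jobs_per_worker: 500, min: 1, max: 10)
  needed = (depth.to_f / jobs_per_worker).ceil
  needed.clamp(min, max)
end

idle    = desired_workers(0)
busy    = desired_workers(1_001)
flooded = desired_workers(100_000)
```

Clamping to a minimum keeps at least one worker running when the queue is empty, and the maximum guards against runaway scale-up during a traffic spike.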
Testing
require 'natswork/testing'
RSpec.describe MyJob do
  it 'processes job' do
    NatsWork::Testing.inline! do
      MyJob.perform_async('arg1', 'arg2')
      # Job runs synchronously in test
    end
  end
end
API Reference
See the API documentation for details on each class and method.
Contributing
Bug reports and pull requests are welcome at https://github.com/yourusername/natswork.
License
MIT License