Class: Tap::App
- Inherits:
- Object (hierarchy: Object, Tap::App)
- Includes:
- Configurable, MonitorMixin, Signals
- Defined in:
- lib/tap/app.rb,
lib/tap/app/api.rb,
lib/tap/app/queue.rb,
lib/tap/app/stack.rb,
lib/tap/app/state.rb
Overview
App coordinates the setup and execution of workflows.
Defined Under Namespace
Modules: State
Classes: Api, Queue, Stack, TerminateError
Constant Summary
- CONTEXT =
A variable to store the application context in Thread.current
'tap.context'
- CURRENT =
A variable to store an instance in the application context.
'tap.current'
- CALL_KEYS =
The reserved call keys
%w{obj sig args}
- INIT_KEYS =
The reserved init keys
%w{var class spec}
- RESERVED_KEYS =
Reserved call and init keys as a single array
CALL_KEYS + INIT_KEYS
- OBJECT =
Splits a signal into an object string and a signal string. If OBJECT doesn’t match, then the string can be considered a signal, and the object is nil. After a match:
$1:: The object string (ex: 'obj/sig' => 'obj')
$2:: The signal string (ex: 'obj/sig' => 'sig')
/\A(.*)\/(.*)\z/
- LOG_FORMAT =
The default log format
"%s %8s %10s %s\n"
- LOG_FORMATTER =
The default logger formatter – uses LOG_FORMAT
lambda do |severity, time, head, tail|
  code = (severity == 'INFO' ? ' ' : severity[0,1])
  LOG_FORMAT % [code, time.strftime('%H:%M:%S'), head, tail]
end
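As a rough illustration of how LOG_FORMAT and LOG_FORMATTER might be attached to a standard-library Logger, the sketch below copies both constants from the listing above so that it runs without Tap. The in-memory StringIO target and the sample messages are assumptions made for the demo; the constructor shown later logs to $stderr by default.

```ruby
require 'logger'
require 'stringio'

# Copied from the constants above so the sketch runs without Tap.
LOG_FORMAT = "%s %8s %10s %s\n"
LOG_FORMATTER = lambda do |severity, time, head, tail|
  code = (severity == 'INFO' ? ' ' : severity[0, 1])
  LOG_FORMAT % [code, time.strftime('%H:%M:%S'), head, tail]
end

# Hypothetical target: an in-memory buffer instead of $stderr.
io = StringIO.new
logger = Logger.new(io)
logger.formatter = LOG_FORMATTER

# Logger#add(severity, message, progname): progname arrives as "head".
logger.add(Logger::INFO, 'message', 'action')
logger.add(Logger::WARN, 'careful', 'action')

puts io.string
```

INFO lines get a blank severity column, while any other severity is abbreviated to its first letter (for example W for WARN).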
Instance Attribute Summary
-
#env ⇒ Object
The application environment.
-
#logger ⇒ Object
The application logger.
-
#objects ⇒ Object
readonly
A cache of application objects.
-
#queue ⇒ Object
readonly
The application queue.
-
#stack ⇒ Object
The application call stack for executing tasks.
-
#state ⇒ Object
readonly
The state of the application (see App::State).
Class Method Summary
- .build(spec = {}, app = current) ⇒ Object
- .context ⇒ Object
- .current ⇒ Object
- .current=(app) ⇒ Object
- .set_context(context = {}) ⇒ Object
- .with_context(context) ⇒ Object
Instance Method Summary
- #build(spec, &block) ⇒ Object
-
#call(args, &block) ⇒ Object
Sends a signal to an application object.
-
#check_terminate ⇒ Object
Raises a TerminateError if state is TERMINATE.
-
#enq(task, input = []) ⇒ Object
Enqueues the task with the input.
-
#exe(task, input = []) ⇒ Object
Executes tasks by doing the following.
-
#gc(all = false) ⇒ Object
Removes objects keyed by integers.
-
#get(var) ⇒ Object
Returns the object set to var, or self if var is nil.
-
#info ⇒ Object
Returns an information string for the App.
-
#init(clas, *args, &block) ⇒ Object
Resolves the class in env and initializes a new instance with the args and block.
-
#initialize(config = {}, options = {}) ⇒ App
constructor
Creates a new App with the given configuration.
- #inspect ⇒ Object
-
#log(action = '', msg = nil, level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO).
-
#middleware ⇒ Object
Returns an array of middleware in use by self.
-
#obj(var) ⇒ Object
Same as get, but raises an error if no object is set to the variable.
-
#pq(task, input = []) ⇒ Object
Priority-enqueues (unshifts) the task with the input.
-
#reset ⇒ Object
Clears objects, the queue, and resets the stack so that no middleware is used.
- #route(obj, sig, &block) ⇒ Object
-
#run ⇒ Object
Sequentially executes each enqueued job (a [task, input] pair).
-
#scope ⇒ Object
Sets self as instance in the current context, for the duration of the block (see App.with_context).
-
#serialize(bare = true) ⇒ Object
Converts self to a schema that can be used to build a new app with equivalent application objects, queue, and middleware.
-
#set(var, obj) ⇒ Object
Sets the object to the specified variable and returns obj.
- #signal(sig, &block) ⇒ Object
-
#stop ⇒ Object
Signals a running app to stop passing tasks to the application stack by setting state to STOP.
-
#terminate ⇒ Object
Signals a running application to terminate execution by setting state to TERMINATE.
- #to_spec ⇒ Object
-
#use(clas, *argv) ⇒ Object
Adds the specified middleware to the stack.
-
#var(obj, auto_assign = true) ⇒ Object
Returns the variable for the object.
Constructor Details
#initialize(config = {}, options = {}) ⇒ App
Creates a new App with the given configuration. Options can be used to specify objects that are normally initialized for every new app:
:stack the application stack; an App::Stack
:queue the application queue; an App::Queue
:objects application objects; a hash of (var, object) pairs
:logger the application logger
:env the application environment
A block may also be provided; it will be set as a default join.
# File 'lib/tap/app.rb', line 213
def initialize(config={}, options={})
  super() # monitor

  @state = State::READY
  @stack = options[:stack] || Stack.new(self)
  @queue = options[:queue] || Queue.new
  @objects = options[:objects] || {}
  @logger = options[:logger] || begin
    logger = Logger.new($stderr)
    logger.level = Logger::INFO
    logger.formatter = LOG_FORMATTER
    logger
  end
  @env = options[:env] || Env.new

  initialize_config(config)
end
Instance Attribute Details
#env ⇒ Object
The application environment
# File 'lib/tap/app.rb', line 113
def env
  @env
end
#logger ⇒ Object
The application logger
# File 'lib/tap/app.rb', line 110
def logger
  @logger
end
#objects ⇒ Object (readonly)
A cache of application objects
# File 'lib/tap/app.rb', line 107
def objects
  @objects
end
#queue ⇒ Object (readonly)
The application queue
# File 'lib/tap/app.rb', line 104
def queue
  @queue
end
#stack ⇒ Object
The application call stack for executing tasks
# File 'lib/tap/app.rb', line 101
def stack
  @stack
end
#state ⇒ Object (readonly)
The state of the application (see App::State)
# File 'lib/tap/app.rb', line 98
def state
  @state
end
Class Method Details
.build(spec = {}, app = current) ⇒ Object
# File 'lib/tap/app.rb', line 39
def build(spec={}, app=current)
  config = spec['config'] || {}
  signals = spec['signals'] || []

  if spec['self']
    app.reconfigure(config)
  else
    app = new(config, :env => app.env)
  end

  signals.each do |args|
    app.call(args)
  end

  app.gc
  app
end
.context ⇒ Object
# File 'lib/tap/app.rb', line 27
def context
  Thread.current[CONTEXT] ||= {}
end
.current ⇒ Object
# File 'lib/tap/app.rb', line 35
def current
  context[CURRENT] ||= new
end
.current=(app) ⇒ Object
# File 'lib/tap/app.rb', line 31
def current=(app)
  context[CURRENT] = app
end
.set_context(context = {}) ⇒ Object
# File 'lib/tap/app.rb', line 12
def set_context(context={})
  current = Thread.current[CONTEXT]
  Thread.current[CONTEXT] = context
  current
end
.with_context(context) ⇒ Object
# File 'lib/tap/app.rb', line 18
def with_context(context)
  begin
    current = set_context(context)
    yield
  ensure
    set_context(current)
  end
end
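The context methods above swap a hash stored in Thread.current and restore the previous one when the block exits. The following is a minimal sketch of that pattern under stated assumptions: ContextDemo is a hypothetical stand-in module (not part of Tap), and its 'demo.context' key is chosen only to avoid touching the real 'tap.context' slot.

```ruby
# Stand-in module mirroring App.set_context / App.with_context above.
module ContextDemo
  CONTEXT = 'demo.context' # hypothetical key; Tap uses 'tap.context'

  module_function

  def context
    Thread.current[CONTEXT] ||= {}
  end

  # Installs a new context and returns the one it replaced.
  def set_context(context = {})
    current = Thread.current[CONTEXT]
    Thread.current[CONTEXT] = context
    current
  end

  # Runs the block with the given context, then restores the previous
  # context even if the block raises.
  def with_context(context)
    current = set_context(context)
    yield
  ensure
    set_context(current)
  end
end

ContextDemo.set_context('name' => 'outer')
inner = ContextDemo.with_context('name' => 'inner') { ContextDemo.context['name'] }
puts inner                         # value seen inside the block
puts ContextDemo.context['name']   # previous context is back in place
```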
Instance Method Details
#build(spec, &block) ⇒ Object
# File 'lib/tap/app.rb', line 387
def build(spec, &block)
  var  = spec['var']
  clas = spec['class']
  spec = spec['spec'] || spec
  obj = nil

  unless clas.nil?
    method_name = spec.kind_of?(Array) ? :parse : :build
    obj = env.constant(clas).send(method_name, spec, self, &block)
  end

  unless var.nil?
    if var.respond_to?(:each)
      var.each {|v| set(v, obj) }
    else
      set(var, obj)
    end
  end

  obj
end
#call(args, &block) ⇒ Object
Sends a signal to an application object. The input should be a hash defining these fields:
obj # a variable identifying an object, or nil for self
sig # the signal name
args # arguments to the signal (typically a Hash)
Call does the following:
object = app.get(obj) # lookup an application object by obj
signal = object.signal(sig) # lookup a signal by sig
signal.call(args) # call the signal with args
Call returns the result of the signal call.
# File 'lib/tap/app.rb', line 353
def call(args, &block)
  log(:call, nil, Logger::DEBUG) { args.inspect } if debug

  obj  = args['obj']
  sig  = args['sig']
  args = args['args'] || args

  # nil obj routes back to app, so optimize by evaluating signal directly
  (obj.nil? ? signal(sig, &block) : route(obj, sig, &block)).call(args)
end
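The lookup-and-dispatch steps described for call can be sketched without Tap. Everything below is an assumption made for illustration: MiniApp and Greeter are hypothetical classes, signals are modeled as plain lambdas, and the 'count' and 'greet' signal names are invented for the demo.

```ruby
# MiniApp is a hypothetical stand-in for Tap::App, reduced to the
# lookup-and-dispatch steps described above.
class MiniApp
  attr_reader :objects

  def initialize
    @objects = {}
  end

  # Returns the object set to var, or self if var is nil.
  def get(var)
    var.nil? ? self : objects[var]
  end

  # The app's own (invented) signal: reports how many objects are set.
  def signal(sig)
    raise "unknown signal: #{sig}" unless sig.to_s == 'count'
    lambda { |_args| objects.length }
  end

  def call(args)
    obj  = args['obj']
    sig  = args['sig']
    args = args['args'] || args
    target = get(obj) or raise "unknown object: #{obj.inspect}"
    target.signal(sig).call(args)
  end
end

# Hypothetical application object exposing one signal.
class Greeter
  def signal(sig)
    raise "unknown signal: #{sig}" unless sig.to_s == 'greet'
    lambda { |args| "hello #{args['name']}" }
  end
end

app = MiniApp.new
app.objects['g'] = Greeter.new
puts app.call('obj' => 'g', 'sig' => 'greet', 'args' => {'name' => 'world'})
puts app.call('sig' => 'count')   # nil obj dispatches to the app itself
```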
#check_terminate ⇒ Object
Raises a TerminateError if state is TERMINATE. Nodes should call check_terminate to provide breakpoints in long-running processes.
A block may be provided to check_terminate to execute code before raising the TerminateError.
# File 'lib/tap/app.rb', line 551
def check_terminate
  if state == State::TERMINATE
    yield() if block_given?
    raise TerminateError.new
  end
end
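A long-running node might use check_terminate as a breakpoint roughly as follows. This is a sketch under stated assumptions: TerminateDemo, its :terminate symbol, the process method, and the :stop_here marker are all hypothetical stand-ins so the example runs without Tap.

```ruby
# Hypothetical stand-in for Tap::App::TerminateError.
class TerminateError < RuntimeError; end

class TerminateDemo
  TERMINATE = :terminate # stand-in for App::State::TERMINATE
  attr_accessor :state

  def check_terminate
    if state == TERMINATE
      yield if block_given?
      raise TerminateError.new
    end
  end

  # A long-running task that offers a breakpoint on every iteration.
  # The block given to check_terminate runs just before the raise.
  def process(items, log)
    done = []
    items.each do |item|
      check_terminate { log << "cleanup after #{done.length} items" }
      done << item
      self.state = TERMINATE if item == :stop_here # simulates app.terminate
    end
    done
  end
end

demo = TerminateDemo.new
log = []
begin
  demo.process([:a, :stop_here, :c], log)
rescue TerminateError
  log << 'terminated'
end
p log
```

The third item is never processed: the breakpoint at the top of its iteration runs the cleanup block and raises.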
#enq(task, input = []) ⇒ Object
Enqueues the task with the input. Returns the task.
# File 'lib/tap/app.rb', line 268
def enq(task, input=[])
  queue.enq(task, input)
  task
end
#exe(task, input = []) ⇒ Object
Executes tasks by doing the following.
- call stack with the task and input
- call the task joins (task.joins)
Returns the stack result.
# File 'lib/tap/app.rb', line 468
def exe(task, input=[])
  log("#{var(task)} <<", "#{input.inspect} (#{task.class})", Logger::DEBUG) if debug
  output = stack.call(task, input)
  log("#{var(task)} >>", "#{output.inspect} (#{task.class})", Logger::DEBUG) if debug

  if task.respond_to?(:joins)
    if joins = task.joins
      joins.each do |join|
        join.call(output)
      end
    end
  end

  output
end
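The two steps of exe (call the stack, then call each join with the output) can be sketched as below. ExeDemo and AddTask are hypothetical stand-ins; in particular, modeling the stack as a lambda that simply calls the task is an assumption, not a description of App::Stack.

```ruby
# ExeDemo is a hypothetical stand-in: the "stack" is a lambda that calls
# the task, and joins are plain lambdas receiving the output.
class ExeDemo
  def initialize
    @stack = lambda { |task, input| task.call(*input) }
  end

  def exe(task, input = [])
    output = @stack.call(task, input)

    # Joins are optional: the task may not define them, or may return nil.
    if task.respond_to?(:joins)
      if joins = task.joins
        joins.each { |join| join.call(output) }
      end
    end

    output
  end
end

# A hypothetical task: callable, with an optional list of joins.
class AddTask
  attr_reader :joins

  def initialize(joins = nil)
    @joins = joins
  end

  def call(a, b)
    a + b
  end
end

seen = []
task = AddTask.new([lambda { |out| seen << out }])
result = ExeDemo.new.exe(task, [1, 2])
p result   # the stack result
p seen     # outputs delivered to the join
```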
#gc(all = false) ⇒ Object
Removes objects keyed by integers. If all is specified, gc will clear all objects.
# File 'lib/tap/app.rb', line 329
def gc(all=false)
  if all
    objects.clear
  else
    objects.delete_if {|var, obj| var.kind_of?(Integer) }
  end

  self
end
#get(var) ⇒ Object
Returns the object set to var, or self if var is nil.
# File 'lib/tap/app.rb', line 295
def get(var)
  var.nil? ? self : objects[var]
end
#info ⇒ Object
# File 'lib/tap/app.rb', line 562
def info
  "state: #{state} (#{State.state_str(state)}) queue: #{queue.size}"
end
#init(clas, *args, &block) ⇒ Object
Resolves the class in env and initializes a new instance with the args and block. Note that the app is not appended to args by default.
# File 'lib/tap/app.rb', line 383
def init(clas, *args, &block)
  env.constant(clas).new(*args, &block)
end
#inspect ⇒ Object
# File 'lib/tap/app.rb', line 676
def inspect
  "#<#{self.class}:#{object_id} #{info}>"
end
#log(action = '', msg = nil, level = Logger::INFO) ⇒ Object
Logs the action and message at the input level (default INFO). The message may be generated by a block; in that case leave the message unspecified as nil.
Logging is suppressed if quiet is true.
Performance Considerations
Using a block to generate a message is quicker if logging is off, but slower when logging is on. However, when messages use a lot of interpolation the log time is dominated by the interpolation; at some point the penalty for using a block is outweighed by the benefit of being able to skip the interpolation.
For example:
log(:action, "this is fast")
log(:action) { "and there's not much benefit to the block" }
log(:action, "but a message with #{a}, #{b}, #{c}, and #{d}")
log(:action) { "may be #{best} in a block because you can #{turn} #{it} #{off}" }
# File 'lib/tap/app.rb', line 260
def log(action='', msg=nil, level=Logger::INFO)
  if !quiet || verbose
    msg = yield if msg.nil? && block_given?
    logger.add(level, msg.to_s, action.to_s)
  end
end
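The benefit of the block form is that the message is never built when logging is suppressed. The sketch below demonstrates this with a counter; LogDemo is a hypothetical stand-in that reuses the log body from the listing, and its quiet and verbose accessors plus the StringIO target are assumptions made for the demo.

```ruby
require 'logger'
require 'stringio'

# LogDemo is a hypothetical stand-in exposing the same log signature.
class LogDemo
  attr_accessor :quiet, :verbose
  attr_reader :io

  def initialize
    @io = StringIO.new
    @logger = Logger.new(@io)
    @quiet = false
    @verbose = false
  end

  def log(action = '', msg = nil, level = Logger::INFO)
    if !quiet || verbose
      msg = yield if msg.nil? && block_given?
      @logger.add(level, msg.to_s, action.to_s)
    end
  end
end

demo = LogDemo.new
evaluated = 0

demo.quiet = true
demo.log(:action) { evaluated += 1; "expensive message" }  # block is skipped

demo.quiet = false
demo.log(:action) { evaluated += 1; "expensive message" }  # block runs

puts evaluated   # counts how many times the message was actually built
```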
#middleware ⇒ Object
Returns an array of middleware in use by self.
# File 'lib/tap/app.rb', line 418
def middleware
  middleware = []

  # collect middleware by walking up the stack
  synchronize do
    current = stack
    visited = [current]

    while current.respond_to?(:stack)
      middleware << current
      current = current.stack

      circular_stack = visited.include?(current)
      visited << current

      if circular_stack
        visited.collect! {|m| m.class.to_s }.join(', ')
        raise "circular stack detected:\n[#{visited}]"
      end
    end
  end

  middleware
end
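The walk performed by middleware relies on each middleware exposing the stack it wraps through a stack method, with the base stack lacking that method. A simplified sketch of the traversal, without synchronization, might look like this; BaseStack, Wrapper, and collect_middleware are hypothetical names introduced for the example.

```ruby
# Hypothetical stand-ins: a base stack without #stack, and middleware
# that wraps another stack and exposes it through #stack.
class BaseStack
  def call(task, input)
    task.call(*input)
  end
end

class Wrapper
  attr_reader :stack, :name

  def initialize(stack, name)
    @stack = stack
    @name = name
  end

  def call(task, input)
    @stack.call(task, input)
  end
end

# Walks from the outermost middleware down to the base stack,
# raising if any stack is visited twice.
def collect_middleware(stack)
  middleware = []
  current = stack
  visited = [current]

  while current.respond_to?(:stack)
    middleware << current
    current = current.stack
    raise "circular stack detected" if visited.include?(current)
    visited << current
  end

  middleware
end

stack = Wrapper.new(Wrapper.new(BaseStack.new, 'inner'), 'outer')
p collect_middleware(stack).map(&:name)   # outermost middleware first
```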
#obj(var) ⇒ Object
Same as get, but raises an error if no object is set to the variable.
# File 'lib/tap/app.rb', line 300
def obj(var)
  get(var) or raise "no object set to: #{var.inspect}"
end
#pq(task, input = []) ⇒ Object
Priority-enqueues (unshifts) the task with the input. Returns the task.
# File 'lib/tap/app.rb', line 274
def pq(task, input=[])
  queue.unshift(task, input)
  task
end
#reset ⇒ Object
Clears objects, the queue, and resets the stack so that no middleware is used. Reset raises an error unless state is READY.
# File 'lib/tap/app.rb', line 445
def reset
  synchronize do
    unless state == State::READY
      raise "cannot reset unless READY"
    end

    # walk up middleware to find the base of the stack
    while @stack.respond_to?(:stack)
      @stack = @stack.stack
    end

    objects.clear
    queue.clear
  end
  self
end
#route(obj, sig, &block) ⇒ Object
# File 'lib/tap/app.rb', line 369
def route(obj, sig, &block)
  unless object = get(obj)
    raise "unknown object: #{obj.inspect}"
  end

  unless object.respond_to?(:signal)
    raise "cannot signal: #{object.inspect}"
  end

  object.signal(sig, &block)
end
#run ⇒ Object
Sequentially executes each enqueued job (a [task, input] pair). A run continues until the queue is empty.
Run checks the state of self before executing a task. If the state changes from RUN, the following behaviors result:
STOP        No more tasks will be executed; the current task
            will continue to completion.
TERMINATE   No more tasks will be executed and the currently
            running task will be discontinued as described in
            terminate.
Calls to run when the state is not READY do nothing and return immediately.
Returns self.
# File 'lib/tap/app.rb', line 500
def run
  synchronize do
    return self unless state == State::READY
    @state = State::RUN
  end

  begin
    while state == State::RUN
      break unless job = queue.deq
      exe(*job)
    end
  rescue(TerminateError)
    # gracefully fail for termination errors
    queue.unshift(*job)
  ensure
    synchronize { @state = State::READY }
  end

  self
end
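The STOP behavior can be seen in a stripped-down run loop. The sketch below is an approximation under stated assumptions: RunDemo is a hypothetical class, states are plain symbols, the queue is an Array, tasks are lambdas, and both synchronization and TerminateError handling are left out.

```ruby
# RunDemo is a hypothetical stand-in: states are symbols, the queue is an
# Array of [task, input] pairs, and tasks are lambdas.
class RunDemo
  attr_reader :state, :queue, :results

  def initialize
    @state = :ready
    @queue = []
    @results = []
  end

  def enq(task, input = [])
    queue << [task, input]
    task
  end

  def stop
    @state = :stop if state == :run
    self
  end

  def run
    return self unless state == :ready
    @state = :run

    begin
      # The state is checked before each job, so a stop requested during
      # a task takes effect once that task has finished.
      while state == :run
        break unless job = queue.shift
        task, input = job
        results << task.call(*input)
      end
    ensure
      @state = :ready
    end

    self
  end
end

app = RunDemo.new
app.enq(lambda { |n| n * 2 }, [1])
app.enq(lambda { |n| app.stop; n * 2 }, [2])  # requests a stop mid-run
app.enq(lambda { |n| n * 2 }, [3])
app.run

p app.results        # results of the tasks that ran
p app.queue.length   # jobs left on the queue after the stop
```

Calling run again on the same object would pick up the remaining job, since the state returns to ready when the loop exits.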
#scope ⇒ Object
Sets self as instance in the current context, for the duration of the block (see App.with_context).
# File 'lib/tap/app.rb', line 568
def scope
  App.with_context(CURRENT => self) { yield }
end
#serialize(bare = true) ⇒ Object
Converts self to a schema that can be used to build a new app with equivalent application objects, queue, and middleware. A schema is a collection of signal hashes such that the following will rebuild the state of a on b:
a, b = App.new, App.new
a.serialize.each {|spec| b.call(spec) }
Application objects that do not satisfy the application object API are quietly ignored; enable debugging to be warned of their existence.
# File 'lib/tap/app.rb', line 583
def serialize(bare=true)
  # setup variables
  specs = {}
  order = []

  # collect enque signals to setup queue
  signals = queue.to_a.collect do |(task, input)|
    {'sig' => 'enq', 'args' => {'var' => var(task), 'input' => input}}
  end

  # collect and trace application objects
  objects.keys.sort_by do |var|
    var.to_s
  end.each do |var|
    obj = objects[var]
    order.concat trace(obj, specs)
  end

  middleware.each do |obj|
    order.concat trace(obj, specs)
  end

  if bare
    order.delete(self)
    specs.delete(self)
  else
    order.unshift(self)
    trace(self, specs)
  end
  order.uniq!

  # assemble specs
  variables = {}
  objects.each_pair do |var, obj|
    (variables[obj] ||= []) << var
  end

  specs.keys.each do |obj|
    spec = {'sig' => 'set'}

    # assign variables
    if vars = variables[obj]
      if vars.length == 1
        spec['var'] = vars[0]
      else
        spec['var'] = vars
      end
    end

    # assign the class
    spec['class'] = obj.class.to_s

    # merge obj_spec if possible
    obj_spec = specs[obj]
    if (obj_spec.keys & RESERVED_KEYS).empty?
      spec.merge!(obj_spec)
    else
      spec['spec'] = obj_spec
    end

    specs[obj] = spec
  end

  middleware.each do |obj|
    spec = specs[obj]
    spec['sig'] = 'use'
  end

  order.collect! {|obj| specs[obj] }.concat(signals)
end
#set(var, obj) ⇒ Object
Sets the object to the specified variable and returns obj. Provide nil as obj to un-set a variable (in which case the existing object is returned).
Nil is reserved as a variable name and cannot be used by set.
# File 'lib/tap/app.rb', line 284
def set(var, obj)
  raise "no var specified" if var.nil?

  if obj
    objects[var] = obj
  else
    objects.delete(var)
  end
end
#signal(sig, &block) ⇒ Object
# File 'lib/tap/app.rb', line 364
def signal(sig, &block)
  sig = sig.to_s
  sig =~ OBJECT ? route($1, $2, &block) : super(sig, &block)
end
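The signal method uses the OBJECT pattern to decide whether a signal string names another object. The sketch below isolates only that splitting step; split_signal is a hypothetical helper, and OBJECT is copied from the constant summary so the example runs on its own.

```ruby
# Copied from the OBJECT constant documented above.
OBJECT = /\A(.*)\/(.*)\z/

# Hypothetical helper: returns [object_string, signal_string], with a nil
# object when the input contains no slash.
def split_signal(sig)
  sig = sig.to_s
  sig =~ OBJECT ? [$1, $2] : [nil, sig]
end

p split_signal('obj/sig')
p split_signal(:sig)
p split_signal('a/b/sig')
```

Because the first group is greedy, a string with several slashes splits at the last one, so 'a/b/sig' yields the object 'a/b' and the signal 'sig'.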
#stop ⇒ Object
Signals a running app to stop passing tasks to the application stack by setting state to STOP. The task currently in the stack will continue to completion.
Does nothing unless state is RUN.
# File 'lib/tap/app.rb', line 526
def stop
  synchronize { @state = State::STOP if state == State::RUN }
  self
end
#terminate ⇒ Object
Signals a running application to terminate execution by setting state to TERMINATE. In this state, calls to check_terminate will raise a TerminateError. Run considers TerminateErrors a normal exit and rescues them quietly.
Nodes can set breakpoints that call check_terminate to invoke task-specific termination. If a task never calls check_terminate, then it will continue to completion.
Does nothing if state is READY.
# File 'lib/tap/app.rb', line 541
def terminate
  synchronize { @state = State::TERMINATE unless state == State::READY }
  self
end
#to_spec ⇒ Object
# File 'lib/tap/app.rb', line 654
def to_spec
  signals = serialize(false)
  spec = signals.shift

  spec.delete('self')
  spec.delete('sig')

  var = spec.delete('var')
  klass = spec.delete('class')
  spec = spec.delete('spec') || spec

  signals.unshift(
    'sig' => 'set',
    'var' => var,
    'class' => klass,
    'self' => true
  ) if var
  spec['signals'] = signals

  spec
end
#use(clas, *argv) ⇒ Object
Adds the specified middleware to the stack. The argv will be used as extra arguments to initialize the middleware.
# File 'lib/tap/app.rb', line 411
def use(clas, *argv)
  synchronize do
    @stack = init(clas, @stack, *argv)
  end
end
#var(obj, auto_assign = true) ⇒ Object
Returns the variable for the object. If the object is not assigned to a variable and auto_assign is true, then the object is set to an unused variable and the new variable is returned.
The new variable will be an integer and will be removed upon gc.
# File 'lib/tap/app.rb', line 309
def var(obj, auto_assign=true)
  objects.each_pair do |var, object|
    return var if obj == object
  end

  return nil unless auto_assign

  var = objects.length
  loop do
    if objects.has_key?(var)
      var += 1
    else
      set(var, obj)
      return var
    end
  end
end
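The interplay between set, var, and gc can be sketched with nothing more than a hash. VarDemo below is a hypothetical stand-in that keeps only the objects cache; the method bodies follow the listings above, lightly condensed.

```ruby
# VarDemo is a hypothetical stand-in holding only the objects cache.
class VarDemo
  attr_reader :objects

  def initialize
    @objects = {}
  end

  def set(var, obj)
    raise "no var specified" if var.nil?

    if obj
      objects[var] = obj
    else
      objects.delete(var)
    end
  end

  # Returns the existing variable for obj, or assigns the first unused
  # integer at or above the current number of objects.
  def var(obj, auto_assign = true)
    objects.each_pair { |var, object| return var if obj == object }
    return nil unless auto_assign

    var = objects.length
    var += 1 while objects.has_key?(var)
    set(var, obj)
    var
  end

  def gc(all = false)
    if all
      objects.clear
    else
      objects.delete_if { |var, _obj| var.kind_of?(Integer) }
    end
    self
  end
end

app = VarDemo.new
app.set('named', :a)
p app.var(:a)          # an existing variable is returned as-is
p app.var(:b)          # an integer variable is auto-assigned
p app.var(:c, false)   # nothing is assigned when auto_assign is false
app.gc
p app.objects          # only the named variable survives gc
```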