Class: Tap::App

Inherits:
Object
  • Object
Includes:
Configurable, MonitorMixin, Signals
Defined in:
lib/tap/app.rb,
lib/tap/app/api.rb,
lib/tap/app/queue.rb,
lib/tap/app/stack.rb,
lib/tap/app/state.rb

Overview

App coordinates the setup and execution of workflows.

Defined Under Namespace

Modules: State
Classes: Api, Queue, Stack, TerminateError

Constant Summary

CONTEXT =

A variable to store the application context in Thread.current

'tap.context'
CURRENT =

A variable to store an instance in the application context.

'tap.current'
CALL_KEYS =

The reserved call keys

%w{obj sig args}
INIT_KEYS =

The reserved init keys

%w{var class spec}
RESERVED_KEYS =

Reserved call and init keys as a single array

CALL_KEYS + INIT_KEYS
OBJECT =

Splits a signal into an object string and a signal string. If OBJECT doesn’t match, then the string can be considered a signal, and the object is nil. After a match:

$1:: The object string
     (ex: 'obj/sig' => 'obj')
$2:: The signal string
     (ex: 'obj/sig' => 'sig')
/\A(.*)\/(.*)\z/
LOG_FORMAT =

The default log format

"%s %8s %10s %s\n"
LOG_FORMATTER =

The default logger formatter – uses LOG_FORMAT

lambda do |severity, time, head, tail|
  code = (severity == 'INFO' ? ' ' : severity[0,1])
  LOG_FORMAT % [code, time.strftime('%H:%M:%S'), head, tail]
end
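
As a rough illustration of what the default formatter produces, the sketch below copies the two constants from the listing above and calls the lambda directly. The timestamp and the 'enq' / 'task queued' strings are arbitrary values chosen for the example.

```ruby
# Constants copied from the listing above.
LOG_FORMAT = "%s %8s %10s %s\n"
LOG_FORMATTER = lambda do |severity, time, head, tail|
  code = (severity == 'INFO' ? ' ' : severity[0, 1])
  LOG_FORMAT % [code, time.strftime('%H:%M:%S'), head, tail]
end

time = Time.utc(2000, 1, 1, 12, 34, 56)  # arbitrary timestamp for illustration

info_line = LOG_FORMATTER.call('INFO', time, 'enq', 'task queued')
warn_line = LOG_FORMATTER.call('WARN', time, 'enq', 'task queued')

print info_line  # INFO lines start with a blank severity code
print warn_line  # other severities start with their first letter
```

When installed as a Logger formatter, the lambda receives the progname as head and the message as tail, which matches the argument order Logger uses for its formatter.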


Methods included from Signals

#sig, #signal?, #signals

Methods included from Signals::ModuleMethods

included

Constructor Details

#initialize(config = {}, options = {}) ⇒ App

Creates a new App with the given configuration. Options can be used to specify objects that are normally initialized for every new app:

:stack      the application stack; an App::Stack
:queue      the application queue; an App::Queue
:objects    application objects; a hash of (var, object) pairs
:logger     the application logger
:env        the application environment

A block may also be provided; it will be set as a default join.



# File 'lib/tap/app.rb', line 213

def initialize(config={}, options={})
  super() # monitor
  
  @state = State::READY
  @stack = options[:stack] || Stack.new(self)
  @queue = options[:queue] || Queue.new
  @objects = options[:objects] || {}
  @logger = options[:logger] || begin
    logger = Logger.new($stderr)
    logger.level = Logger::INFO
    logger.formatter = LOG_FORMATTER
    logger
  end
  @env = options[:env] || Env.new
  
  initialize_config(config)
end

Instance Attribute Details

#env ⇒ Object

The application environment



# File 'lib/tap/app.rb', line 113

def env
  @env
end

#logger ⇒ Object

The application logger



# File 'lib/tap/app.rb', line 110

def logger
  @logger
end

#objects ⇒ Object (readonly)

A cache of application objects



# File 'lib/tap/app.rb', line 107

def objects
  @objects
end

#queue ⇒ Object (readonly)

The application queue



# File 'lib/tap/app.rb', line 104

def queue
  @queue
end

#stack ⇒ Object

The application call stack for executing tasks



# File 'lib/tap/app.rb', line 101

def stack
  @stack
end

#state ⇒ Object (readonly)

The state of the application (see App::State)



# File 'lib/tap/app.rb', line 98

def state
  @state
end

Class Method Details

.build(spec = {}, app = current) ⇒ Object



# File 'lib/tap/app.rb', line 39

def build(spec={}, app=current)
  config  = spec['config'] || {}
  signals = spec['signals'] || []
  
  if spec['self']
    app.reconfigure(config)
  else
    app = new(config, :env => app.env)
  end
  
  signals.each do |args|
    app.call(args)
  end
  
  app.gc
  app
end

.context ⇒ Object



# File 'lib/tap/app.rb', line 27

def context
  Thread.current[CONTEXT] ||= {}
end

.current ⇒ Object



# File 'lib/tap/app.rb', line 35

def current
  context[CURRENT] ||= new
end

.current=(app) ⇒ Object



# File 'lib/tap/app.rb', line 31

def current=(app)
  context[CURRENT] = app
end

.set_context(context = {}) ⇒ Object



# File 'lib/tap/app.rb', line 12

def set_context(context={})
  current = Thread.current[CONTEXT]
  Thread.current[CONTEXT] = context
  current
end

.with_context(context) ⇒ Object



# File 'lib/tap/app.rb', line 18

def with_context(context)
  begin
    current = set_context(context)
    yield
  ensure
    set_context(current)
  end
end

Instance Method Details

#build(spec, &block) ⇒ Object



# File 'lib/tap/app.rb', line 387

def build(spec, &block)
  var  = spec['var']
  clas = spec['class']
  spec = spec['spec'] || spec
  
  obj = nil
  unless clas.nil?
    method_name = spec.kind_of?(Array) ? :parse : :build
    obj = env.constant(clas).send(method_name, spec, self, &block)
  end
  
  unless var.nil?
    if var.respond_to?(:each)
      var.each {|v| set(v, obj) }
    else
      set(var, obj)
    end
  end
  
  obj
end

#call(args, &block) ⇒ Object

Sends a signal to an application object. The input should be a hash defining these fields:

obj      # a variable identifying an object, or nil for self
sig      # the signal name
args     # arguments to the signal (typically a Hash)

Call does the following:

object = app.get(obj)        # lookup an application object by obj
signal = object.signal(sig)  # lookup a signal by sig
signal.call(args)            # call the signal with args

Call returns the result of the signal call.



# File 'lib/tap/app.rb', line 353

def call(args, &block)
  log(:call, nil, Logger::DEBUG) { args.inspect } if debug
  
  obj  = args['obj']
  sig  = args['sig']
  args = args['args'] || args
  
  # nil obj routes back to app, so optimize by evaluating signal directly
  (obj.nil? ? signal(sig, &block) : route(obj, sig, &block)).call(args)
end
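
The dispatch described above can be sketched without Tap itself. DispatchSketch and Greeter below are hypothetical stand-ins, and the signal lookup is reduced to a plain method lookup (an assumption; the real lookup comes from the Signals module). Only the obj / sig / args handling mirrors the listing.

```ruby
# Hypothetical stand-in for Tap::App; only the dispatch logic of #call is mirrored.
class DispatchSketch
  attr_reader :objects

  def initialize
    @objects = {}
  end

  # Returns the object set to var, or self if var is nil.
  def get(var)
    var.nil? ? self : objects[var]
  end

  # Assumed stand-in for the Signals lookup: returns a callable for the name.
  def signal(sig)
    method(sig.to_sym)
  end

  # A signal answered by the app itself: stores an object under a variable.
  def set(args)
    objects[args['var']] = args['obj']
  end

  def call(args)
    obj  = args['obj']
    sig  = args['sig']
    args = args['args'] || args   # a flat hash doubles as its own args
    get(obj).signal(sig).call(args)
  end
end

# Hypothetical application object that answers one signal.
class Greeter
  def signal(sig)
    method(sig.to_sym)
  end

  def greet(args)
    "hello #{args['name']}"
  end
end

app = DispatchSketch.new
app.call('sig' => 'set', 'args' => {'var' => 'greeter', 'obj' => Greeter.new})
result = app.call('obj' => 'greeter', 'sig' => 'greet', 'args' => {'name' => 'tap'})
puts result
```

A nil obj routes back to the app, which is why the first call needs no 'obj' key.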

#check_terminate ⇒ Object

Raises a TerminateError if state is TERMINATE. Nodes should call check_terminate to provide breakpoints in long-running processes.

A block may be provided to check_terminate to execute code before raising the TerminateError.



# File 'lib/tap/app.rb', line 551

def check_terminate
  if state == State::TERMINATE
    yield() if block_given?
    raise TerminateError.new
  end
end
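
A minimal sketch of a breakpoint inside a long-running loop, using the optional block for cleanup. TerminableSketch and SketchTerminateError are hypothetical stand-ins, and the numeric state values are assumptions (only READY = 0 appears in this page).

```ruby
# Hypothetical stand-ins; only the check_terminate logic mirrors the listing.
class SketchTerminateError < RuntimeError; end

class TerminableSketch
  READY, RUN, STOP, TERMINATE = 0, 1, 2, 3  # assumed state values
  attr_accessor :state

  def initialize
    @state = RUN
  end

  def check_terminate
    if state == TERMINATE
      yield if block_given?
      raise SketchTerminateError
    end
  end
end

app = TerminableSketch.new
processed = []
cleaned_up = false

begin
  (1..10).each do |n|
    # breakpoint: bail out between units of work if termination was requested
    app.check_terminate { cleaned_up = true }
    processed << n
    app.state = TerminableSketch::TERMINATE if n == 3  # simulate an external terminate
  end
rescue SketchTerminateError
  puts "terminated after #{processed.length} items"
end
```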

#enq(task, input = []) ⇒ Object

Enqueues the task with the input. Returns the task.



# File 'lib/tap/app.rb', line 268

def enq(task, input=[])
  queue.enq(task, input)
  task
end

#exe(task, input = []) ⇒ Object

Executes a task by doing the following:

  • call stack with the task and input

  • call the task joins (task.joins)

Returns the stack result.



# File 'lib/tap/app.rb', line 468

def exe(task, input=[])
  log("#{var(task)} <<", "#{input.inspect} (#{task.class})", Logger::DEBUG) if debug
  output = stack.call(task, input)
  log("#{var(task)} >>", "#{output.inspect} (#{task.class})", Logger::DEBUG) if debug
  
  if task.respond_to?(:joins)
    if joins = task.joins
      joins.each do |join|
        join.call(output)
      end
    end
  end
  
  output
end
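
The two steps can be illustrated with plain Ruby objects. SketchTask and the lambda-based stack are hypothetical; in particular, the assumption that the base stack simply calls the task with the input is not confirmed by this page.

```ruby
# Hypothetical task: callable, with a list of joins.
class SketchTask
  attr_reader :joins

  def initialize(&block)
    @block = block
    @joins = []
  end

  def call(input)
    @block.call(*input)
  end
end

# Assumed base stack behavior: call the task with the input.
stack = lambda { |task, input| task.call(input) }

# Mirrors #exe without logging: call the stack, then pass the output to each join.
def exe(stack, task, input = [])
  output = stack.call(task, input)
  if task.respond_to?(:joins) && (joins = task.joins)
    joins.each { |join| join.call(output) }
  end
  output
end

received = []
double = SketchTask.new { |n| n * 2 }
double.joins << lambda { |output| received << output }

result = exe(stack, double, [21])
```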

#gc(all = false) ⇒ Object

Removes objects keyed by integers. If all is specified, gc will clear all objects.



# File 'lib/tap/app.rb', line 329

def gc(all=false)
  if all
    objects.clear
  else
    objects.delete_if {|var, obj| var.kind_of?(Integer) }
  end
  
  self
end
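
The effect of gc on the object cache can be shown with a plain Hash standing in for app.objects (the keys and values here are made up for illustration).

```ruby
# A plain Hash standing in for app.objects.
objects = {
  'loader' => :named_object,   # explicitly named: survives gc
  0        => :auto_assigned,  # integer key: removed by gc
  1        => :auto_assigned
}

# Mirrors the body of #gc for the local hash.
gc = lambda do |all = false|
  if all
    objects.clear
  else
    objects.delete_if { |var, _obj| var.kind_of?(Integer) }
  end
end

gc.call
remaining = objects.keys   # only the named variable is left
gc.call(true)              # clears everything
```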

#get(var) ⇒ Object

Returns the object set to var, or self if var is nil.



# File 'lib/tap/app.rb', line 295

def get(var)
  var.nil? ? self : objects[var]
end

#info ⇒ Object

Returns an information string for the App.

App.new.info   # => 'state: 0 (READY) queue: 0'


# File 'lib/tap/app.rb', line 562

def info
  "state: #{state} (#{State.state_str(state)}) queue: #{queue.size}"
end

#init(clas, *args, &block) ⇒ Object

Resolves the class in env and initializes a new instance with the args and block. Note that the app is not appended to args by default.



# File 'lib/tap/app.rb', line 383

def init(clas, *args, &block)
  env.constant(clas).new(*args, &block)
end

#inspect ⇒ Object



# File 'lib/tap/app.rb', line 676

def inspect
  "#<#{self.class}:#{object_id} #{info}>"
end

#log(action = '', msg = nil, level = Logger::INFO) ⇒ Object

Logs the action and message at the input level (default INFO). The message may be generated by a block; in that case leave the message unspecified as nil.

Logging is suppressed if quiet is true, unless verbose is also true.

Performance Considerations

Using a block to generate a message is quicker if logging is off, but slower when logging is on. However, when messages use a lot of interpolation the log time is dominated by the interpolation; at some point the penalty for using a block is outweighed by the benefit of being able to skip the interpolation.

For example:

log(:action, "this is fast")
log(:action) { "and there's not much benefit to the block" }

log(:action, "but a message with #{a}, #{b}, #{c}, and #{d}")
log(:action) { "may be #{best} in a block because you can #{turn} #{it} #{off}" }


# File 'lib/tap/app.rb', line 260

def log(action='', msg=nil, level=Logger::INFO)
  if !quiet || verbose
    msg = yield if msg.nil? && block_given?
    logger.add(level, msg.to_s, action.to_s)
  end
end
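
The benefit of the block form is that the message is never built when logging is suppressed. The sketch below reproduces the guard from the listing in a hypothetical LogSketch class and counts how often the block runs.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in mirroring the guard in #log.
class LogSketch
  attr_accessor :quiet, :verbose
  attr_reader :logger

  def initialize(io)
    @logger = Logger.new(io)
    @quiet = false
    @verbose = false
  end

  def log(action = '', msg = nil, level = Logger::INFO)
    if !quiet || verbose
      msg = yield if msg.nil? && block_given?
      logger.add(level, msg.to_s, action.to_s)
    end
  end
end

io = StringIO.new
app = LogSketch.new(io)
evaluations = 0

app.quiet = true
app.log(:action) { evaluations += 1; "expensive message" }   # block is skipped
skipped = evaluations

app.quiet = false
app.log(:action) { evaluations += 1; "expensive message" }   # block runs once
```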

#middleware ⇒ Object

Returns an array of middleware in use by self.



# File 'lib/tap/app.rb', line 418

def middleware
  middleware = []
  
  # collect middleware by walking up the stack
  synchronize do
    current = stack
    visited = [current]
    
    while current.respond_to?(:stack)
      middleware << current
      current = current.stack
      
      circular_stack = visited.include?(current)
      visited << current
      
      if circular_stack
        names = visited.collect {|m| m.class.to_s }.join(', ')
        raise "circular stack detected:\n[#{names}]"
      end
    end
  end
  
  middleware
end

#obj(var) ⇒ Object

Same as get, but raises an error if no object is set to the variable.



# File 'lib/tap/app.rb', line 300

def obj(var)
  get(var) or raise "no object set to: #{var.inspect}"
end

#pq(task, input = []) ⇒ Object

Priority-enqueues (unshifts) the task with the input. Returns the task.



# File 'lib/tap/app.rb', line 274

def pq(task, input=[])
  queue.unshift(task, input)
  task
end
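
The difference between enq and pq comes down to which end of the queue receives the job. QueueSketch below is a hypothetical array-backed queue, not a reproduction of App::Queue.

```ruby
# Hypothetical array-backed queue; Tap's App::Queue is not reproduced here.
class QueueSketch
  def initialize
    @jobs = []
  end

  def enq(task, input)      # append to the back
    @jobs.push([task, input])
  end

  def unshift(task, input)  # insert at the front
    @jobs.unshift([task, input])
  end

  def deq                   # remove from the front
    @jobs.shift
  end
end

queue = QueueSketch.new
queue.enq(:a, [1])
queue.enq(:b, [2])
queue.unshift(:urgent, [0])   # the operation pq delegates to

order = []
while (job = queue.deq)
  order << job.first
end
p order
```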

#reset ⇒ Object

Clears objects, the queue, and resets the stack so that no middleware is used. Reset raises an error unless state is READY.



# File 'lib/tap/app.rb', line 445

def reset
  synchronize do
    unless state == State::READY
      raise "cannot reset unless READY"
    end
    
    # walk up middleware to find the base of the stack
    while @stack.respond_to?(:stack)
      @stack = @stack.stack
    end
    
    objects.clear
    queue.clear
  end
  self
end

#route(obj, sig, &block) ⇒ Object



# File 'lib/tap/app.rb', line 369

def route(obj, sig, &block)
  unless object = get(obj)
    raise "unknown object: #{obj.inspect}"
  end
  
  unless object.respond_to?(:signal)
    raise "cannot signal: #{object.inspect}"
  end
  
  object.signal(sig, &block)
end

#run ⇒ Object

Sequentially executes each enqueued job (a [task, input] pair). A run continues until the queue is empty.

Run checks the state of self before executing a task. If the state changes from RUN, the following behaviors result:

STOP        No more tasks will be executed; the current task
            will continue to completion.
TERMINATE   No more tasks will be executed and the currently
            running task will be discontinued as described in
            terminate.

Calls to run when the state is not READY do nothing and return immediately.

Returns self.



# File 'lib/tap/app.rb', line 500

def run
  synchronize do
    return self unless state == State::READY
    @state = State::RUN
  end
  
  begin
    while state == State::RUN
      break unless job = queue.deq
      exe(*job)
    end
  rescue(TerminateError)
    # gracefully fail for termination errors
    queue.unshift(*job)
  ensure
    synchronize { @state = State::READY }
  end
  
  self
end
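
The STOP and TERMINATE behaviors can be seen in a single-threaded sketch of the run loop. RunnerSketch and RunnerTerminate are hypothetical, the state values are assumptions, jobs are reduced to lambdas that receive the runner, and the synchronize calls from the listing are omitted.

```ruby
class RunnerTerminate < RuntimeError; end

# Hypothetical stand-in mirroring the control flow of #run, #stop and #terminate.
class RunnerSketch
  READY, RUN, STOP, TERMINATE = 0, 1, 2, 3  # assumed state values
  attr_reader :state, :queue, :done

  def initialize
    @state = READY
    @queue = []
    @done = []
  end

  def stop
    @state = STOP if @state == RUN
  end

  def terminate
    @state = TERMINATE unless @state == READY
  end

  def check_terminate
    raise RunnerTerminate if @state == TERMINATE
  end

  def run
    return self unless @state == READY
    @state = RUN
    begin
      while @state == RUN
        break unless (job = @queue.shift)
        job.call(self)
        @done << job
      end
    rescue RunnerTerminate
      @queue.unshift(job)   # the interrupted job goes back on the queue
    ensure
      @state = READY
    end
    self
  end
end

runner = RunnerSketch.new
first  = lambda { |app| app.stop }                            # completes, then the run halts
second = lambda { |app| app.terminate; app.check_terminate }  # interrupted at its breakpoint
runner.queue.push(first, second)

runner.run
after_stop = runner.queue.length        # second is still queued
runner.run
after_terminate = runner.queue.length   # second was put back
```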

#scope ⇒ Object

Sets self as the current instance in the application context for the duration of the block (see App.with_context).



# File 'lib/tap/app.rb', line 568

def scope
  App.with_context(CURRENT => self) { yield }
end
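
The context swap behind scope can be sketched with a thread-local hash. ContextSketch is a hypothetical module that reproduces the class-level helpers shown earlier under different keys, so it does not touch Tap's own context.

```ruby
# Hypothetical module reproducing the class-level context helpers for illustration.
module ContextSketch
  CONTEXT = 'sketch.context'   # assumed keys, distinct from Tap's
  CURRENT = 'sketch.current'

  def self.context
    Thread.current[CONTEXT] ||= {}
  end

  # Installs a new context and returns the previous one.
  def self.set_context(context = {})
    previous = Thread.current[CONTEXT]
    Thread.current[CONTEXT] = context
    previous
  end

  # Runs the block under the given context, then restores the previous one.
  def self.with_context(context)
    previous = set_context(context)
    yield
  ensure
    set_context(previous)
  end
end

ContextSketch.context[ContextSketch::CURRENT] = :outer

inside = ContextSketch.with_context(ContextSketch::CURRENT => :inner) do
  ContextSketch.context[ContextSketch::CURRENT]
end

outside = ContextSketch.context[ContextSketch::CURRENT]
```

The ensure clause restores the previous context even if the block raises.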

#serialize(bare = true) ⇒ Object

Converts self to a schema that can be used to build a new app with equivalent application objects, queue, and middleware. A schema is a collection of signal hashes such that the following rebuilds the state of a on b:

a, b = App.new, App.new
a.serialize.each {|spec| b.call(spec) }

Application objects that do not satisfy the application object API are quietly ignored; enable debugging to be warned of their existence.



# File 'lib/tap/app.rb', line 583

def serialize(bare=true)
  # setup variables
  specs = {}
  order = []
  
  # collect enque signals to setup queue
  signals = queue.to_a.collect do |(task, input)|
    {'sig' => 'enq', 'args' => {'var' => var(task), 'input' => input}}
  end
  
  # collect and trace application objects
  objects.keys.sort_by do |var|
    var.to_s
  end.each do |var|
    obj = objects[var]
    order.concat trace(obj, specs)
  end
  
  middleware.each do |obj|
    order.concat trace(obj, specs)
  end
  
  if bare
    order.delete(self)
    specs.delete(self)
  else
    order.unshift(self)
    trace(self, specs)
  end
  order.uniq!
  
  # assemble specs
  variables = {}
  objects.each_pair do |var, obj|
    (variables[obj] ||= []) << var
  end
  
  specs.keys.each do |obj|
    spec = {'sig' => 'set'}
    
    # assign variables
    if vars = variables[obj]
      if vars.length == 1
        spec['var'] = vars[0]
      else
        spec['var'] = vars
      end
    end

    # assign the class
    spec['class'] = obj.class.to_s

    # merge obj_spec if possible
    obj_spec = specs[obj]
    if (obj_spec.keys & RESERVED_KEYS).empty?
      spec.merge!(obj_spec)
    else
      spec['spec'] = obj_spec
    end
    
    specs[obj] = spec
  end
  
  middleware.each do |obj|
    spec = specs[obj]
    spec['sig'] = 'use'
  end
  
  order.collect! {|obj| specs[obj] }.concat(signals)
end

#set(var, obj) ⇒ Object

Sets the object to the specified variable and returns obj. Provide nil as obj to un-set a variable (in which case the existing object is returned).

Nil is reserved as a variable name and cannot be used by set.



# File 'lib/tap/app.rb', line 284

def set(var, obj)
  raise "no var specified" if var.nil?
  
  if obj
    objects[var] = obj
  else
    objects.delete(var)
  end
end
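
The return values follow from Hash assignment and Hash#delete. The sketch below applies the same logic to a plain Hash passed in as an argument (an adaptation for illustration; the real method uses the app's own objects).

```ruby
# Mirrors #set for an explicit objects hash.
def set(objects, var, obj)
  raise "no var specified" if var.nil?
  obj ? (objects[var] = obj) : objects.delete(var)
end

objects = {}
stored  = set(objects, 'a', :thing)   # returns the object that was set
removed = set(objects, 'a', nil)      # un-sets and returns the existing object

error = begin
  set(objects, nil, :thing)
rescue RuntimeError => e
  e.message
end
```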

#signal(sig, &block) ⇒ Object



# File 'lib/tap/app.rb', line 364

def signal(sig, &block)
  sig = sig.to_s
  sig =~ OBJECT ? route($1, $2, &block) : super(sig, &block)
end
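
How the OBJECT pattern splits a signal string can be checked in isolation. split_signal is a hypothetical helper that returns the pair instead of routing.

```ruby
OBJECT = /\A(.*)\/(.*)\z/   # copied from the constant summary

# Returns [object_string, signal_string]; the object is nil when there is no slash.
def split_signal(sig)
  sig = sig.to_s
  sig =~ OBJECT ? [$1, $2] : [nil, sig]
end

plain   = split_signal(:info)       # => [nil, 'info']
routed  = split_signal('obj/sig')   # => ['obj', 'sig']
nested  = split_signal('a/b/sig')   # => ['a/b', 'sig']
```

Because the first group is greedy, only the last slash separates the object from the signal.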

#stop ⇒ Object

Signals a running app to stop passing tasks to the application stack by setting state to STOP. The task currently in the stack will continue to completion.

Does nothing unless state is RUN.



# File 'lib/tap/app.rb', line 526

def stop
  synchronize { @state = State::STOP if state == State::RUN }
  self
end

#terminate ⇒ Object

Signals a running application to terminate execution by setting state to TERMINATE. In this state, calls to check_terminate will raise a TerminateError. Run considers TerminateErrors a normal exit and rescues them quietly.

Nodes can set breakpoints that call check_terminate to invoke task-specific termination. If a task never calls check_terminate, then it will continue to completion.

Does nothing if state is READY.



# File 'lib/tap/app.rb', line 541

def terminate
  synchronize { @state = State::TERMINATE unless state == State::READY }
  self
end

#to_spec ⇒ Object



# File 'lib/tap/app.rb', line 654

def to_spec
  signals = serialize(false)
  spec = signals.shift
  
  spec.delete('self')
  spec.delete('sig')
  
  var = spec.delete('var')
  klass = spec.delete('class')
  spec = spec.delete('spec') || spec
  
  signals.unshift(
    'sig' => 'set',
    'var' => var, 
    'class' => klass, 
    'self' => true
  ) if var
  spec['signals'] = signals
  
  spec
end

#use(clas, *argv) ⇒ Object

Adds the specified middleware to the stack. The argv will be used as extra arguments to initialize the middleware.



# File 'lib/tap/app.rb', line 411

def use(clas, *argv)
  synchronize do
    @stack = init(clas, @stack, *argv)
  end
end
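
Middleware wraps the existing stack and exposes it through a stack method, which is what lets #middleware and #reset walk the chain. The sketch below uses hypothetical StackSketch and Tracer classes; the assumption that the base stack calls the task directly is not confirmed by this page.

```ruby
# Hypothetical base stack: calls the task directly (assumed behavior).
class StackSketch
  def call(task, input)
    task.call(*input)
  end
end

# Hypothetical middleware: receives the current stack as its first argument,
# exposes it via #stack, and wraps #call.
class Tracer
  attr_reader :stack, :seen

  def initialize(stack, label = 'trace')
    @stack = stack
    @label = label
    @seen = []
  end

  def call(task, input)
    @seen << "#{@label}: #{input.inspect}"
    stack.call(task, input)
  end
end

stack = StackSketch.new
stack = Tracer.new(stack, 'outer')   # roughly what use(Tracer, 'outer') arranges

# walk the chain the way #middleware does
middleware = []
current = stack
while current.respond_to?(:stack)
  middleware << current
  current = current.stack
end

result = stack.call(lambda { |a, b| a + b }, [1, 2])
```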

#var(obj, auto_assign = true) ⇒ Object

Returns the variable for the object. If the object is not assigned to a variable and auto_assign is true, then the object is set to an unused variable and the new variable is returned.

The new variable will be an integer and will be removed upon gc.



# File 'lib/tap/app.rb', line 309

def var(obj, auto_assign=true)
  objects.each_pair do |var, object|
    return var if obj == object
  end
  
  return nil unless auto_assign
  
  var = objects.length
  loop do 
    if objects.has_key?(var)
      var += 1
    else
      set(var, obj)
      return var
    end
  end
end
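
The lookup and auto-assignment can be traced with a plain Hash. var_for is a hypothetical helper that applies the same logic to a hash passed in as an argument; the candidate variable starts at the number of objects and increments until an unused integer is found.

```ruby
# Mirrors #var for an explicit objects hash.
def var_for(objects, obj, auto_assign = true)
  objects.each_pair { |var, object| return var if obj == object }
  return nil unless auto_assign

  var = objects.length
  var += 1 while objects.has_key?(var)   # skip integers already in use
  objects[var] = obj
  var
end

objects = {'a' => :first, 2 => :second}

existing = var_for(objects, :first)          # already assigned to 'a'
missing  = var_for(objects, :third, false)   # not assigned, no auto-assign
assigned = var_for(objects, :third)          # 2 is taken, so 3 is used
```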