Module: Taskinator::Definition

Defined in:
lib/taskinator/definition.rb,
lib/taskinator/definition/builder.rb

Defined Under Namespace

Classes: Builder, ProcessAlreadyDefinedError, ProcessUndefinedError

Constant Summary collapse

UndefinedProcessError =

for backward compatibility

ProcessUndefinedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queueObject

Returns the value of attribute queue.



85
86
87
# File 'lib/taskinator/definition.rb', line 85

def queue
  @queue
end

Instance Method Details

#create_process(*args) ⇒ Object

creates an instance of the process NOTE: the supplied @args are serialized and ultimately passed to each method of the defined process



91
92
93
94
# File 'lib/taskinator/definition.rb', line 91

def create_process(*args)
  assert_valid_process_module
  _create_process_(false, *args)
end

#create_process_remotely(*args) ⇒ Object

returns the process uuid of the process to be created the process can be retrieved using this uuid by using Taskinator::Process.fetch(uuid)



101
102
103
104
105
106
107
108
# File 'lib/taskinator/definition.rb', line 101

def create_process_remotely(*args)
  assert_valid_process_module
  uuid = Taskinator.generate_uuid

  Taskinator.queue.enqueue_create_process(self, uuid, args)

  return uuid
end

#create_sub_process(*args) ⇒ Object



110
111
112
113
# File 'lib/taskinator/definition.rb', line 110

def create_sub_process(*args)
  assert_valid_process_module
  _create_process_(true, *args)
end

#define_concurrent_process(*arg_list, &block) ⇒ Object



20
21
22
23
24
25
26
# File 'lib/taskinator/definition.rb', line 20

def define_concurrent_process(*arg_list, &block)
  factory = lambda {|definition, options|
    complete_on = options.delete(:complete_on) || CompleteOn::Default
    Process.define_concurrent_process_for(definition, complete_on, options)
  }
  define_process(*arg_list + [factory], &block)
end

#define_process(*arg_list, &block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/taskinator/definition.rb', line 28

def define_process(*arg_list, &block)
  raise ProcessAlreadyDefinedError if respond_to?(:_create_process_)

  factory = arg_list.last.respond_to?(:call) ?
              arg_list.pop :
              lambda {|definition, options|
                Process.define_sequential_process_for(definition, options)
              }

  # called from respective "create_process" methods
  # parameters can contain options as the last parameter
  define_singleton_method :_create_process_ do |subprocess, *args|
    begin

      # TODO: better validation of arguments

      # FIXME: arg_list should only contain an array of symbols

      raise ArgumentError, "wrong number of arguments (#{args.length} for #{arg_list.length})" if args.length < arg_list.length

      options = (args.last.is_a?(Hash) ? args.last : {})
      options[:scope] ||= :shared

      process = factory.call(self, options)

      # this may take long... up to users definition
      Taskinator.instrumenter.instrument('taskinator.process.created', :uuid => process.uuid, :state => :initial) do
        Builder.new(process, self, *args).instance_eval(&block)
      end

      # only save "root processes"
      unless subprocess

        # instrument separately
        Taskinator.instrumenter.instrument('taskinator.process.saved', :uuid => process.uuid, :state => :initial) do

          # this will visit "sub processes" and persist them too
          process.save

          # add it to the list of "root processes"
          Persistence.add_process_to_list(process)

        end

      end

      # this is the "root" process
      process

    rescue => e
      Taskinator.logger.error(e)
      Taskinator.logger.debug(e.backtrace)
      raise e
    end
  end
end

#define_sequential_process(*arg_list, &block) ⇒ Object

defines a process



13
14
15
16
17
18
# File 'lib/taskinator/definition.rb', line 13

def define_sequential_process(*arg_list, &block)
  factory = lambda {|definition, options|
    Process.define_sequential_process_for(definition, options)
  }
  define_process(*arg_list + [factory], &block)
end