Class: Ruote::Tracker

Inherits:
Object
  • Object
show all
Defined in:
lib/ruote/svc/tracker.rb

Overview

The tracker service is used by the “listen” expression. This services sees all the msg processed by a worker and triggers any listener interested in a particular msg.

Look at the ListenExpression for more details.

Instance Method Summary collapse

Constructor Details

#initialize(context) ⇒ Tracker

Returns a new instance of Tracker.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/ruote/svc/tracker.rb', line 37

def initialize(context)

  @context = context

  if @context.worker
    #
    # this is a worker context, DO log
    #
    @context.worker.subscribe(:all, self)
  #else
    #
    # this is not a worker context, no notifications. BUT
    # honour calls to add_tracker/remove_tracker
    #
  end
end

Instance Method Details

#add_tracker(wfid, action, id, conditions, msg) ⇒ Object

Adds a tracker (usually when a ‘listen’ expression gets applied).



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/ruote/svc/tracker.rb', line 115

def add_tracker(wfid, action, id, conditions, msg)

  doc = @context.storage.get_trackers

  doc['trackers'][id] =
    { 'wfid' => wfid,
      'action' => action,
      'id' => id,
      'conditions' => conditions,
      'msg' => msg }

  r = @context.storage.put(doc)

  add_tracker(wfid, action, id, conditions, msg) if r
    # the put failed, have to redo the work
end

#notify(message) ⇒ Object

The worker passes all the messages it has to process to the tracker via this method.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/ruote/svc/tracker.rb', line 57

def notify(message)

  m_error = message['error']
  m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil)
  m_action = message['action']

  msg = m_action == 'error_intercepted' ? message['msg'] : message

  @context.storage.get_trackers['trackers'].each do |tracker_id, tracker|

    # filter msgs

    t_wfid = tracker['wfid']
    t_action = tracker['action']

    next if t_wfid && t_wfid != m_wfid
    next if t_action && t_action != m_action

    next unless does_match?(message, tracker['conditions'])

    if tracker_id == 'on_error' || tracker_id == 'on_terminate'

      fields = msg['workitem']['fields']

      next if m_action == 'error_intercepted' && fields['__error__']
      next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__'])
    end

    # prepare and emit/put 'reaction' message

    m = tracker['msg']

    action = m.delete('action')

    m['wfid'] = m_wfid if m['wfid'] == 'replace'
    m['wfid'] ||= @context.wfidgen.generate

    m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'

    if t_action == 'error_intercepted'
      m['workitem']['fields']['__error__'] = m_error
    elsif tracker_id == 'on_error' && m_action == 'error_intercepted'
      m['workitem']['fields']['__error__'] = m_error
    elsif tracker_id == 'on_terminate' && m_action == 'terminated'
      m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
    end

    if m['variables'] == 'compile'
      fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei'])
      m['variables'] = fexp ? fexp.compile_variables : {}
    end

    @context.storage.put_msg(action, m)
  end
end

#remove_tracker(fei, doc = nil) ⇒ Object

Removes a tracker (usually when a ‘listen’ expression replies to its parent expression or is cancelled).



135
136
137
138
139
140
141
142
143
144
145
# File 'lib/ruote/svc/tracker.rb', line 135

def remove_tracker(fei, doc=nil)

  doc ||= @context.storage.get_trackers

  doc['trackers'].delete(Ruote.to_storage_id(fei))

  r = @context.storage.put(doc)

  remove_tracker(fei, r) if r
    # the put failed, have to redo the work
end