Class: Ruote::Tracker
- Inherits: Object
- Defined in: lib/ruote/svc/tracker.rb
Overview
The tracker service is used by the “listen” expression. This service sees every msg processed by a worker and triggers any listener interested in a particular msg.
Look at the ListenExpression for more details.
Instance Method Summary
- #add_tracker(wfid, action, id, conditions, msg) ⇒ Object
  Adds a tracker (usually when a ‘listen’ expression gets applied).
- #initialize(context) ⇒ Tracker (constructor)
  A new instance of Tracker.
- #notify(message) ⇒ Object
  The worker passes all the messages it has to process to the tracker via this method.
- #remove_tracker(fei, doc = nil) ⇒ Object
  Removes a tracker (usually when a ‘listen’ expression replies to its parent expression or is cancelled).
Constructor Details
#initialize(context) ⇒ Tracker
Returns a new instance of Tracker.
    # File 'lib/ruote/svc/tracker.rb', line 37

    def initialize(context)

      @context = context

      if @context.worker
        #
        # this is a worker context, DO log
        #
        @context.worker.subscribe(:all, self)
      #else
      #  # this is not a worker context, no notifications. BUT
      #  # honour calls to add_tracker/remove_tracker
      #
      end
    end
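The two cases handled by the constructor can be illustrated with small stand-ins. This is only a sketch: FakeWorker, FakeContext and SketchTracker are hypothetical names, not ruote classes. Only the shape of the subscribe(:all, self) call and the notify method are taken from the listing above.

```ruby
# Hypothetical stand-ins for illustration; none of these are ruote classes.
class FakeWorker
  def initialize
    @subscribers = []
  end

  # Mirrors the shape of the subscribe(:all, self) call in the source.
  def subscribe(type, subscriber)
    @subscribers << [type, subscriber]
  end

  # Hands a processed msg to every subscriber registered for :all.
  def process(msg)
    @subscribers.each { |type, s| s.notify(msg) if type == :all }
  end
end

FakeContext = Struct.new(:worker)

class SketchTracker
  attr_reader :seen

  def initialize(context)
    @context = context
    @seen = []
    # Only a worker context delivers msgs to the tracker.
    @context.worker.subscribe(:all, self) if @context.worker
  end

  def notify(message)
    @seen << message
  end
end

worker = FakeWorker.new
with_worker = SketchTracker.new(FakeContext.new(worker))
without_worker = SketchTracker.new(FakeContext.new(nil))

worker.process({ 'action' => 'launch', 'wfid' => 'wf-1' })

puts with_worker.seen.size     # the worker-side tracker saw the msg
puts without_worker.seen.size  # the non-worker tracker saw nothing
```

A tracker built without a worker never receives notifications, but it remains usable for adding and removing trackers, as the commented-out else branch in the source notes.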
Instance Method Details
#add_tracker(wfid, action, id, conditions, msg) ⇒ Object
Adds a tracker (usually when a ‘listen’ expression gets applied).
    # File 'lib/ruote/svc/tracker.rb', line 115

    def add_tracker(wfid, action, id, conditions, msg)

      doc = @context.storage.get_trackers

      doc['trackers'][id] =
        { 'wfid' => wfid,
          'action' => action,
          'id' => id,
          'conditions' => conditions,
          'msg' => msg }

      r = @context.storage.put(doc)

      add_tracker(wfid, action, id, conditions, msg) if r
        # the put failed, have to redo the work
    end
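The "redo the work when the put fails" pattern can be sketched in isolation. FlakyStorage below is a hypothetical in-memory stand-in, not ruote's storage. It assumes, as the comment in the listing suggests, that put returns a truthy value on failure and nil on success. The retry is written as a loop here rather than the recursion used in the source.

```ruby
# Hypothetical in-memory storage; only the get_trackers/put call shapes
# are borrowed from the listing above.
class FlakyStorage
  attr_reader :put_calls

  def initialize(failures)
    @failures = failures  # number of puts that fail before one succeeds
    @put_calls = 0
    @doc = { 'trackers' => {} }
  end

  # Returns a deep copy, so each caller edits its own version.
  def get_trackers
    Marshal.load(Marshal.dump(@doc))
  end

  # Assumed convention: nil on success, truthy on failure.
  def put(doc)
    @put_calls += 1
    if @failures > 0
      @failures -= 1
      return true
    end
    @doc = doc
    nil
  end
end

def add_tracker(storage, wfid, action, id, conditions, msg)
  loop do
    doc = storage.get_trackers  # always restart from the latest version
    doc['trackers'][id] = {
      'wfid' => wfid, 'action' => action, 'id' => id,
      'conditions' => conditions, 'msg' => msg
    }
    break unless storage.put(doc)  # truthy result: redo the work
  end
end

storage = FlakyStorage.new(2)
add_tracker(storage, 'wf-1', 'reply', 'tracker-0', nil, { 'action' => 'launch' })

puts storage.put_calls                              # 3
puts storage.get_trackers['trackers'].keys.inspect  # ["tracker-0"]
```

Re-reading the document on every attempt matters: a failed put usually means another writer changed the trackers document, so the entry has to be applied to the newer version.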
#notify(message) ⇒ Object
The worker passes all the messages it has to process to the tracker via this method.
    # File 'lib/ruote/svc/tracker.rb', line 57

    def notify(message)

      m_error = message['error']
      m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil)
      m_action = message['action']

      msg = m_action == 'error_intercepted' ? message['msg'] : message

      @context.storage.get_trackers['trackers'].each do |tracker_id, tracker|

        # filter msgs

        t_wfid = tracker['wfid']
        t_action = tracker['action']

        next if t_wfid && t_wfid != m_wfid
        next if t_action && t_action != m_action

        next unless does_match?(message, tracker['conditions'])

        if tracker_id == 'on_error' || tracker_id == 'on_terminate'

          fields = msg['workitem']['fields']

          next if m_action == 'error_intercepted' && fields['__error__']
          next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__'])
        end

        # prepare and emit/put 'reaction' message

        m = tracker['msg']

        action = m.delete('action')

        m['wfid'] = m_wfid if m['wfid'] == 'replace'
        m['wfid'] ||= @context.wfidgen.generate

        m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'

        if t_action == 'error_intercepted'
          m['workitem']['fields']['__error__'] = m_error
        elsif tracker_id == 'on_error' && m_action == 'error_intercepted'
          m['workitem']['fields']['__error__'] = m_error
        elsif tracker_id == 'on_terminate' && m_action == 'terminated'
          m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
        end

        if m['variables'] == 'compile'
          fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei'])
          m['variables'] = fexp ? fexp.compile_variables : {}
        end

        @context.storage.put_msg(action, m)
      end
    end
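The wfid/action filtering at the top of the loop can be tried on plain hashes. This is a minimal sketch: interested? is a hypothetical helper, the tracker ids and wfids are made up, and condition matching (does_match?) is left out because its body is not shown on this page.

```ruby
# Hypothetical helper: decides whether a tracker is interested in a msg,
# following the wfid and action checks in the listing above.
def interested?(tracker, message)
  m_wfid = message['wfid'] || (message['fei'] || {})['wfid']
  m_action = message['action']

  t_wfid = tracker['wfid']
  t_action = tracker['action']

  return false if t_wfid && t_wfid != m_wfid        # nil wfid: any workflow
  return false if t_action && t_action != m_action  # nil action: any action
  true
end

trackers = {
  'any-reply'  => { 'wfid' => nil,    'action' => 'reply' },
  'wf1-only'   => { 'wfid' => 'wf-1', 'action' => nil },
  'wf2-launch' => { 'wfid' => 'wf-2', 'action' => 'launch' }
}

# No top-level 'wfid', so the wfid is taken from the 'fei' entry.
message = { 'action' => 'reply', 'fei' => { 'wfid' => 'wf-1' } }

matching = trackers.select { |_id, t| interested?(t, message) }.keys
puts matching.inspect  # ["any-reply", "wf1-only"]
```

A nil 'wfid' or 'action' in a tracker acts as a wildcard, which is how a single tracker can listen to every workflow instance or to every kind of msg.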
#remove_tracker(fei, doc = nil) ⇒ Object
Removes a tracker (usually when a ‘listen’ expression replies to its parent expression or is cancelled).
    # File 'lib/ruote/svc/tracker.rb', line 135

    def remove_tracker(fei, doc=nil)

      doc ||= @context.storage.get_trackers

      doc['trackers'].delete(Ruote.to_storage_id(fei))

      r = @context.storage.put(doc)

      remove_tracker(fei, r) if r
        # the put failed, have to redo the work
    end
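Unlike add_tracker, the retry here reuses the value returned by the failed put as the next document. The sketch below assumes that a failed put hands back the current version of the document. RevStore and its '_rev' counter are hypothetical, and plain string ids stand in for the result of Ruote.to_storage_id.

```ruby
# Hypothetical revision-checked store; not ruote's storage implementation.
class RevStore
  def initialize(trackers)
    @doc = { '_rev' => 0, 'trackers' => trackers }
  end

  # Returns a deep copy, so each caller edits its own version.
  def get_trackers
    Marshal.load(Marshal.dump(@doc))
  end

  # Assumed convention: nil on success; on a stale revision, a copy of
  # the current document so that the caller can retry against it.
  def put(doc)
    return get_trackers if doc['_rev'] != @doc['_rev']
    @doc = Marshal.load(Marshal.dump(doc))
    @doc['_rev'] += 1
    nil
  end
end

def remove_tracker(store, id, doc = nil)
  doc ||= store.get_trackers
  doc['trackers'].delete(id)
  r = store.put(doc)
  remove_tracker(store, id, r) if r  # retry on the fresh document
end

store = RevStore.new({ 'a' => {}, 'b' => {} })

stale = store.get_trackers         # read at revision 0
remove_tracker(store, 'a')         # another writer moves the store to revision 1
remove_tracker(store, 'b', stale)  # first put is rejected, the retry succeeds

puts store.get_trackers['trackers'].keys.inspect  # []
puts store.get_trackers['_rev']                   # 2
```

Because the retry works on the document returned by the store, the earlier removal of 'a' is preserved instead of being overwritten by the stale copy.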