Class: Ruote::StorageHistory

Inherits:
Object
  • Object
show all
Defined in:
lib/ruote/log/storage_history.rb

Overview

Logs the ruote engine history to the storage underlying the worker.

Warning : don’t use this history implementation when the storage is HashStorage. It will fill up your memory… Keeping history for a transient ruote is a bit overkill (IMHO).

using the StorageHistory

engine.add_service(
  'history', 'ruote/log/storage_history', 'Ruote::StorageHistory')

# ...

process_history = engine.history.by_wfid(wfid0)

final note

By default, the history is an in-memory history (see Ruote::DefaultHistory) (and it is worthless when there are multiple workers).

Constant Summary collapse

DATE_REGEX =
/!(\d{4}-\d{2}-\d{2})!/

Instance Method Summary collapse

Constructor Details

#initialize(context, options = {}) ⇒ StorageHistory

Returns a new instance of StorageHistory.



54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/ruote/log/storage_history.rb', line 54

def initialize(context, options={})

  @context = context
  @options = options

  if @context.worker

    # only care about logging if there is a worker present

    @context.storage.add_type('history')
    @context.worker.subscribe(:all, self)
  end
end

Instance Method Details

#by_date(date) ⇒ Object

Returns all the history events for a given day.

Takes as argument whatever is a datetime when turned to a string and parsed.



111
112
113
114
115
116
# File 'lib/ruote/log/storage_history.rb', line 111

def by_date(date)

  date = Time.parse(date.to_s).strftime('%Y-%m-%d')

  @context.storage.get_many('history', /!#{date}!/)
end

#by_process(wfid) ⇒ Object Also known as: by_wfid

Returns all the msgs for a given wfid (process instance id).



83
84
85
86
# File 'lib/ruote/log/storage_history.rb', line 83

def by_process(wfid)

  @context.storage.get_many('history', wfid)
end

#clear!Object

The history system doesn’t implement purge! so that when purge! is called on the engine, the history is not cleared.

Call this dangerous clear! method to clean out any history file.



127
128
129
130
# File 'lib/ruote/log/storage_history.rb', line 127

def clear!

  @context.storage.purge_type!('history')
end

#notify(msg) ⇒ Object

This is the method called by the workqueue. Incoming engine events are ‘processed’ here.



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/ruote/log/storage_history.rb', line 135

def notify(msg)

  msg = msg.dup
    # a shallow copy is sufficient

  si = if fei = msg['fei']
    Ruote::FlowExpressionId.to_storage_id(fei)
  else
    msg['wfid'] || 'no_wfid'
  end

  _id = msg['_id']
  msg['original_id'] = _id
  msg['_id'] = "#{_id}!#{si}"

  msg['type'] = 'history'
  msg['original_put_at'] = msg['put_at']

  msg.delete('_rev')

  @context.storage.put(msg)
end

#rangeObject

Returns an array [ most recent date, oldest date ] (Time instances).



91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/ruote/log/storage_history.rb', line 91

def range

  ids = @context.storage.ids('history')

  #p ids.sort == ids

  fm = DATE_REGEX.match(ids.first)[1]
  lm = DATE_REGEX.match(ids.last)[1]

  first = Time.parse("#{fm} 00:00:00 UTC")
  last = Time.parse("#{lm} 00:00:00 UTC") + 24 * 3600

  [ first, last ]
end

#wfidsObject

Returns all the wfids for which there are history items (msgs) stored.



70
71
72
73
74
75
76
77
78
79
# File 'lib/ruote/log/storage_history.rb', line 70

def wfids

  wfids = @context.storage.ids('history').collect { |id|
    id.split('!').last
  }.uniq.sort

  wfids.delete('no_wfid')

  wfids
end