Module: Ruote::StorageBase
- Included in:
- CompositeStorage, FsStorage, HashStorage
- Defined in:
- lib/ruote/storage/base.rb
Overview
Base methods for storage implementations.
Instance Method Summary collapse
-
#clear ⇒ Object
Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.
- #context ⇒ Object
- #context=(c) ⇒ Object
-
#copy_to(target, opts = {}) ⇒ Object
Copies the content of this storage into a target storage.
- #delete_schedule(schedule_id) ⇒ Object
- #empty?(type) ⇒ Boolean
-
#expression_wfids(opts) ⇒ Object
Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts)).
-
#find_root_expression(wfid) ⇒ Object
– expressions ++.
-
#get_configuration(key) ⇒ Object
– configurations ++.
-
#get_engine_variable(k) ⇒ Object
– engine variables ++.
-
#get_msgs ⇒ Object
– def get_local_msgs p @local_msgs if @local_msgs r = @local_msgs @local_msgs = nil r else [] end end ++.
-
#get_schedules(delta, now) ⇒ Object
– ats and crons ++.
-
#get_trackers ⇒ Object
– trackers ++.
- #put_engine_variable(k, v) ⇒ Object
-
#put_msg(action, options) ⇒ Object
– messages ++.
-
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
Places schedule in storage.
- #replace_engine_configuration(opts) ⇒ Object
-
#reserve(doc) ⇒ Object
Attempts to delete a document, returns true if the deletion succeeded.
Instance Method Details
#clear ⇒ Object
Used when doing integration tests, removes all msgs, schedules, errors, expressions and workitems.
NOTE that it doesn’t remove engine variables (danger)
260 261 262 263 264 265 |
# File 'lib/ruote/storage/base.rb', line 260
#
# Helper for integration tests: calls purge_type! once for each of the
# document types msgs, schedules, errors, expressions and workitems.
#
# No other document type is purged, so engine variables are left in place.
#
def clear

  purgeable = %w[ msgs schedules errors expressions workitems ]

  purgeable.each { |doc_type| purge_type!(doc_type) }
end
#context ⇒ Object
35 36 37 38 |
# File 'lib/ruote/storage/base.rb', line 35
#
# Returns the context attached to this storage.
#
# When no context has been set yet, a Ruote::Context is built around this
# storage and kept in @context, so later calls return the same instance.
#
def context

  @context = Ruote::Context.new(self) unless @context

  @context
end
#context=(c) ⇒ Object
40 41 42 43 |
# File 'lib/ruote/storage/base.rb', line 40
#
# Attaches the given context to this storage. It is stored in @context, so
# a later call to #context returns it instead of building a new one.
#
def context=(c)

  @context = c
end
#copy_to(target, opts = {}) ⇒ Object
Copies the content of this storage into a target storage.
Of course, the target storage may be a different implementation.
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/ruote/storage/base.rb', line 232
#
# Copies the documents held by this storage into +target+, which only has
# to respond to #put (it may be another storage implementation).
#
# The types configurations, errors, expressions, msgs, schedules, variables
# and workitems are walked in that order. Each document is fetched by id,
# has its '_rev' key removed and is then handed to target.put.
#
# When opts[:verbose] is set, one "type/_id" line is printed per document.
#
# Returns the number of documents handed to the target.
#
def copy_to(target, opts={})

  types =
    %w[ configurations errors expressions msgs schedules variables workitems ]

  types.inject(0) do |copied, type|

    ids(type).inject(copied) do |count, id|

      doc = get(type, id)
      doc.delete('_rev') # revisions belong to the source storage only

      target.put(doc)

      puts(" #{type}/#{doc['_id']}") if opts[:verbose]

      count + 1
    end
  end
end
#delete_schedule(schedule_id) ⇒ Object
199 200 201 202 203 204 205 |
# File 'lib/ruote/storage/base.rb', line 199
#
# Deletes the schedule document with the given id.
#
# Does nothing when +schedule_id+ is nil or when no such document can be
# fetched from 'schedules'.
#
def delete_schedule(schedule_id)

  unless schedule_id.nil?

    schedule = get('schedules', schedule_id)

    delete(schedule) if schedule
  end
end
#empty?(type) ⇒ Boolean
110 111 112 113 |
# File 'lib/ruote/storage/base.rb', line 110
#
# Returns true when a count query (get_many with :count => true) over the
# given document type yields 0.
#
def empty?(type)

  count = get_many(type, nil, :count => true)

  count == 0
end
#expression_wfids(opts) ⇒ Object
Given all the expressions stored here, returns a sorted list of unique wfids (this is used in Engine#processes(opts)).
Understands the :skip, :limit and :descending options.
This is a base implementation, different storage implementations may come up with different implementations (think CouchDB, which could provide a view for it).
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/ruote/storage/base.rb', line 137
#
# Returns the sorted list of unique wfids found among the ids of the stored
# expressions. The wfid is taken to be the last '!'-separated segment of
# each expression id.
#
# Options:
#
# :descending:: when set, the list is returned in reverse order
# :skip:: number of leading wfids to leave out (defaults to 0)
# :limit:: maximum number of wfids to return (defaults to all of them)
#
# Always returns an Array: an empty one when :skip points past the end of
# the list, instead of the nil that Array#[] would yield.
#
def expression_wfids(opts)

  wfids = ids('expressions').map { |fei| fei.split('!').last }.uniq.sort
  wfids = wfids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || wfids.length

  # Array#[start, length] is nil when start lies beyond the array (or when
  # length is negative); callers expect a list in every case.
  #
  wfids[skip, limit] || []
end
#find_root_expression(wfid) ⇒ Object
– expressions ++
119 120 121 122 123 124 125 126 |
# File 'lib/ruote/storage/base.rb', line 119
#
# Fetches the expressions matching +wfid+, orders them by their
# fei/expid and returns the first one that has no 'parent_id'.
#
# Returns nil when no such expression is found.
#
def find_root_expression(wfid)

  by_expid =
    get_many('expressions', wfid).sort_by { |fexp| fexp['fei']['expid'] }

  by_expid.find { |fexp| fexp['parent_id'].nil? }
end
#get_configuration(key) ⇒ Object
– configurations ++
57 58 59 60 |
# File 'lib/ruote/storage/base.rb', line 57
#
# Fetches the 'configurations' document stored under +key+ and returns
# whatever #get yields for it.
#
def get_configuration(key)

  get('configurations', key)
end
#get_engine_variable(k) ⇒ Object
– engine variables ++
211 212 213 214 |
# File 'lib/ruote/storage/base.rb', line 211
#
# Looks up +k+ in the 'variables' entry of the engine variables document
# (as returned by get_engine_variables) and returns the value found there.
#
def get_engine_variable(k)

  variables = get_engine_variables['variables']

  variables[k]
end
#get_msgs ⇒ Object
– def get_local_msgs
p @local_msgs
if @local_msgs
r = @local_msgs
@local_msgs = nil
r
else
[]
end
end ++
101 102 103 104 105 106 107 108 |
# File 'lib/ruote/storage/base.rb', line 101
#
# Fetches msgs (asking get_many for at most 300 of them) and returns them
# ordered by their 'put_at' value.
#
def get_msgs

  msgs = get_many('msgs', nil, :limit => 300)

  msgs.sort { |x, y| x['put_at'] <=> y['put_at'] }
end
#get_schedules(delta, now) ⇒ Object
– ats and crons ++
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/ruote/storage/base.rb', line 163
#
# Loads every schedule document and returns the result of passing them,
# together with +now+, to filter_schedules.
#
# +delta+ is currently unused: the branch that narrowed the query according
# to it is commented out (see the TODO below).
#
def get_schedules(delta, now)

  # TODO : bring that 'optimization' back in,
  #        maybe every minute, if min != last_min ...

  #if delta < 1.0
  #  at = now.strftime('%Y%m%d%H%M%S')
  #  get_many('schedules', /-#{at}$/)
  #elsif delta < 60.0
  #  at = now.strftime('%Y%m%d%H%M')
  #  scheds = get_many('schedules', /-#{at}\d\d$/)
  #  filter_schedules(scheds, now)
  #else # load all the schedules

  filter_schedules(get_many('schedules'), now)

  #end
end
#get_trackers ⇒ Object
– trackers ++
153 154 155 156 157 |
# File 'lib/ruote/storage/base.rb', line 153
#
# Returns the 'trackers' document found under 'variables'. When there is
# none, a fresh document with an empty 'trackers' hash is returned (it is
# only built here, not stored).
#
def get_trackers

  stored = get('variables', 'trackers')
  return stored if stored

  { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
end
#put_engine_variable(k, v) ⇒ Object
216 217 218 219 220 221 222 |
# File 'lib/ruote/storage/base.rb', line 216
#
# Sets the engine variable +k+ to +v+.
#
# The engine variables document is fetched, modified and put back. When the
# put returns something other than nil, the whole operation is attempted
# again on a freshly fetched document.
#
# NOTE(review): a non-nil put result presumably signals a revision
# conflict; confirm against the storage implementation's #put contract.
#
def put_engine_variable(k, v)

  doc = get_engine_variables
  doc['variables'][k] = v

  return if put(doc).nil?

  put_engine_variable(k, v)
end
#put_msg(action, options) ⇒ Object
– messages ++
78 79 80 81 82 83 84 85 86 |
# File 'lib/ruote/storage/base.rb', line 78
#
# Builds a msg document out of +action+ and +options+ (via
# prepare_msg_doc) and puts it in the storage.
#
# Returns the result of the #put call.
#
def put_msg(action, options)

  msg = prepare_msg_doc(action, options)

  put(msg)

  #put(msg, :update_rev => true)
  #(@local_msgs ||= []) << Ruote.fulldup(msg)
end
#put_schedule(flavour, owner_fei, s, msg) ⇒ Object
Places schedule in storage. Returns the id of the ‘schedule’ document. If the schedule got triggered immediately, nil is returned.
186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/ruote/storage/base.rb', line 186
#
# Places a schedule in the storage and returns the '_id' of the schedule
# document.
#
# Returns nil when prepare_schedule_doc yields no document (per the method
# description, that is the case when the schedule got triggered
# immediately).
#
# Raises a RuntimeError ("put_schedule failed") when #put returns anything
# other than nil.
#
def put_schedule(flavour, owner_fei, s, msg)

  schedule = prepare_schedule_doc(flavour, owner_fei, s, msg)

  if schedule

    raise "put_schedule failed" if put(schedule) != nil

    schedule['_id']
  end
end
#replace_engine_configuration(opts) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/ruote/storage/base.rb', line 62
#
# Stores +opts+ as the 'engine' document of type 'configurations'.
#
# Nothing is done when opts['preserve_configuration'] is set. Otherwise,
# the '_rev' of the engine configuration currently stored (if any) is
# copied onto the new document before it is put.
#
def replace_engine_configuration(opts)

  unless opts['preserve_configuration']

    current = get('configurations', 'engine')

    doc = opts.merge('type' => 'configurations', '_id' => 'engine')
    doc['_rev'] = current['_rev'] if current

    put(doc)
  end
end
#reserve(doc) ⇒ Object
Attempts to delete a document, returns true if the deletion succeeded. This is used with msgs to reserve work on them.
48 49 50 51 |
# File 'lib/ruote/storage/base.rb', line 48
#
# Attempts to delete +doc+ and returns true when #delete answered nil
# (the deletion went through), false otherwise.
#
# Used with msgs, to reserve work on them.
#
def reserve(doc)

  outcome = delete(doc)

  outcome.nil?
end