Module: Taskinator::Persistence::InstanceMethods
- Defined in:
- lib/taskinator/persistence.rb
Constant Summary collapse
- EXPIRE_IN =
30 minutes, expressed in seconds
30 * 60
Instance Method Summary collapse
- #cleanup(expire_in = EXPIRE_IN) ⇒ Object
- #deincr_pending_tasks ⇒ Object
-
#error ⇒ Object
Retrieves the error type, message and backtrace, returned as a three-element array in that order.
-
#fail(error = nil) ⇒ Object
persists the error information.
-
#key ⇒ Object
the persistence key.
-
#load_workflow_state ⇒ Object
Retrieves the workflow state. This method is called from the workflow gem.
-
#persist_workflow_state(new_state) ⇒ Object
Persists the workflow state. This method is called from the workflow gem.
-
#process_key ⇒ Object
the root process persistence key associated with this process or task.
-
#process_options ⇒ Object
Retrieves the process options of the root process, so that metadata of the process can be maintained and made accessible to instrumentation subscribers.
-
#process_uuid ⇒ Object
the root process uuid associated with this process or task.
- #save ⇒ Object
- #tasks_count ⇒ Object
- #to_xml ⇒ Object
Instance Method Details
#cleanup(expire_in = EXPIRE_IN) ⇒ Object
216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/taskinator/persistence.rb', line 216
# Cleans up the persisted data for this instance.
#
# Runs a RedisCleanupVisitor over this instance and then removes its uuid
# (Redis SREM) from the collection stored under
# Persistence.processes_list_key(scope).
#
# @param expire_in handed to RedisCleanupVisitor unchanged; defaults to EXPIRE_IN
def cleanup(expire_in = EXPIRE_IN)
  Taskinator.redis do |redis|
    # use the "clean up" visitor
    cleaner = RedisCleanupVisitor.new(redis, self, expire_in)
    cleaner.visit

    # remove from the list
    redis.srem(Persistence.processes_list_key(scope), uuid)
  end
end
#deincr_pending_tasks ⇒ Object
196 197 198 199 200 |
# File 'lib/taskinator/persistence.rb', line 196
# Decrements the counter stored under "<key>.pending" by one
# (implemented as an INCRBY of -1).
#
# @return whatever Taskinator.redis yields back from the block
def deincr_pending_tasks
  Taskinator.redis { |redis| redis.incrby("#{key}.pending", -1) }
end
#error ⇒ Object
Retrieves the error type, message and backtrace, returned as a three-element array in that order.
148 149 150 151 152 153 154 155 |
# File 'lib/taskinator/persistence.rb', line 148
# Retrieves the persisted error information for this instance.
#
# Reads the :error_type, :error_message and :error_backtrace fields from the
# hash stored under #key. The backtrace is stored as JSON and is parsed back
# into an array; a missing backtrace yields an empty array.
#
# The result is memoized in @error.
#
# @return [Array] [error_type, error_message, backtrace]
def error
  @error ||= Taskinator.redis do |conn|
    error_type, error_message, error_backtrace =
      conn.hmget(self.key, :error_type, :error_message, :error_backtrace)

    [error_type, error_message, JSON.parse(error_backtrace || '[]')]
  end
end
#fail(error = nil) ⇒ Object
persists the error information
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/taskinator/persistence.rb', line 132
# Persists the error information for this instance.
#
# Writes the exception's class name, message and JSON-encoded backtrace
# (an empty array when the exception has none), plus an :updated_at UTC
# timestamp, to the hash stored under #key.
#
# @param error [Exception, nil] the exception to record
# @return [nil] without touching Redis when +error+ is not an Exception;
#   otherwise whatever Taskinator.redis returns from the block
def fail(error = nil)
  # nil (and anything else that is not an Exception) is ignored
  return unless error.is_a?(Exception)

  Taskinator.redis do |conn|
    conn.hmset(
      self.key,
      :error_type, error.class.name,
      :error_message, error.message,
      :error_backtrace, JSON.generate(error.backtrace || []),
      :updated_at, Time.now.utc
    )
  end
end
#key ⇒ Object
the persistence key
83 84 85 |
# File 'lib/taskinator/persistence.rb', line 83
# The persistence key for this instance.
#
# Built once from the class-level key_for(uuid) and memoized in @key.
def key
  @key ||= self.class.key_for(uuid)
end
#load_workflow_state ⇒ Object
Retrieves the workflow state. This method is called from the workflow gem.
101 102 103 104 105 106 |
# File 'lib/taskinator/persistence.rb', line 101
# Loads the workflow state stored in the :state field of the hash under #key.
#
# Falls back to 'initial' when no state has been stored.
#
# @return [Symbol] the stored state, or :initial
def load_workflow_state
  Taskinator.redis { |redis| redis.hget(key, :state) || 'initial' }.to_sym
end
#persist_workflow_state(new_state) ⇒ Object
Persists the workflow state. This method is called from the workflow gem.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/taskinator/persistence.rb', line 110
# Stores +new_state+ for this instance and stamps both this instance and its
# root process with the same :updated_at time.
#
# Both writes are issued inside a single MULTI block.
#
# @param new_state the state to store in the :state field
# @return the +new_state+ argument, unchanged
def persist_workflow_state(new_state)
  @updated_at = Time.now.utc

  Taskinator.redis do |redis|
    # Read the root key before opening the transaction.
    # NOTE(review): presumably because #process_key may itself query Redis
    # the first time it is called -- confirm.
    root_key = process_key

    redis.multi do |tx|
      tx.hmset(key, :state, new_state, :updated_at, @updated_at)

      # also update the "root" process
      tx.hset(root_key, :updated_at, @updated_at)
    end
  end

  new_state
end
#process_key ⇒ Object
the root process persistence key associated with this process or task
95 96 97 |
# File 'lib/taskinator/persistence.rb', line 95
# The persistence key of the root process associated with this process or task.
#
# Derived from #process_uuid via Taskinator::Process.key_for and memoized
# in @process_key.
def process_key
  @process_key ||= Taskinator::Process.key_for(process_uuid)
end
#process_options ⇒ Object
Retrieves the process options of the root process, so that metadata of the process can be maintained and made accessible to instrumentation subscribers.
205 206 207 208 209 210 211 212 |
# File 'lib/taskinator/persistence.rb', line 205
# Retrieves the options of the root process.
#
# Reads the :options field from the hash stored under #process_key and, when
# present, passes it to Taskinator::Persistence.deserialize. A missing field
# yields an empty hash. The result is memoized in @process_options.
#
# @return the deserialized options, or {} when none are stored
def process_options
  @process_options ||= begin
    yaml = Taskinator.redis do |conn|
      conn.hget(self.process_key, :options)
    end

    yaml ? Taskinator::Persistence.deserialize(yaml) : {}
  end
end
#process_uuid ⇒ Object
the root process uuid associated with this process or task
88 89 90 91 92 |
# File 'lib/taskinator/persistence.rb', line 88
# The uuid of the root process associated with this process or task.
#
# Read from the :process_uuid field of the hash stored under #key and
# memoized in @process_uuid. A nil result is not memoized, so it is looked
# up again on the next call.
def process_uuid
  @process_uuid ||= Taskinator.redis { |redis| redis.hget(key, :process_uuid) }
end
#save ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/taskinator/persistence.rb', line 56
# Persists this instance through a Redis pipeline.
#
# A RedisSerializationVisitor writes the instance itself; afterwards the
# task counters on the root process hash are initialised: :tasks_count from
# the visitor's task_count, all other counters to zero.
def save
  Taskinator.redis do |redis|
    redis.pipelined do |pipe|
      # relies on #visit returning an object that responds to task_count
      serializer = RedisSerializationVisitor.new(pipe, self).visit
      root_key = Taskinator::Process.key_for(uuid)

      pipe.hmset(
        root_key,
        :tasks_count, serializer.task_count,
        :tasks_failed, 0,
        :tasks_processing, 0,
        :tasks_completed, 0,
        :tasks_cancelled, 0
      )

      true
    end
  end
end
#tasks_count ⇒ Object
157 158 159 160 161 162 163 164 |
# File 'lib/taskinator/persistence.rb', line 157
# The number of tasks recorded on the root process.
#
# Reads the :tasks_count field from the hash stored under #process_key and
# converts it with to_i, so a missing field counts as 0. Memoized in
# @tasks_count.
#
# @return [Integer]
def tasks_count
  @tasks_count ||= Taskinator.redis { |redis| redis.hget(process_key, :tasks_count) }.to_i
end
#to_xml ⇒ Object
73 74 75 76 77 78 79 80 |
# File 'lib/taskinator/persistence.rb', line 73
# Serializes this instance to XML.
#
# Emits the XML instruction, then a <process> element carrying this
# instance's key, whose content is produced by an XmlSerializationVisitor.
#
# @return [Builder::XmlMarkup] the builder holding the generated markup
def to_xml
  ::Builder::XmlMarkup.new.tap do |markup|
    markup.instruct!
    markup.tag!('process', :key => key) do |xml|
      XmlSerializationVisitor.new(xml, self).visit
    end
  end
end