Module: Taskinator::Persistence::InstanceMethods
- Defined in:
- lib/taskinator/persistence.rb
Constant Summary collapse
- EXPIRE_IN =
30 minutes
30 * 60
Instance Method Summary collapse
- #cleanup(expire_in = EXPIRE_IN) ⇒ Object
- #deincr_pending_tasks ⇒ Object
-
#error ⇒ Object
retrieves the error type, message and backtrace and returns an array with 3 subscripts respectively.
-
#fail(error = nil) ⇒ Object
persists the error information.
-
#key ⇒ Object
the persistence key.
-
#load_workflow_state ⇒ Object
retrieves the workflow state this method is called from the workflow gem.
-
#persist_workflow_state(new_state) ⇒ Object
persists the workflow state this method is called from the workflow gem.
-
#process_key ⇒ Object
the root process persistence key associated with this process or task.
-
#process_options ⇒ Object
retrieves the process options of the root process this is so that meta data of the process can be maintained and accessible to instrumentation subscribers.
-
#process_uuid ⇒ Object
the root process uuid associated with this process or task.
- #save ⇒ Object
- #tasks_count ⇒ Object
- #to_xml ⇒ Object
Instance Method Details
#cleanup(expire_in = EXPIRE_IN) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/taskinator/persistence.rb', line 214 def cleanup(expire_in=EXPIRE_IN) Taskinator.redis do |conn| # use the "clean up" visitor RedisCleanupVisitor.new(conn, self, expire_in).visit # remove from the list conn.srem(Persistence.processes_list_key(scope), uuid) end end |
#deincr_pending_tasks ⇒ Object
194 195 196 197 198 |
# File 'lib/taskinator/persistence.rb', line 194 def deincr_pending_tasks Taskinator.redis do |conn| conn.incrby("#{key}.pending", -1) end end |
#error ⇒ Object
retrieves the error type, message and backtrace and returns an array with 3 subscripts respectively
146 147 148 149 150 151 152 153 |
# File 'lib/taskinator/persistence.rb', line 146 def error @error ||= Taskinator.redis do |conn| error_type, , error_backtrace = conn.hmget(self.key, :error_type, :error_message, :error_backtrace) [error_type, , JSON.parse(error_backtrace || '[]')] end end |
#fail(error = nil) ⇒ Object
persists the error information
130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/taskinator/persistence.rb', line 130 def fail(error=nil) return unless error && error.is_a?(Exception) Taskinator.redis do |conn| conn.hmset( self.key, :error_type, error.class.name, :error_message, error., :error_backtrace, JSON.generate(error.backtrace || []), :updated_at, Time.now.utc ) end end |
#key ⇒ Object
the persistence key
81 82 83 |
# File 'lib/taskinator/persistence.rb', line 81 def key @key ||= self.class.key_for(self.uuid) end |
#load_workflow_state ⇒ Object
retrieves the workflow state this method is called from the workflow gem
99 100 101 102 103 104 |
# File 'lib/taskinator/persistence.rb', line 99 def load_workflow_state state = Taskinator.redis do |conn| conn.hget(self.key, :state) || 'initial' end state.to_sym end |
#persist_workflow_state(new_state) ⇒ Object
persists the workflow state this method is called from the workflow gem
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/taskinator/persistence.rb', line 108 def persist_workflow_state(new_state) @updated_at = Time.now.utc Taskinator.redis do |conn| process_key = self.process_key conn.multi do conn.hmset( self.key, :state, new_state, :updated_at, @updated_at ) # also update the "root" process conn.hset( process_key, :updated_at, @updated_at ) end end new_state end |
#process_key ⇒ Object
the root process persistence key associated with this process or task
93 94 95 |
# File 'lib/taskinator/persistence.rb', line 93 def process_key @process_key ||= Taskinator::Process.key_for(process_uuid) end |
#process_options ⇒ Object
retrieves the process options of the root process this is so that meta data of the process can be maintained and accessible to instrumentation subscribers
203 204 205 206 207 208 209 210 |
# File 'lib/taskinator/persistence.rb', line 203 def @process_options ||= begin yaml = Taskinator.redis do |conn| conn.hget(self.process_key, :options) end yaml ? Taskinator::Persistence.deserialize(yaml) : {} end end |
#process_uuid ⇒ Object
the root process uuid associated with this process or task
86 87 88 89 90 |
# File 'lib/taskinator/persistence.rb', line 86 def process_uuid @process_uuid ||= Taskinator.redis do |conn| conn.hget(self.key, :process_uuid) end end |
#save ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/taskinator/persistence.rb', line 54 def save Taskinator.redis do |conn| conn.pipelined do visitor = RedisSerializationVisitor.new(conn, self).visit conn.hmset( Taskinator::Process.key_for(uuid), :tasks_count, visitor.task_count, :tasks_failed, 0, :tasks_processing, 0, :tasks_completed, 0, :tasks_cancelled, 0, ) true end end end |
#tasks_count ⇒ Object
155 156 157 158 159 160 161 162 |
# File 'lib/taskinator/persistence.rb', line 155 def tasks_count @tasks_count ||= begin count = Taskinator.redis do |conn| conn.hget self.process_key, :tasks_count end count.to_i end end |
#to_xml ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/taskinator/persistence.rb', line 71 def to_xml builder = ::Builder::XmlMarkup.new builder.instruct! builder.tag!('process', :key => self.key) do |xml| XmlSerializationVisitor.new(xml, self).visit end builder end |