Module: Taskinator::Persistence::InstanceMethods

Defined in:
lib/taskinator/persistence.rb

Constant Summary collapse

EXPIRE_IN =

30 minutes

30 * 60

Instance Method Summary collapse

Instance Method Details

#cleanup(expire_in = EXPIRE_IN) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
# File 'lib/taskinator/persistence.rb', line 216

# Runs the "clean up" visitor over this instance and then removes its uuid
# from the processes list for its scope.
#
# @param expire_in value handed to RedisCleanupVisitor; defaults to EXPIRE_IN
#   (30 * 60, documented as 30 minutes)
def cleanup(expire_in=EXPIRE_IN)
  Taskinator.redis do |redis|
    # let the "clean up" visitor walk this instance first
    cleaner = RedisCleanupVisitor.new(redis, self, expire_in)
    cleaner.visit

    # then take this uuid out of the processes list
    list_key = Persistence.processes_list_key(scope)
    redis.srem(list_key, uuid)
  end
end

#deincr_pending_tasks ⇒ Object



196
197
198
199
200
# File 'lib/taskinator/persistence.rb', line 196

# Decrements the "<key>.pending" counter by one.
def deincr_pending_tasks
  pending_key = "#{key}.pending"
  Taskinator.redis { |redis| redis.incrby(pending_key, -1) }
end

#error ⇒ Object

Retrieves the error type, message and backtrace, returned as a three-element array in that order.



148
149
150
151
152
153
154
155
# File 'lib/taskinator/persistence.rb', line 148

# Reads the persisted error details for this instance.
#
# The result is memoized in @error, so the stored fields are only fetched
# the first time.
#
# @return [Array] [error_type, error_message, backtrace], where backtrace is
#   the JSON-decoded :error_backtrace field (an empty array when it is unset)
def error
  @error ||= begin
    fields = Taskinator.redis do |redis|
      redis.hmget(key, :error_type, :error_message, :error_backtrace)
    end

    type, message, backtrace_json = fields
    [type, message, JSON.parse(backtrace_json || '[]')]
  end
end

#fail(error = nil) ⇒ Object

persists the error information



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/taskinator/persistence.rb', line 132

# Persists the error information for this instance.
#
# Writes the error's class name, message and JSON-encoded backtrace, plus an
# :updated_at timestamp, to the hash stored at #key. Does nothing unless
# +error+ is an Exception.
#
# @param error [Exception, nil] the error to record
def fail(error=nil)
  return unless error.is_a?(Exception)

  # #error memoizes what it read in @error; drop that cache so the details
  # written below are visible to later #error calls on this instance
  @error = nil

  Taskinator.redis do |conn|
    conn.hmset(
      self.key,
      :error_type, error.class.name,
      :error_message, error.message,
      # an exception that was never raised has a nil backtrace
      :error_backtrace, JSON.generate(error.backtrace || []),
      :updated_at, Time.now.utc
    )
  end
end

#key ⇒ Object

the persistence key



83
84
85
# File 'lib/taskinator/persistence.rb', line 83

# The persistence key for this instance, built by the class-level key_for
# from the uuid and memoized in @key.
def key
  return @key if @key

  @key = self.class.key_for(uuid)
end

#load_workflow_state ⇒ Object

Retrieves the workflow state. This method is called from the workflow gem.



101
102
103
104
105
106
# File 'lib/taskinator/persistence.rb', line 101

# Reads the :state field stored at #key.
#
# @return [Symbol] the stored state, or :initial when no state is stored
def load_workflow_state
  stored = Taskinator.redis { |redis| redis.hget(key, :state) }
  (stored || 'initial').to_sym
end

#persist_workflow_state(new_state) ⇒ Object

Persists the workflow state. This method is called from the workflow gem.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/taskinator/persistence.rb', line 110

# Stores the new workflow state together with a fresh UTC timestamp, and
# stamps the same timestamp on the root process hash. Both writes are issued
# inside a single `multi` block.
#
# @param new_state the state to store
# @return the +new_state+ that was passed in
def persist_workflow_state(new_state)
  @updated_at = Time.now.utc
  stamp = @updated_at

  Taskinator.redis do |redis|
    # resolve the root process key before the transaction block is opened
    root_key = process_key

    redis.multi do |tx|
      tx.hmset(key, :state, new_state, :updated_at, stamp)

      # the "root" process gets the same timestamp
      tx.hset(root_key, :updated_at, stamp)
    end
  end

  new_state
end

#process_key ⇒ Object

the root process persistence key associated with this process or task



95
96
97
# File 'lib/taskinator/persistence.rb', line 95

# The persistence key of the root process for this process or task, built by
# Taskinator::Process.key_for from #process_uuid and memoized in @process_key.
def process_key
  return @process_key if @process_key

  @process_key = Taskinator::Process.key_for(process_uuid)
end

#process_options ⇒ Object

Retrieves the process options of the root process. This is so that metadata of the process can be maintained and made accessible to instrumentation subscribers.



205
206
207
208
209
210
211
212
# File 'lib/taskinator/persistence.rb', line 205

# Loads the :options field of the root process hash and deserializes it with
# Taskinator::Persistence.deserialize. The result is memoized in
# @process_options.
#
# @return the deserialized options, or an empty Hash when nothing is stored
def process_options
  @process_options ||= begin
    serialized = Taskinator.redis { |redis| redis.hget(process_key, :options) }

    if serialized
      Taskinator::Persistence.deserialize(serialized)
    else
      {}
    end
  end
end

#process_uuid ⇒ Object

the root process uuid associated with this process or task



88
89
90
91
92
# File 'lib/taskinator/persistence.rb', line 88

# The uuid of the root process for this process or task, read from the
# :process_uuid field stored at #key and memoized in @process_uuid.
def process_uuid
  return @process_uuid if @process_uuid

  @process_uuid = Taskinator.redis { |redis| redis.hget(key, :process_uuid) }
end

#save ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/taskinator/persistence.rb', line 56

# Serializes this instance through RedisSerializationVisitor inside a
# pipelined block, then writes the task counters on the process hash:
# :tasks_count comes from the visitor, the other counters start at 0.
#
# NOTE(review): `true` is only the value of the pipelined block; whether
# #save itself returns it depends on what `pipelined` and `Taskinator.redis`
# return - confirm against callers.
def save
  Taskinator.redis do |redis|
    redis.pipelined do |pipe|
      # the counter below relies on #visit handing back the visitor
      visited = RedisSerializationVisitor.new(pipe, self).visit
      root_key = Taskinator::Process.key_for(uuid)

      pipe.hmset(
        root_key,
        :tasks_count, visited.task_count,
        :tasks_failed, 0,
        :tasks_processing, 0,
        :tasks_completed, 0,
        :tasks_cancelled, 0
      )

      true
    end
  end
end

#tasks_count ⇒ Object



157
158
159
160
161
162
163
164
# File 'lib/taskinator/persistence.rb', line 157

# The :tasks_count field of the root process hash, converted with to_i
# (so a missing field yields 0) and memoized in @tasks_count.
#
# @return [Integer]
def tasks_count
  @tasks_count ||= begin
    raw = Taskinator.redis { |redis| redis.hget(process_key, :tasks_count) }
    raw.to_i
  end
end

#to_xml ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/taskinator/persistence.rb', line 73

# Builds an XML document for this instance: emits the XML instruction, then
# a <process> element carrying this instance's key, whose content is written
# by XmlSerializationVisitor.
#
# @return [Builder::XmlMarkup] the builder that was written to
def to_xml
  ::Builder::XmlMarkup.new.tap do |markup|
    markup.instruct!
    markup.tag!('process', :key => key) do |node|
      XmlSerializationVisitor.new(node, self).visit
    end
  end
end