Class: Ap4r::StoredMessage
- Defined in: lib/ap4r/stored_message.rb
Overview
This class is the model class for SAF (store and forward). The migration file is located at the following path:
ap4r/lib/ap4r/xxx_create_table_for_saf.rb
Don't forget to create the table before using SAF.
Constant Summary
- STATUS_STORED = 0
- STATUS_FORWARDED = 1
- PHYSICAL = :physical
- LOGICAL = :logical
- @@status_value_of = { :unforwarded => STATUS_STORED, :forwarded => STATUS_FORWARDED }
Class Method Summary
- .destroy_if_exists(id, options) ⇒ Object
  Destroy a record by id.
- .find_status_of(status = :unforwarded) ⇒ Object
  List the records that have the specified status.
- .postgresql? ⇒ Boolean
- .reforward(id) ⇒ Object
  Try to forward the single message with the given id whose status is unforwarded.
- .reforward_all(transaction_num = 10) ⇒ Object
  Try to forward all messages whose status is unforwarded.
- .store(queue_name, queue_message, rm_options = {}) ⇒ Object
  Insert queue information, such as the queue name and message, for later forwarding.
- .update_status(id, status) ⇒ Object
  Update status value.
Instance Method Summary
- #destroy_or_update(options = {:delete_mode => PHYSICAL}) ⇒ Object
- #dumped_headers ⇒ Object
- #dumped_object ⇒ Object
- #forward_and_update_status ⇒ Object
- #to_summary_string ⇒ Object
  Return the id, queue name and creation date and time as one comma-separated string.
Class Method Details
.destroy_if_exists(id, options) ⇒ Object
Destroy a record by id. The following option is supported.
- :delete_mode (:physical or :logical)
  The default delete mode is physical. If you need logical deletion, for example to check for duplicate messages, set Ap4r::AsyncController.saf_delete_mode to :logical.
    # File 'lib/ap4r/stored_message.rb', line 90
    def self.destroy_if_exists(id, options)
      result = nil
      begin
        result = StoredMessage.find(id)
      rescue ActiveRecord::RecordNotFound
        # Another thread or process may already have forwarded the message.
        return nil
      end
      result.destroy_or_update(options)
    end
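To make the difference between the two delete modes concrete, here is a minimal sketch that uses a plain hash as a stand-in for the table. The names rows, status_stored, status_forwarded and the lambda itself are hypothetical and exist only for illustration. It mirrors the branching of destroy_if_exists and destroy_or_update, not their ActiveRecord behavior.

```ruby
# Hypothetical in-memory stand-in for the stored-message table (id => status).
status_stored    = 0
status_forwarded = 1
rows = { 1 => status_stored, 2 => status_stored }

# Mirrors the branching of destroy_if_exists and destroy_or_update.
destroy_if_exists = lambda do |id, options|
  # A missing record is not an error: another worker may have removed it.
  return nil unless rows.key?(id)

  case options[:delete_mode]
  when :physical then rows.delete(id)              # the row disappears
  when :logical  then rows[id] = status_forwarded  # the row is kept and marked
  else raise "unknown delete mode: #{options[:delete_mode]}"
  end
  id
end

destroy_if_exists.call(1, { :delete_mode => :physical })
destroy_if_exists.call(2, { :delete_mode => :logical })
missing = destroy_if_exists.call(99, { :delete_mode => :physical })
```

After these calls only the logically deleted row remains, marked as forwarded. That is why logical deletion keeps the duplication_check_id available for later duplicate checks.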
.find_status_of(status = :unforwarded) ⇒ Object
List the records that have the specified status. The accepted statuses are :forwarded, :unforwarded and :all. :unforwarded covers messages that have not been processed yet and messages whose forwarding failed.
    # File 'lib/ap4r/stored_message.rb', line 122
    def self.find_status_of(status = :unforwarded)
      case status
      when :all
        StoredMessage.find(:all)
      when :forwarded, :unforwarded
        StoredMessage.find(:all, :conditions => { :status => @@status_value_of[status] })
      else
        puts "Undefined status: #{status.to_s}."
        puts "Usage: Ap4r::StoredMessage.find_on [ :forwarded | :unforwarded | :all ]"
      end
    end
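The status lookup can be sketched without a database. The records array and the lambda below are hypothetical stand-ins; only the symbol-to-integer mapping comes from the listing above. Note that, as in the listing, an undefined status yields nil rather than an exception, so callers may want to guard against it.

```ruby
# Mapping taken from @@status_value_of in the listing above.
status_value_of = { :unforwarded => 0, :forwarded => 1 }

# Hypothetical records; the real method queries the database instead.
records = [
  { :id => 1, :status => 0 },
  { :id => 2, :status => 1 },
  { :id => 3, :status => 0 }
]

find_status_of = lambda do |status = :unforwarded|
  case status
  when :all
    records
  when :forwarded, :unforwarded
    records.select { |r| r[:status] == status_value_of[status] }
  else
    nil # the original prints a usage message here, so the result is nil
  end
end

unforwarded_ids = find_status_of.call.map { |r| r[:id] }
forwarded_ids   = find_status_of.call(:forwarded).map { |r| r[:id] }
unknown         = find_status_of.call(:pending)
```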
.postgresql? ⇒ Boolean
    # File 'lib/ap4r/stored_message.rb', line 101
    def self.postgresql?
      "PostgreSQL" == Ap4r::StoredMessage.connection.adapter_name
    end
.reforward(id) ⇒ Object
Try to forward the single message with the given id, whose status must be unforwarded. If the message is forwarded successfully, its status becomes 1 (STATUS_FORWARDED). If the message was already forwarded, an error is raised.
    # File 'lib/ap4r/stored_message.rb', line 153
    def self.reforward(id)
      stored_message = StoredMessage.find(id)
      if stored_message.status == @@status_value_of[:forwarded]
        raise "The message (id = #{id}) was already forwarded."
      end
      stored_message.forward_and_update_status
    end
.reforward_all(transaction_num = 10) ⇒ Object
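Try to forward all messages whose status is unforwarded. This method commits to the database once per batch of transaction_num messages. It returns a two-element array: the number of messages forwarded and the number of messages that failed.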
    # File 'lib/ap4r/stored_message.rb', line 163
    def self.reforward_all(transaction_num = 10)
      stored_messages = StoredMessage.find(:all, :conditions => {:status => @@status_value_of[:unforwarded]})
      total_num = stored_messages.size
      failed_num = 0

      0.step(total_num, transaction_num) do |offset|
        target_sms = stored_messages[offset..(offset + transaction_num - 1)]
        next if target_sms.empty?

        begin
          StoredMessage.transaction do
            target_sms.each do |target_sm|
              target_sm.forward_and_update_status
            end
          end
        rescue Exception => error
          puts error.message
          failed_num += target_sms.size
        end
      end

      return [total_num - failed_num, failed_num]
    end
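The batching arithmetic is easier to follow with plain integers in place of records. The sketch below is not the library's code: messages, failing and forward are hypothetical, and no database transaction is involved. It only shows how 0.step slices the list and how one failure counts the whole batch as failed.

```ruby
messages        = (1..7).to_a # stand-ins for unforwarded records
transaction_num = 3
failing         = [5]         # pretend that message 5 cannot be forwarded

# Hypothetical forwarder that raises for the failing message.
forward = lambda { |m| raise "cannot forward #{m}" if failing.include?(m) }

total_num  = messages.size
failed_num = 0
batches    = []

0.step(total_num, transaction_num) do |offset|
  batch = messages[offset..(offset + transaction_num - 1)]
  next if batch.empty?
  batches << batch

  begin
    batch.each { |m| forward.call(m) }
  rescue StandardError
    # As in reforward_all, the whole batch is counted as failed.
    failed_num += batch.size
  end
end

result = [total_num - failed_num, failed_num]
```

With seven messages and a batch size of three, the batches are [1, 2, 3], [4, 5, 6] and [7]. The failure of message 5 marks all three messages of the second batch as failed, so result is [4, 3]. A smaller transaction_num narrows the impact of a single failure at the cost of more commits.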
.store(queue_name, queue_message, rm_options = {}) ⇒ Object
Insert queue information, such as the queue name and message, for later forwarding.
duplication_check_id is generated from a UUID and should be unique across all StoredMessage records. Using this id, it is possible to prevent the same message from triggering the same asynchronous processing twice. By default, however, the StoredMessage record is removed once the message has been put into the queue.
    # File 'lib/ap4r/stored_message.rb', line 59
    def self.store(queue_name, queue_message, rm_options = {})
      sm = StoredMessage.new do |s|
        s.duplication_check_id = UUID.new
        s.queue = queue_name
        s.status = STATUS_STORED

        # A warning occurs when putting a backslash into a binary type in PostgreSQL.
        if postgresql?
          s.object = YAML.dump(queue_message)
          s.headers = YAML.dump(rm_options)
        else
          s.object = Marshal::dump(queue_message)
          s.headers = Marshal::dump(rm_options)
        end
      end

      begin
        sm.save!
      rescue Exception => error
        raise error
      end
      sm
    end
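The listing above picks YAML for PostgreSQL and Marshal otherwise. A small round trip, using only the Ruby standard library, illustrates the practical difference: YAML produces text, while Marshal produces a binary string. The sample message and headers are made up for illustration, and string keys are used so that YAML.load also accepts the data on recent Ruby versions.

```ruby
require "yaml"

# Hypothetical payload and headers.
queue_message = "order_id=42"
rm_options    = { "priority" => 1, "delivery" => "once" }

# Text serialization, as used on the PostgreSQL branch.
yaml_object  = YAML.dump(queue_message)
yaml_headers = YAML.dump(rm_options)

# Binary serialization, as used on the other branch.
marshal_object  = Marshal.dump(queue_message)
marshal_headers = Marshal.dump(rm_options)

# Both formats restore the original values.
from_yaml    = [YAML.load(yaml_object), YAML.load(yaml_headers)]
from_marshal = [Marshal.load(marshal_object), Marshal.load(marshal_headers)]

# Marshal output is tagged as binary (ASCII-8BIT).
marshal_encoding = marshal_headers.encoding
```

Whichever format is used when storing must also be used when loading, which is why forward_and_update_status repeats the same postgresql? check.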
.update_status(id, status) ⇒ Object
Update status value.
    # File 'lib/ap4r/stored_message.rb', line 140
    def self.update_status(id, status)
      return "undefined status: #{status}" unless @@status_value_of.keys.include? status

      stored_message = StoredMessage.find(id)
      before_status = stored_message.status
      after_status = @@status_value_of[status]

      stored_message.status = after_status
      stored_message.save!
    end
Instance Method Details
#destroy_or_update(options = {:delete_mode => PHYSICAL}) ⇒ Object
    # File 'lib/ap4r/stored_message.rb', line 105
    def destroy_or_update(options = {:delete_mode => PHYSICAL})
      case options[:delete_mode]
      when PHYSICAL
        # TODO: Confirm to raise error, 2006/10/17 kato-k
        self.destroy
      when LOGICAL
        self.status = STATUS_FORWARDED
        self.save!
      else
        raise "unknown delete mode: #{options[:delete_mode]}"
      end
      self
    end
#dumped_headers ⇒ Object
    # File 'lib/ap4r/stored_message.rb', line 32
    def dumped_headers
      # A warning occurs when putting a backslash into a binary type in PostgreSQL.
      if self.class.postgresql?
        self.headers
      else
        Marshal::dump(self.headers)
      end
    end
#dumped_object ⇒ Object
    # File 'lib/ap4r/stored_message.rb', line 41
    def dumped_object
      # A warning occurs when putting a backslash into a binary type in PostgreSQL.
      if self.class.postgresql?
        self.object
      else
        Marshal::dump(self.object)
      end
    end
#forward_and_update_status ⇒ Object
    # File 'lib/ap4r/stored_message.rb', line 187
    def forward_and_update_status
      queue_name = self.queue

      # A warning occurs when putting a backslash into a binary type in PostgreSQL.
      if self.class.postgresql?
        queue_headers = YAML.load(self.headers)
        queue_message = YAML.load(self.object)
      else
        queue_headers = Marshal::load(self.headers)
        queue_message = Marshal::load(self.object)
      end

      q = ::ReliableMsg::Queue.new(queue_name, :drb_uri => Ap4r::AsyncHelper::Base::DRUBY_URI)
      q.put(queue_message, queue_headers)

      self.status = STATUS_FORWARDED
      self.save!
    end
#to_summary_string ⇒ Object
Return the id, queue name and creation date and time as one comma-separated string.
    # File 'lib/ap4r/stored_message.rb', line 135
    def to_summary_string
      return "#{self.id}, #{self.queue}, #{self.created_at}"
    end