Module: Coil::TransactionalMessage
- Extended by:
- ActiveSupport::Concern
- Included in:
- Inbox::Message, Outbox::Message
- Defined in:
- app/models/concerns/coil/transactional_message.rb
Instance Method Summary collapse
- #locking_persistence_queue(wait: true, &blk) ⇒ Object
- #processed(processor_name:) ⇒ Object
- #processed?(processor_name:) ⇒ Boolean
- #unprocessed_predecessors(processor_name:) ⇒ Object
Instance Method Details
#locking_persistence_queue(wait: true, &blk) ⇒ Object
74 75 76 |
# File 'app/models/concerns/coil/transactional_message.rb', line 74 def locking_persistence_queue(wait: true, &blk) self.class.locking_persistence_queue(keys: [key], wait:, &blk) end |
#processed(processor_name:) ⇒ Object
64 65 66 67 68 |
# File 'app/models/concerns/coil/transactional_message.rb', line 64 def processed(processor_name:) c = completions.find_or_initialize_by(processor_name:) max_id = [c., id].compact.max c.update!(last_completed_message_id: max_id) end |
#processed?(processor_name:) ⇒ Boolean
59 60 61 62 |
# File 'app/models/concerns/coil/transactional_message.rb', line 59 def processed?(processor_name:) persisted? && completions.exists?(processor_name:, last_completed_message_id: id...) end |
#unprocessed_predecessors(processor_name:) ⇒ Object
70 71 72 |
# File 'app/models/concerns/coil/transactional_message.rb', line 70 def unprocessed_predecessors(processor_name:) self.class.unprocessed(processor_name:).where(id: ...id, key:) end |