Class: Falqon::Message

Inherits:
Object
  • Object
show all
Extended by:
Forwardable, T::Sig
Defined in:
lib/falqon/message.rb

Overview

A message in a queue

This class should typically not be instantiated directly, but rather be created by a queue instance.

Defined Under Namespace

Classes: Metadata

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, id: nil, data: nil) ⇒ Message

Returns a new instance of Message.



39
40
41
42
43
# File 'lib/falqon/message.rb', line 39

# Build a message bound to a queue.
#
# The arguments are only stored; nothing is read from or written to Redis here.
# When +id+ or +data+ is left as +nil+, the corresponding reader resolves it
# lazily on first access.
#
# @param queue [Object] queue this message belongs to (presumably a Falqon::Queue — confirm)
# @param id [Object, nil] message identifier, or nil to have one allocated on first use
# @param data [Object, nil] message payload, or nil to have it loaded on first use
def initialize(queue, id: nil, data: nil)
  @queue = queue
  @id = id
  @data = data
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



17
18
19
# File 'lib/falqon/message.rb', line 17

# Queue this message belongs to, exactly as it was passed to the constructor.
#
# @return [Object] the value of the +@queue+ instance variable
def queue
  @queue
end

Instance Method Details

#create ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/falqon/message.rb', line 102

# Persist the message payload and its initial metadata.
#
# Inside a single MULTI block this writes the payload to
# "<queue id>:data:<message id>" and sets the +created_at+ / +updated_at+
# fields of "<queue id>:metadata:<message id>".
#
# @return [self] the message itself, so calls can be chained
def create
  redis.with do |r|
    # Resolve the identifier before opening the transaction block; `id` may
    # itself talk to Redis when no identifier has been assigned yet.
    message_id = id

    # Take a single timestamp so both fields of a freshly created message are
    # guaranteed to be equal, even when the call straddles a second boundary.
    now = Time.now.to_i

    r.multi do |t|
      # Store data
      t.set("#{queue.id}:data:#{message_id}", data)

      # Set metadata
      t.hset("#{queue.id}:metadata:#{message_id}",
             :created_at, now,
             :updated_at, now)
    end
  end

  self
end

#data ⇒ Object



53
54
55
# File 'lib/falqon/message.rb', line 53

# Payload of the message.
#
# A truthy cached value is returned as-is; otherwise the payload is read from
# "<queue id>:data:<message id>" and cached for subsequent calls.
#
# @return [Object, nil] the cached or freshly fetched payload
def data
  return @data if @data

  @data = redis.with do |conn|
    conn.get("#{queue.id}:data:#{id}")
  end
end

#dead? ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/falqon/message.rb', line 83

# Whether the message is marked as dead.
#
# @return [Boolean] true when the metadata status equals "dead"
def dead?
  metadata.status == "dead"
end

#delete ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/falqon/message.rb', line 146

# Remove the message from the queue and drop its stored data and metadata.
#
# Within a MULTI block the identifier is removed from the pending and dead
# sub-queues, then the data and metadata keys are deleted in one DEL call.
#
# @return [Object] whatever the enclosing +redis.with+ / +multi+ call returns
def delete
  redis.with do |conn|
    conn.multi do |tx|
      # Detach the identifier from both sub-queues first
      queue.pending.remove(id)
      queue.dead.remove(id)

      # Then drop the payload and metadata keys together
      stored_keys = %w[data metadata].map { |kind| "#{queue.id}:#{kind}:#{id}" }
      tx.del(*stored_keys)
    end
  end
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


89
90
91
92
93
# File 'lib/falqon/message.rb', line 89

# Whether a payload is stored for this message.
#
# @return [Boolean] true when EXISTS reports exactly one key for
#   "<queue id>:data:<message id>"
def exists?
  redis.with do |conn|
    found = conn.exists("#{queue.id}:data:#{id}")
    found == 1
  end
end

#id ⇒ Object



47
48
49
# File 'lib/falqon/message.rb', line 47

# Identifier of the message.
#
# A truthy cached identifier is returned as-is; otherwise a new one is
# allocated by incrementing the "<queue id>:id" counter and cached.
#
# @return [Object] the cached or newly allocated identifier
def id
  return @id if @id

  @id = redis.with do |conn|
    conn.incr("#{queue.id}:id")
  end
end

#inspect ⇒ Object



182
183
184
# File 'lib/falqon/message.rb', line 182

def inspect
  "#<#{self.class} id=#{id.inspect} size=#{size.inspect}>"
end

#kill ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/falqon/message.rb', line 125

# Move the message to the dead sub-queue.
#
# Steps, in order: add the identifier to the dead sub-queue, drop the
# +retries+ metadata field, set the +status+ metadata field to "dead", and
# remove the identifier from the pending sub-queue.
#
# @return [Object] whatever the enclosing +redis.with+ block returns
def kill
  logger.debug("Killing message #{id} on queue #{queue.name}")

  redis.with do |conn|
    # Register the identifier on the dead sub-queue
    queue.dead.add(id)

    # Clear the retry counter and flag the message as dead
    metadata_key = "#{queue.id}:metadata:#{id}"
    conn.hdel(metadata_key, :retries)
    conn.hset(metadata_key, :status, "dead")

    # Finally take the identifier off the pending sub-queue
    queue.pending.remove(id)
  end
end

#metadata ⇒ Object



174
175
176
177
178
179
# File 'lib/falqon/message.rb', line 174

# Load the metadata stored for this message.
#
# Reads every field of the "<queue id>:metadata:<message id>" hash and hands
# the result to +Metadata.parse+. The value is not cached here, so each call
# performs a fresh read.
#
# @return [Object] whatever +Metadata.parse+ returns (presumably a Metadata instance — confirm)
def metadata
  queue.redis.with do |r|
    Metadata
      .parse(r.hgetall("#{queue.id}:metadata:#{id}"))
  end
end

#pending? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/falqon/message.rb', line 65

# Whether the message is waiting to be processed.
#
# @return [Boolean] true when the metadata status equals "pending"
def pending?
  metadata.status == "pending"
end

#processing? ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/falqon/message.rb', line 71

# Whether the message is currently being processed.
#
# @return [Boolean] true when the metadata status equals "processing"
def processing?
  metadata.status == "processing"
end

#scheduled? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/falqon/message.rb', line 77

# Whether the message is scheduled.
#
# @return [Boolean] true when the metadata status equals "scheduled"
def scheduled?
  metadata.status == "scheduled"
end

#size ⇒ Object



164
165
166
# File 'lib/falqon/message.rb', line 164

# Size of the stored payload.
#
# @return [Object] the length Redis reports via STRLEN for
#   "<queue id>:data:<message id>"
def size
  redis.with do |conn|
    conn.strlen("#{queue.id}:data:#{id}")
  end
end

#unknown? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/falqon/message.rb', line 59

# Whether the message has no known state.
#
# @return [Boolean] true when the metadata status equals "unknown"
def unknown?
  metadata.status == "unknown"
end