Class: HomeQ::SOBS::Queue
Constant Summary
collapse
- READ_ONLY =
'r'
- WRITE_ONLY =
'w'
- READ_WRITE =
'w+'
- MODES =
[READ_ONLY, WRITE_ONLY, READ_WRITE]
Constants included
from HomeQ
VERSION
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from HomeQ
calculated_homeq_env, calculated_homeq_topology, included, queue_list_for_host_from_topology
#config
#logger
Constructor Details
#initialize(*args) ⇒ Queue
Returns a new instance of Queue.
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/homeq/sobs/queue.rb', line 84
def initialize(*args)
@queue_name, @host, @port, @mode, @threshold, @args = args
raise "Nil queue_name, probably a config error" unless @queue_name
@conn = nil
@retry_timer = config.queue_retry || 5
@acks = {}
@concurrent_outstanding_jobs = 0
@concurrent_outstanding_jobs_highwater = 0
@jobs_received = 0
end
|
Instance Attribute Details
#args ⇒ Object
Returns the value of attribute args.
49
50
51
|
# File 'lib/homeq/sobs/queue.rb', line 49
def args
@args
end
|
#handler ⇒ Object
Returns the value of attribute handler.
50
51
52
|
# File 'lib/homeq/sobs/queue.rb', line 50
def handler
@handler
end
|
#host ⇒ Object
Returns the value of attribute host.
46
47
48
|
# File 'lib/homeq/sobs/queue.rb', line 46
def host
@host
end
|
#mode ⇒ Object
Returns the value of attribute mode.
48
49
50
|
# File 'lib/homeq/sobs/queue.rb', line 48
def mode
@mode
end
|
#port ⇒ Object
Returns the value of attribute port.
47
48
49
|
# File 'lib/homeq/sobs/queue.rb', line 47
def port
@port
end
|
#queue_name ⇒ Object
Returns the value of attribute queue_name.
45
46
47
|
# File 'lib/homeq/sobs/queue.rb', line 45
def queue_name
@queue_name
end
|
Class Method Details
.[](queue_name) ⇒ Object
58
59
60
61
62
|
# File 'lib/homeq/sobs/queue.rb', line 58
def self.[](queue_name)
sys.queues.find { |q|
q.queue_name == queue_name
}
end
|
.create_queues_from_topology(queuename) ⇒ Object
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/homeq/sobs/queue.rb', line 73
def self.create_queues_from_topology(queuename)
config = HomeQ::Base::Configuration::Configuration.instance
q, host, port, hostname = config.topology[queuename]
config.topology.connections(q).each { |peer,mode,threshold|
raise "No config for queue '#{peer}'." unless config.topology[peer]
q, host, port = config.topology[peer]
queue = HomeQ::SOBS::Queue.new(q, host, port, mode, threshold)
Base::System.instance.queues << queue
}
end
|
.send(queue_name, data, app_data = nil, &block) ⇒ Object
64
65
66
67
68
69
70
71
|
# File 'lib/homeq/sobs/queue.rb', line 64
def self.send(queue_name, data, app_data=nil, &block)
dest = Queue[queue_name]
unless dest
raise HomeQ::Base::UnknownQueue.new("Can't find queue #{queue_name}")
end
raise HomeQ::Base::ReadOnlyQueue unless dest.writeable?
dest.send(data, app_data, block)
end
|
Instance Method Details
#ack(message) ⇒ Object
140
141
142
143
144
145
146
|
# File 'lib/homeq/sobs/queue.rb', line 140
def ack(message)
app_data = message.args[1]
if @acks[app_data.to_sym]
@acks[app_data.to_sym].call(app_data)
@acks.delete(app_data.to_sym)
end
end
|
#closed ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/homeq/sobs/queue.rb', line 109
def closed
if @conn
logger.info {
"Disconnected from queue '#{queue_name}' " +
"on #{@conn.remote_endpoint}."
}
@started_at = nil
@conn = nil
end
broadcast_change
end
|
#connected? ⇒ Boolean
186
187
188
|
# File 'lib/homeq/sobs/queue.rb', line 186
def connected?
@conn ? true : false
end
|
#create_job(message) ⇒ Object
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/homeq/sobs/queue.rb', line 151
def create_job(message)
@jobs_received += 1
@concurrent_outstanding_jobs += 1
if @concurrent_outstanding_jobs > 100
logger.warn {
"Outstanding jobs: #{@concurrent_outstanding_jobs}"
}
end
set_jobs_highwater
j = (@handler || Job).new(message, self)
j.run if j.respond_to?(:run)
end
|
#deleted(message) ⇒ Object
148
149
|
# File 'lib/homeq/sobs/queue.rb', line 148
def deleted(message)
end
|
#opened(conn) ⇒ Object
121
122
123
124
125
126
127
128
129
|
# File 'lib/homeq/sobs/queue.rb', line 121
def opened(conn)
@conn = conn
@conn.refuse_send_threshold = @threshold if @threshold
@started_at = Time.now
logger.info {
"Connected to '#{queue_name}' on #{conn.remote_endpoint}."
}
broadcast_change
end
|
#readable? ⇒ Boolean
182
183
184
|
# File 'lib/homeq/sobs/queue.rb', line 182
def readable?
mode != WRITE_ONLY
end
|
#release_job(job) ⇒ Object
135
136
137
138
|
# File 'lib/homeq/sobs/queue.rb', line 135
def release_job(job)
@conn.delete(job.job_id)
@concurrent_outstanding_jobs -= 1
end
|
#send(data, app_data = nil, proc = nil) ⇒ Object
171
172
173
174
175
176
|
# File 'lib/homeq/sobs/queue.rb', line 171
def send(data, app_data=nil, proc=nil)
return if !@conn || @conn.state.state != :open
app_data ||= generate_app_data
@acks[app_data.to_s.to_sym] = proc if proc
@conn.put(:body=>data, :app_data=>app_data)
end
|
#set_jobs_highwater ⇒ Object
164
165
166
167
168
169
|
# File 'lib/homeq/sobs/queue.rb', line 164
def set_jobs_highwater
@concurrent_outstanding_jobs_highwater = [
@concurrent_outstanding_jobs_highwater,
@concurrent_outstanding_jobs
].max
end
|
#start ⇒ Object
100
101
102
103
104
105
106
107
|
# File 'lib/homeq/sobs/queue.rb', line 100
def start
logger.info {
"Initiating connection to queue '#{@queue_name}' " +
"at #{@host}:#{@port}, mode '#{@mode}', " +
"handler #{(@handler || Job)}."
}
Client.connect(host, port, config.queue_retry || 5, nil, self)
end
|
#stop ⇒ Object
131
132
133
|
# File 'lib/homeq/sobs/queue.rb', line 131
def stop
@conn.close_connection_after_writing if @conn
end
|
#writeable? ⇒ Boolean
178
179
180
|
# File 'lib/homeq/sobs/queue.rb', line 178
def writeable?
mode != READ_ONLY
end
|