Class: HomeQ::SOBS::Queue

Inherits:
Object
  • Object
show all
Includes:
HomeQ, Base::Configuration, Base::Logging, Observable
Defined in:
lib/homeq/sobs/queue.rb

Constant Summary collapse

READ_ONLY =
'r'
WRITE_ONLY =
'w'
READ_WRITE =
'w+'
MODES =
[READ_ONLY, WRITE_ONLY, READ_WRITE]

Constants included from HomeQ

VERSION

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from HomeQ

calculated_homeq_env, calculated_homeq_topology, included, queue_list_for_host_from_topology

Methods included from Base::Configuration

#config

Methods included from Base::Logging

#logger

Constructor Details

#initialize(*args) ⇒ Queue

Returns a new instance of Queue.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/homeq/sobs/queue.rb', line 84

def initialize(*args)
  @queue_name, @host, @port, @mode, @threshold, @args = args
  raise "Nil queue_name, probably a config error" unless @queue_name

  @conn                                   = nil
  @retry_timer                            = config.queue_retry || 5
  @acks                                   = {}
  
  # stats
  
  @concurrent_outstanding_jobs            = 0
  @concurrent_outstanding_jobs_highwater  = 0
  @jobs_received                          = 0

end

Instance Attribute Details

#argsObject

Returns the value of attribute args.



49
50
51
# File 'lib/homeq/sobs/queue.rb', line 49

def args
  @args
end

#handlerObject

Returns the value of attribute handler.



50
51
52
# File 'lib/homeq/sobs/queue.rb', line 50

def handler
  @handler
end

#hostObject

Returns the value of attribute host.



46
47
48
# File 'lib/homeq/sobs/queue.rb', line 46

def host
  @host
end

#modeObject

Returns the value of attribute mode.



48
49
50
# File 'lib/homeq/sobs/queue.rb', line 48

def mode
  @mode
end

#portObject

Returns the value of attribute port.



47
48
49
# File 'lib/homeq/sobs/queue.rb', line 47

def port
  @port
end

#queue_nameObject

Returns the value of attribute queue_name.



45
46
47
# File 'lib/homeq/sobs/queue.rb', line 45

def queue_name
  @queue_name
end

Class Method Details

.[](queue_name) ⇒ Object



58
59
60
61
62
# File 'lib/homeq/sobs/queue.rb', line 58

def self.[](queue_name)
  sys.queues.find { |q|
    q.queue_name == queue_name
  }
end

.create_queues_from_topology(queuename) ⇒ Object



73
74
75
76
77
78
79
80
81
82
# File 'lib/homeq/sobs/queue.rb', line 73

def self.create_queues_from_topology(queuename)
  config = HomeQ::Base::Configuration::Configuration.instance
  q, host, port, hostname = config.topology[queuename]
  config.topology.connections(q).each { |peer,mode,threshold|
    raise "No config for queue '#{peer}'." unless config.topology[peer]
    q, host, port = config.topology[peer]
    queue = HomeQ::SOBS::Queue.new(q, host, port, mode, threshold)
    Base::System.instance.queues << queue
  }
end

.send(queue_name, data, app_data = nil, &block) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/homeq/sobs/queue.rb', line 64

def self.send(queue_name, data, app_data=nil, &block)
  dest = Queue[queue_name]
  unless dest
    raise HomeQ::Base::UnknownQueue.new("Can't find queue #{queue_name}") 
  end
  raise HomeQ::Base::ReadOnlyQueue unless dest.writeable?
  dest.send(data, app_data, block)
end

Instance Method Details

#ack(message) ⇒ Object



140
141
142
143
144
145
146
# File 'lib/homeq/sobs/queue.rb', line 140

def ack(message)
  app_data = message.args[1]
  if @acks[app_data.to_sym]
    @acks[app_data.to_sym].call(app_data)
    @acks.delete(app_data.to_sym)
  end
end

#closedObject



109
110
111
112
113
114
115
116
117
118
119
# File 'lib/homeq/sobs/queue.rb', line 109

def closed
  if @conn
    logger.info {
      "Disconnected from queue '#{queue_name}' " +
        "on #{@conn.remote_endpoint}."
    }
    @started_at = nil
    @conn = nil
  end
  broadcast_change
end

#connected?Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/homeq/sobs/queue.rb', line 186

def connected?
  @conn ? true : false
end

#create_job(message) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/homeq/sobs/queue.rb', line 151

def create_job(message)
  @jobs_received               += 1
  @concurrent_outstanding_jobs += 1
  if @concurrent_outstanding_jobs > 100
    logger.warn {
      "Outstanding jobs: #{@concurrent_outstanding_jobs}"
    }
  end
  set_jobs_highwater
  j = (@handler || Job).new(message, self)
  j.run if j.respond_to?(:run)
end

#deleted(message) ⇒ Object



148
149
# File 'lib/homeq/sobs/queue.rb', line 148

def deleted(message)
end

#opened(conn) ⇒ Object



121
122
123
124
125
126
127
128
129
# File 'lib/homeq/sobs/queue.rb', line 121

def opened(conn)
  @conn = conn
  @conn.refuse_send_threshold = @threshold if @threshold
  @started_at = Time.now
  logger.info {
    "Connected to '#{queue_name}' on #{conn.remote_endpoint}."
  }
  broadcast_change
end

#readable?Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/homeq/sobs/queue.rb', line 182

def readable?
  mode != WRITE_ONLY
end

#release_job(job) ⇒ Object



135
136
137
138
# File 'lib/homeq/sobs/queue.rb', line 135

def release_job(job)
  @conn.delete(job.job_id)
  @concurrent_outstanding_jobs -= 1
end

#send(data, app_data = nil, proc = nil) ⇒ Object



171
172
173
174
175
176
# File 'lib/homeq/sobs/queue.rb', line 171

def send(data, app_data=nil, proc=nil)
  return if !@conn || @conn.state.state != :open
  app_data ||= generate_app_data
  @acks[app_data.to_s.to_sym] = proc if proc
  @conn.put(:body=>data, :app_data=>app_data)
end

#set_jobs_highwaterObject



164
165
166
167
168
169
# File 'lib/homeq/sobs/queue.rb', line 164

def set_jobs_highwater
  @concurrent_outstanding_jobs_highwater = [
    @concurrent_outstanding_jobs_highwater,
    @concurrent_outstanding_jobs
  ].max
end

#startObject



100
101
102
103
104
105
106
107
# File 'lib/homeq/sobs/queue.rb', line 100

def start
  logger.info {
    "Initiating connection to queue '#{@queue_name}' " +
      "at #{@host}:#{@port}, mode '#{@mode}', " +
      "handler #{(@handler || Job)}."
  }
  Client.connect(host, port, config.queue_retry || 5, nil, self)
end

#stopObject



131
132
133
# File 'lib/homeq/sobs/queue.rb', line 131

def stop
  @conn.close_connection_after_writing if @conn
end

#writeable?Boolean

Returns:

  • (Boolean)


178
179
180
# File 'lib/homeq/sobs/queue.rb', line 178

def writeable?
  mode != READ_ONLY
end