Class: ModernTimes::JMS::Publisher
- Inherits:
-
Object
- Object
- ModernTimes::JMS::Publisher
- Defined in:
- lib/modern_times/jms/publisher.rb
Constant Summary collapse
- @@dummy_publishing =
false
Instance Attribute Summary collapse
-
#marshaler ⇒ Object
readonly
Returns the value of attribute marshaler.
-
#persistent ⇒ Object
readonly
Returns the value of attribute persistent.
-
#producer_options ⇒ Object
readonly
Returns the value of attribute producer_options.
-
#reply_queue ⇒ Object
readonly
Returns the value of attribute reply_queue.
Class Method Summary collapse
Instance Method Summary collapse
-
#dummy_publish(object, props = {}) ⇒ Object
For non-configured Rails projects, the publish method is overridden to call this method instead, which calls all the JMS workers that operate on the given address.
-
#initialize(options) ⇒ Publisher
constructor
Parameters: One of the following must be specified: :queue_name => String: Name of the Queue to publish to :topic_name => String: Name of the Topic to publish to (In general this should not be used as all worker threads will receive all messages) :virtual_topic_name => String: Name of the Virtual Topic to publish to (ActiveMQ only, see activemq.apache.org/virtual-destinations.html) :destination => Explicit javax::Jms::Destination to use Optional: :time_to_live => expiration time in ms for the message :persistent => true or false (defaults to false) :marshal => Symbol: One of :ruby, :string, :json, :bson, :yaml or any registered types (See ModernTimes::MarshalStrategy), defaults to :ruby :response => if true, a temporary reply queue will be set up for handling responses (defaults to false unless any of the other :response_* options are set) :response_time_to_live => expiration time in ms for the response message(s) :response_persistent => true or false for the response message(s), set to false if you don’t want timed out messages ending up in the DLQ (defaults to true unless :response_time_to_live is set).
-
#publish(object, props = {}) ⇒ Object
Publish the given object to the address.
- #response? ⇒ Boolean
- #to_s ⇒ Object
Constructor Details
#initialize(options) ⇒ Publisher
Parameters:
One of the following must be specified
:queue_name => String: Name of the Queue to publish to
:topic_name => String: Name of the Topic to publish to (In general this should not be used as all worker threads will receive all messages)
:virtual_topic_name => String: Name of the Virtual Topic to publish to
(ActiveMQ only, see http://activemq.apache.org/virtual-destinations.html)
:destination => Explicit javax::Jms::Destination to use
Optional:
:time_to_live => expiration time in ms for the message
:persistent => true or false (defaults to false)
:marshal => Symbol: One of :ruby, :string, :json, :bson, :yaml or any registered types (See ModernTimes::MarshalStrategy), defaults to :ruby
:response => if true, a temporary reply queue will be set up for handling responses (defaults to false unless any of the other :response_* options are set)
:response_time_to_live => expiration time in ms for the response message(s)
:response_persistent => true or false for the response message(s), set to false if you don't want timed out messages ending up in the DLQ (defaults to true unless :response_time_to_live is set)
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/modern_times/jms/publisher.rb', line 25
#
# Builds a publisher bound to a single destination.
#
# options is a Hash. It must contain at least one of :queue_name, :topic_name,
# :virtual_topic_name or :destination, and may contain :time_to_live,
# :persistent, :marshal, :response, :response_time_to_live and
# :response_persistent.
#
# Raises a RuntimeError when ModernTimes::JMS::Connection has not been
# initialized (unless dummy publishing is active) or when none of the
# destination keys is present in options.
def initialize(options)
  raise "ModernTimes::JMS::Connection has not been initialized" unless ModernTimes::JMS::Connection.inited? || @@dummy_publishing
  producer_keys = [:queue_name, :topic_name, :virtual_topic_name, :destination]
  # Keep only the destination-selecting entries of options.
  @producer_options = options.reject {|k,v| !producer_keys.include?(k)}
  raise "One of #{producer_keys.join(',')} must be given in #{self.class.name}" if @producer_options.empty?

  # Save our @producer_options for destination comparison when doing dummy_publish,
  # but create the real options by translating virtual_topic_name to a real topic_name.
  @real_producer_options = @producer_options.dup
  virtual_topic_name = @real_producer_options.delete(:virtual_topic_name)
  @real_producer_options[:topic_name] = "VirtualTopic.#{virtual_topic_name}" if virtual_topic_name

  @persistent_sym = options[:persistent] ? :persistent : :non_persistent
  @marshal = options[:marshal] || :ruby
  @marshaler = ModernTimes::MarshalStrategy.find(@marshal)
  @time_to_live = options[:time_to_live]

  # The response settings are kept as strings (or nil when the option was not
  # given) because publish copies them into message properties.
  @response_time_to_live_str = options[:response_time_to_live] && options[:response_time_to_live].to_s
  @response_persistent_str = nil
  @response_persistent_str = (!!options[:response_persistent]).to_s unless options[:response_persistent].nil?

  # A response is expected when asked for explicitly or when any response
  # setting was supplied.
  @is_response = options[:response] || !@response_time_to_live_str.nil? || !@response_persistent_str.nil?
  @reply_queue = nil
  if !@@dummy_publishing && @is_response
    ModernTimes::JMS::Connection.session_pool.session do |session|
      @reply_queue = session.create_destination(:queue_name => :temporary)
    end
  end
end
Instance Attribute Details
#marshaler ⇒ Object (readonly)
Returns the value of attribute marshaler.
7 8 9 |
# File 'lib/modern_times/jms/publisher.rb', line 7
#
# Reader for the @marshaler instance variable.
def marshaler
  return @marshaler
end
#persistent ⇒ Object (readonly)
Returns the value of attribute persistent.
7 8 9 |
# File 'lib/modern_times/jms/publisher.rb', line 7
#
# Returns true when this publisher sends persistent messages, false otherwise.
#
# The constructor never assigns @persistent; it records the delivery mode in
# @persistent_sym (:persistent or :non_persistent), so the answer is derived
# from that symbol instead of reading an instance variable that is always nil.
def persistent
  @persistent_sym == :persistent
end
#producer_options ⇒ Object (readonly)
Returns the value of attribute producer_options.
7 8 9 |
# File 'lib/modern_times/jms/publisher.rb', line 7
#
# Reader for @producer_options, the destination-selecting subset of the
# options Hash captured by the constructor.
def producer_options
  @producer_options
end
#reply_queue ⇒ Object (readonly)
Returns the value of attribute reply_queue.
7 8 9 |
# File 'lib/modern_times/jms/publisher.rb', line 7
#
# Reader for the @reply_queue instance variable (nil unless the constructor
# created a temporary reply queue).
def reply_queue
  return @reply_queue
end
Class Method Details
.clear_dummy_publishing ⇒ Object
For testing
134 135 136 137 138 139 140 |
# File 'lib/modern_times/jms/publisher.rb', line 134
#
# For testing: switches publish back to the real implementation that
# setup_dummy_publishing saved as real_publish.
#
# Does nothing when dummy publishing is not active. Without this guard the
# first alias below would overwrite dummy_publish with the real publish, and
# the second would raise NameError if real_publish had never been defined.
def self.clear_dummy_publishing
  return unless @@dummy_publishing
  @@dummy_publishing = false
  # Kept from the original: publish still points at the dummy implementation
  # here, so this preserves dummy_publish before publish is restored.
  alias_method :dummy_publish, :publish
  alias_method :publish, :real_publish
  #remove_method :real_publish
  PublishHandle.clear_dummy_handling
end
.setup_dummy_publishing(worker_pools) ⇒ Object
125 126 127 128 129 130 131 |
# File 'lib/modern_times/jms/publisher.rb', line 125
#
# Switches every publisher to dummy publishing: publish is re-pointed at
# dummy_publish, which hands objects directly to the workers in worker_pools.
# The real implementation is kept reachable as real_publish.
#
# worker_pools is the collection dummy_publish iterates over. It is refreshed
# on every call, but the aliasing happens only on the first call: aliasing a
# second time would point real_publish at the dummy implementation and the
# real publish could no longer be restored.
def self.setup_dummy_publishing(worker_pools)
  @@worker_pools = worker_pools
  return if @@dummy_publishing
  @@dummy_publishing = true
  alias_method :real_publish, :publish
  alias_method :publish, :dummy_publish
  PublishHandle.setup_dummy_handling
end
Instance Method Details
#dummy_publish(object, props = {}) ⇒ Object
For non-configured Rails projects, the publish method is overridden to call this method instead, which calls all the JMS workers that operate on the given address.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/modern_times/jms/publisher.rb', line 81
#
# In-process stand-in for publish: instead of sending a JMS message, hands the
# object to every worker in the registered worker pools whose destination
# matches this publisher's @producer_options.
#
# object is round-tripped through the marshaler first so workers see what they
# would see after real queue transport. props is accepted for signature
# compatibility with publish and is not used here.
#
# Returns the PublishHandle that collected the dummy responses. Exceptions
# raised by workers are logged; they are not propagated to the caller.
def dummy_publish(object, props={})
  dummy_handle = PublishHandle.new(self, nil, Time.now)
  # Model real queue marshaling/unmarshaling
  trans_object = @marshaler.unmarshal(@marshaler.marshal(object))
  @@worker_pools.each do |worker_pool|
    worker_pool.with_connection do |worker|
      # NOTE(review): the method name after `worker.` was lost in this listing;
      # destination_options is assumed - confirm against the source file.
      if ModernTimes::JMS.same_destination?(@producer_options, worker.destination_options)
        if worker.kind_of?(RequestWorker)
          ModernTimes.logger.debug "Dummy request publishing #{trans_object} to #{worker}"
          m = worker.marshaler
          # Model real queue marshaling/unmarshaling
          begin
            response_object = m.unmarshal(m.marshal(worker.request(trans_object)))
            dummy_handle.add_dummy_response(worker.name, response_object)
          rescue Exception => e
            # Any failure in the worker is recorded as that worker's response.
            ModernTimes.logger.error("#{worker} Exception: #{e.message}")
            worker.log_backtrace(e)
            dummy_handle.add_dummy_response(worker.name, ModernTimes::RemoteException.new(e))
          end
          # post_request runs whether or not the request itself failed.
          begin
            worker.post_request(trans_object)
          rescue Exception => e
            ModernTimes.logger.error("#{worker} Exception in post_request: #{e.message}")
            worker.log_backtrace(e)
          end
        elsif worker.kind_of?(Worker)
          ModernTimes.logger.debug "Dummy publishing #{trans_object} to #{worker}"
          begin
            worker.perform(trans_object)
          rescue Exception => e
            ModernTimes.logger.error("#{worker} Exception: #{e.message}")
            worker.log_backtrace(e)
          end
        end
      end
    end
  end
  return dummy_handle
end
#publish(object, props = {}) ⇒ Object
Publish the given object to the address.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/modern_times/jms/publisher.rb', line 59
#
# Publish the given object to the address.
#
# object is marshaled with this publisher's marshaler. props is a Hash of
# attribute name => value; each entry is applied to the outgoing message by
# calling the "<name>=" writer on it.
#
# Returns a PublishHandle built from this publisher, an identifier taken from
# the sent message and the time at which publishing started.
def publish(object, props={})
  start = Time.now
  message = nil
  Connection.session_pool.producer(@real_producer_options) do |session, producer|
    producer.time_to_live = @time_to_live if @time_to_live
    producer.delivery_mode_sym = @persistent_sym
    # NOTE(review): the method name after `ModernTimes::JMS.` was lost in this
    # listing; create_message is assumed - confirm against the source file.
    message = ModernTimes::JMS.create_message(session, @marshaler, object)
    message.jms_reply_to = @reply_queue if @reply_queue
    # Message properties carrying the marshal type and response settings.
    message['mt:marshal'] = @marshal.to_s
    message['mt:response:time_to_live'] = @response_time_to_live_str if @response_time_to_live_str
    message['mt:response:persistent'] = @response_persistent_str unless @response_persistent_str.nil?
    props.each do |key, value|
      message.send("#{key}=", value)
    end
    producer.send(message)
  end
  # NOTE(review): the second argument was lost in this listing;
  # message.jms_message_id is assumed - confirm against the source file.
  return PublishHandle.new(self, message.jms_message_id, start)
end
#response? ⇒ Boolean
54 55 56 |
# File 'lib/modern_times/jms/publisher.rb', line 54
#
# Returns the @is_response flag computed by the constructor.
def response?
  return @is_response
end
#to_s ⇒ Object
121 122 123 |
# File 'lib/modern_times/jms/publisher.rb', line 121
#
# Describes the publisher as "<class name>:<inspected real producer options>".
def to_s
  label = self.class.name
  destination = @real_producer_options.inspect
  [label, destination].join(':')
end