Class: ModernTimes::JMS::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/modern_times/jms/publisher.rb

Constant Summary collapse

@@dummy_publishing =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Publisher

Parameters:

One of the following must be specified
  :queue_name            => String: Name of the Queue to publish to
  :topic_name            => String: Name of the Topic to publish to (In general this should not be used as all worker threads will receive all messages)
  :virtual_topic_name    => String: Name of the Virtual Topic to publish to
     (ActiveMQ only, see http://activemq.apache.org/virtual-destinations.html
  :destination           => Explicit javax::Jms::Destination to use
Optional:
  :time_to_live          => expiration time in ms for the message
  :persistent            => true or false (defaults to false)
  :marshal               => Symbol: One of :ruby, :string, :json, :bson, :yaml or any registered types (See ModernTimes::MarshalStrategy), defaults to :ruby
  :response              => if true, a temporary reply queue will be setup for handling responses (defaults to false unless any other mt_response_* options are set)
  :response_time_to_live => expiration time in ms for the response message(s)
  :response_persistent   => true or false for the response message(s), set to false if you don't want timed out messages ending up in the DLQ (defaults to true unless mt_response_time_to_live is set)


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/modern_times/jms/publisher.rb', line 25

def initialize(options)
  raise "ModernTimes::JMS::Connection has not been initialized" unless ModernTimes::JMS::Connection.inited? || @@dummy_publishing
  producer_keys = [:queue_name, :topic_name, :virtual_topic_name, :destination]
  @producer_options = options.reject {|k,v| !producer_keys.include?(k)}
  raise "One of #{producer_keys.join(',')} must be given in #{self.class.name}" if @producer_options.empty?

  # Save our @producer_options for destination comparison when doing dummy_publish,
  # but create the real options by translating virtual_topic_name to a real topic_name.
  @real_producer_options = @producer_options.dup
  virtual_topic_name = @real_producer_options.delete(:virtual_topic_name)
  @real_producer_options[:topic_name] = "VirtualTopic.#{virtual_topic_name}" if virtual_topic_name

  @persistent_sym = options[:persistent] ? :persistent : :non_persistent
  @marshal = options[:marshal] || :ruby
  @marshaler = ModernTimes::MarshalStrategy.find(@marshal)
  @time_to_live = options[:time_to_live]
  @response_time_to_live_str = options[:response_time_to_live] && options[:response_time_to_live].to_s
  @response_persistent_str = nil
  @response_persistent_str = (!!options[:response_persistent]).to_s unless options[:response_persistent].nil?

  @is_response = options[:response] || !@response_time_to_live_str.nil? || !@response_persistent_str.nil?
  @reply_queue = nil
  if !@@dummy_publishing  && @is_response
    ModernTimes::JMS::Connection.session_pool.session do |session|
      @reply_queue = session.create_destination(:queue_name => :temporary)
    end
  end
end

Instance Attribute Details

#marshalerObject (readonly)

Returns the value of attribute marshaler.



7
8
9
# File 'lib/modern_times/jms/publisher.rb', line 7

def marshaler
  @marshaler
end

#persistentObject (readonly)

Returns the value of attribute persistent.



7
8
9
# File 'lib/modern_times/jms/publisher.rb', line 7

def persistent
  @persistent
end

#producer_optionsObject (readonly)

Returns the value of attribute producer_options.



7
8
9
# File 'lib/modern_times/jms/publisher.rb', line 7

def producer_options
  @producer_options
end

#reply_queueObject (readonly)

Returns the value of attribute reply_queue.



7
8
9
# File 'lib/modern_times/jms/publisher.rb', line 7

def reply_queue
  @reply_queue
end

Class Method Details

.clear_dummy_publishingObject

For testing



134
135
136
137
138
139
140
# File 'lib/modern_times/jms/publisher.rb', line 134

def self.clear_dummy_publishing
  @@dummy_publishing = false
  alias_method :dummy_publish, :publish
  alias_method :publish, :real_publish
  #remove_method :real_publish
  PublishHandle.clear_dummy_handling
end

.setup_dummy_publishing(worker_pools) ⇒ Object



125
126
127
128
129
130
131
# File 'lib/modern_times/jms/publisher.rb', line 125

def self.setup_dummy_publishing(worker_pools)
  @@dummy_publishing = true
  @@worker_pools = worker_pools
  alias_method :real_publish, :publish
  alias_method :publish, :dummy_publish
  PublishHandle.setup_dummy_handling
end

Instance Method Details

#dummy_publish(object, props = {}) ⇒ Object

For non-configured Rails projects, The above publish method will be overridden to call this publish method instead which calls all the JMS workers that operate on the given address.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/modern_times/jms/publisher.rb', line 81

def dummy_publish(object, props={})
  dummy_handle = PublishHandle.new(self, nil, Time.now)
  # Model real queue marshaling/unmarshaling
  trans_object = @marshaler.unmarshal(@marshaler.marshal(object))
  @@worker_pools.each do |worker_pool|
    worker_pool.with_connection do |worker|
      if ModernTimes::JMS.same_destination?(@producer_options, worker.destination_options)
        if worker.kind_of?(RequestWorker)
          ModernTimes.logger.debug "Dummy request publishing #{trans_object} to #{worker}"
          m = worker.marshaler
          # Model real queue marshaling/unmarshaling
          begin
            response_object = m.unmarshal(m.marshal(worker.request(trans_object)))
            dummy_handle.add_dummy_response(worker.name, response_object)
          rescue Exception => e
            ModernTimes.logger.error("#{worker} Exception: #{e.message}")
            worker.log_backtrace(e)
            dummy_handle.add_dummy_response(worker.name, ModernTimes::RemoteException.new(e))
          end
          begin
            worker.post_request(trans_object)
          rescue Exception => e
            ModernTimes.logger.error("#{worker} Exception in post_request: #{e.message}")
            worker.log_backtrace(e)
          end
        elsif worker.kind_of?(Worker)
          ModernTimes.logger.debug "Dummy publishing #{trans_object} to #{worker}"
          begin
            worker.perform(trans_object)
          rescue Exception => e
            ModernTimes.logger.error("#{worker} Exception: #{e.message}")
            worker.log_backtrace(e)
          end
        end
      end
    end
  end
  return dummy_handle
end

#publish(object, props = {}) ⇒ Object

Publish the given object to the address.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/modern_times/jms/publisher.rb', line 59

def publish(object, props={})
  start = Time.now
  message = nil
  Connection.session_pool.producer(@real_producer_options) do |session, producer|
    producer.time_to_live      = @time_to_live if @time_to_live
    producer.delivery_mode_sym = @persistent_sym
    message = ModernTimes::JMS.create_message(session, @marshaler, object)
    message.jms_reply_to                = @reply_queue if @reply_queue
    message['mt:marshal']               = @marshal.to_s
    message['mt:response:time_to_live'] = @response_time_to_live_str if @response_time_to_live_str
    message['mt:response:persistent']   = @response_persistent_str unless @response_persistent_str.nil?
    props.each do |key, value|
      message.send("#{key}=", value)
    end
    producer.send(message)
  end
  return PublishHandle.new(self, message.jms_message_id, start)
end

#response?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/modern_times/jms/publisher.rb', line 54

def response?
  @is_response
end

#to_sObject



121
122
123
# File 'lib/modern_times/jms/publisher.rb', line 121

def to_s
  "#{self.class.name}:#{@real_producer_options.inspect}"
end