Class: Qwirk::Adapter::JMS::Publisher
- Inherits:
-
Object
- Object
- Qwirk::Adapter::JMS::Publisher
- Defined in:
- lib/qwirk/adapter/jms/publisher.rb
Defined Under Namespace
Classes: MyConsumer, MyTaskProducer
Instance Method Summary collapse
- #create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object
-
#create_producer_consumer_pair(task_id, marshaler) ⇒ Object
See Qwirk::Publisher#create_task_consumer for the requirements for this method.
- #default_marshal_sym ⇒ Object
-
#initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher
constructor
attr_reader :persistent, :marshaler, :reply_queue.
-
#publish(marshaled_object, marshaler, task_id, props) ⇒ Object
Publish the given object and return the message_id.
- #to_s ⇒ Object
-
#with_response(message_id, &block) ⇒ Object
See Qwirk::PublishHandle#read_response for the requirements for this method.
Constructor Details
#initialize(adapter_factory, queue_name, topic_name, options, response_options) ⇒ Publisher
attr_reader :persistent, :marshaler, :reply_queue
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 9
#
# Sets up a publisher for either a queue or a topic and pre-creates the
# destination plus the queue on which responses are expected.
#
# adapter_factory  - object whose #adapter_info yields the connection used here.
# queue_name       - name of the target queue (may be nil when topic_name is given).
# topic_name       - name of the target topic; wins over queue_name when both are set.
# options          - Hash read for :persistent and :time_to_live of published messages.
# response_options - Hash read for :time_to_live, :persistent and :queue_name of responses.
#
# NOTE(review): the extracted listing had lost the `options` / `response_options`
# identifiers. Which hash feeds which lookup is reconstructed from the names of
# the instance variables being assigned -- confirm against the repository.
def initialize(adapter_factory, queue_name, topic_name, options, response_options)
  @connection = adapter_factory.adapter_info

  # Tolerate callers that pass nil for either hash.
  options          ||= {}
  response_options ||= {}

  @dest_options = {:queue_name => queue_name} if queue_name
  @dest_options = {:topic_name => topic_name} if topic_name

  @persistent_sym = options[:persistent] ? :persistent : :non_persistent
  @time_to_live   = options[:time_to_live]

  # Response settings are kept as strings (or nil when unset) because publish
  # writes them straight into message properties.
  @response_time_to_live_str = response_options[:time_to_live] && response_options[:time_to_live].to_s
  @response_persistent_str   = nil
  @response_persistent_str   = (!!response_options[:persistent]).to_s unless response_options[:persistent].nil?

  @connection.session_pool.session do |session|
    # TODO: Use sync attribute so these queues aren't constantly created.
    @dest_queue = session.create_destination(@dest_options)
    # The `|| :temporary` fallback is always truthy, so a reply queue is always created.
    reply_queue_name = response_options[:queue_name] || :temporary
    @reply_queue = session.create_destination(:queue_name => reply_queue_name)
  end
end
Instance Method Details
#create_fail_producer_consumer_pair(task_id, marshaler) ⇒ Object
72 73 74 75 76 77 78 79 80 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 72
#
# Creates a temporary queue and returns a [producer, consumer] pair bound to
# it; the consumer only selects messages whose QwirkTaskID equals task_id.
#
# task_id   - identifier interpolated into the consumer's selector.
# marshaler - passed through to MyTaskProducer.
#
# NOTE(review): the consumer previously referenced a bare `reply_queue`, which
# is not defined on this class and left the temporary queue unread. It now
# consumes from the same temporary queue the producer receives -- confirm that
# this is the intended wiring.
def create_fail_producer_consumer_pair(task_id, marshaler)
  fail_queue = nil
  @connection.session_pool.session do |session|
    fail_queue = session.create_destination(:queue_name => :temporary)
  end
  producer = MyTaskProducer.new(self, fail_queue, task_id, marshaler)
  consumer = Consumer.new(@connection, :destination => fail_queue, :selector => "QwirkTaskID = '#{task_id}'")
  return producer, consumer
end
#create_producer_consumer_pair(task_id, marshaler) ⇒ Object
See Qwirk::Publisher#create_task_consumer for the requirements for this method.
66 67 68 69 70 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 66
#
# Returns a [producer, consumer] pair that share this publisher's reply queue;
# the consumer only selects messages whose QwirkTaskID equals task_id.
#
# task_id   - identifier interpolated into the consumer's selector.
# marshaler - passed through to MyTaskProducer.
def create_producer_consumer_pair(task_id, marshaler)
  producer = MyTaskProducer.new(self, @reply_queue, task_id, marshaler)
  # Use the instance variable directly: this class defines no reply_queue
  # reader, so the bare call used previously raised NameError.
  consumer = Consumer.new(@connection, :destination => @reply_queue, :selector => "QwirkTaskID = '#{task_id}'")
  return producer, consumer
end
#default_marshal_sym ⇒ Object
30 31 32 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 30
#
# Default marshal symbol for this publisher; always :ruby.
def default_marshal_sym
  :ruby
end
#publish(marshaled_object, marshaler, task_id, props) ⇒ Object
Publish the given object and return the message_id. TODO: Too hackish to include task_id in here, think of a better solution
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 36
#
# Publish the given object and return the message_id.
# TODO: Too hackish to include task_id in here, think of a better solution
#
# marshaled_object - already-marshaled payload handed to Util when building the message.
# marshaler        - must respond to #marshal_type and #to_sym.
# task_id          - when set, stored in the 'QwirkTaskID' property.
# props            - key/value pairs applied via "#{key}=" setters on the message (nil is treated as empty).
#
# NOTE(review): the extracted listing had lost every identifier containing
# "message". `Util.create_message` and `jms_message_id` are reconstructed
# names -- confirm both against the repository before relying on them.
def publish(marshaled_object, marshaler, task_id, props)
  message = nil
  @connection.session_pool.producer(:destination => @dest_queue) do |session, producer|
    producer.time_to_live      = @time_to_live if @time_to_live
    producer.delivery_mode_sym = @persistent_sym
    message = Util.create_message(session, marshaled_object, marshaler.marshal_type)
    message.jms_reply_to = @reply_queue if @reply_queue
    message['qwirk:marshal'] = marshaler.to_sym.to_s
    message['qwirk:response:time_to_live'] = @response_time_to_live_str if @response_time_to_live_str
    message['qwirk:response:persistent'] = @response_persistent_str unless @response_persistent_str.nil?
    message['QwirkTaskID'] = task_id if task_id
    (props || {}).each do |key, value|
      message.send("#{key}=", value)
    end
    producer.send(message)
  end
  # Return the adapter_info which for JMS is the message_id.  This value will be sent to with_response below.
  return message.jms_message_id
end
#to_s ⇒ Object
82 83 84 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 82
#
# Human-readable label: "Publisher: " followed by the inspected destination options.
def to_s
  destination = @dest_options.inspect
  "Publisher: #{destination}"
end
#with_response(message_id, &block) ⇒ Object
See Qwirk::PublishHandle#read_response for the requirements for this method.
57 58 59 60 61 62 63 |
# File 'lib/qwirk/adapter/jms/publisher.rb', line 57
#
# Opens a consumer on the reply queue that selects messages whose
# JMSCorrelationID equals message_id, and yields it wrapped in MyConsumer.
#
# message_id - the value returned by publish.
#
# Raises RuntimeError when no reply queue has been set up.
#
# NOTE(review): the extracted listing had lost the `message_id` parameter and
# the name of the local options hash; the parameter name comes from the method
# summary, the local name is arbitrary.
def with_response(message_id, &block)
  raise "Invalid call to with_response for #{self}, not setup for responding" unless @reply_queue
  options = { :destination => @reply_queue, :selector => "JMSCorrelationID = '#{message_id}'" }
  @connection.session_pool.consumer(options) do |session, consumer|
    yield MyConsumer.new(consumer)
  end
end