Class: Messagebus::Producer
- Inherits: Connection
  - Object
  - Connection
  - Messagebus::Producer
- Defined in: lib/messagebus/producer.rb
Overview
Producer client class. Provides a simple API to publish events. Connections are refreshed every connectionLifetime interval (the conn_lifetime_sec option), so a user can specify a load balancer and the producer will connect to a different server each interval, effectively rotating publish load across all machines.
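The rotation described above can be pictured with a minimal sketch. This is not the Messagebus implementation: the class name RotatingConnection, its methods, and the injectable clock are all hypothetical, and the only idea taken from the overview is "reconnect once the connection is older than the configured lifetime".

```ruby
# Hypothetical stand-in, NOT part of Messagebus: shows lifetime-based reconnects.
class RotatingConnection
  attr_reader :connect_count

  # clock is injectable so the example does not need to sleep.
  def initialize(conn_lifetime_sec, clock = -> { Time.now })
    @conn_lifetime_sec = conn_lifetime_sec
    @clock = clock
    @connect_count = 0
    @connection_start_time = nil
  end

  # True when there is no connection yet or the current one has reached its lifetime.
  def expired?
    @connection_start_time.nil? ||
      (@clock.call - @connection_start_time) >= @conn_lifetime_sec
  end

  # Reconnect if needed, then run the caller's block.
  def with_fresh_connection
    reconnect if expired?
    yield
  end

  private

  def reconnect
    # A real client would close the old socket and open a new one here,
    # which gives a load balancer the chance to pick a different server.
    @connection_start_time = @clock.call
    @connect_count += 1
  end
end

now = Time.at(0)
conn = RotatingConnection.new(300, -> { now })
conn.with_fresh_connection { :published } # first call connects
now += 299
conn.with_fresh_connection { :published } # still inside the lifetime
now += 1
conn.with_fresh_connection { :published } # lifetime reached, reconnects
```

With a 300 second lifetime, the three calls above result in two connections: one at the start and one when the lifetime is reached.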
Parameters:
- dest (String, required, name of the queue/topic)
- host_params (list<String>, required, e.g. '[localhost:61613]')
- options: a hash map of optional values.
  - user (String, default: '')
  - passwd (String, default: '')
  - conn_lifetime_sec (Int, default: 300 secs)
  - receipt_wait_timeout_ms (Int, optional, default: 5 seconds)
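As an illustration of how the optional values and their defaults fit together, here is a small sketch. The constant PRODUCER_OPTION_DEFAULTS and the helper with_producer_defaults are hypothetical names, the use of symbol keys is an assumption, and the 5 second default is written as 5000 because the option is expressed in milliseconds.

```ruby
# Hypothetical helper, NOT part of Messagebus: defaults copied from the list above.
PRODUCER_OPTION_DEFAULTS = {
  user: '',
  passwd: '',
  conn_lifetime_sec: 300,
  receipt_wait_timeout_ms: 5000 # assumption: "5 seconds" expressed in milliseconds
}.freeze

# Returns a new hash in which caller-supplied values override the defaults.
def with_producer_defaults(options = {})
  PRODUCER_OPTION_DEFAULTS.merge(options)
end

opts = with_producer_defaults(user: 'guest', conn_lifetime_sec: 60)
# opts[:user] is 'guest', opts[:conn_lifetime_sec] is 60,
# and the untouched keys keep their defaults.
```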
Constant Summary
- PUBLISH_HEADERS =
{ :suppress_content_length => true, :persistent => true }
- SCHEDULED_DELIVERY_TIME_MS_HEADER =
'scheduled_delivery_time'
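The two constants can be combined into a header hash along the following lines. This is a sketch only: the module HeaderSketch and the helper build_headers are hypothetical, and treating the scheduled delivery value as epoch milliseconds is an assumption based on the constant's name, not something this page confirms.

```ruby
# Hypothetical sketch, NOT the Messagebus implementation.
module HeaderSketch
  # Values copied from the constant summary above.
  PUBLISH_HEADERS = { :suppress_content_length => true, :persistent => true }.freeze
  SCHEDULED_DELIVERY_TIME_MS_HEADER = 'scheduled_delivery_time'

  # Merges caller headers over the publish defaults and, if a Time is given,
  # adds the scheduled delivery header.
  def self.build_headers(extra = {}, deliver_at = nil)
    headers = PUBLISH_HEADERS.merge(extra)
    unless deliver_at.nil?
      # Assumption: the broker expects epoch milliseconds.
      headers[SCHEDULED_DELIVERY_TIME_MS_HEADER] = (deliver_at.to_f * 1000).round
    end
    headers
  end
end

plain     = HeaderSketch.build_headers
scheduled = HeaderSketch.build_headers({ 'priority' => 4 }, Time.at(10))
```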
Constants inherited from Connection
Connection::STARTED, Connection::STOPPED
Instance Attribute Summary
-
#state ⇒ Object
Returns the value of attribute state.
Attributes inherited from Connection
Instance Method Summary
-
#actual_publish(dest, message, connect_headers = {}, safe = true) ⇒ Object
This is the actual publish method.
-
#initialize(host_params, options = {}) ⇒ Producer
constructor
A new instance of Producer.
-
#publish(*args) ⇒ Object
This is implemented with *args to work around the historical API requiring a dest_type parameter.
-
#start ⇒ Object
Start the producer client.
-
#stop ⇒ Object
Close the producer client.
Methods inherited from Connection
#do_with_timeout, #start_server, #started?, #stop_server, #stopped?
Methods included from Validations
#valid_host?, #validate_connection_config, #validate_destination_config
Constructor Details
#initialize(host_params, options = {}) ⇒ Producer
Returns a new instance of Producer.
# File 'lib/messagebus/producer.rb', line 53
def initialize(host_params, options={})
  options = DottableHash.new(options)
  super(host_params, options)
  validate_connection_config(@host_params, options)
end
Instance Attribute Details
#state ⇒ Object
Returns the value of attribute state.
# File 'lib/messagebus/producer.rb', line 49
def state
  @state
end
Instance Method Details
#actual_publish(dest, message, connect_headers = {}, safe = true) ⇒ Object
This is the actual publish method. See publish for why this is designed this way.
# File 'lib/messagebus/producer.rb', line 94
def actual_publish(dest, message, connect_headers={}, safe=true)
  if !started?
    logger.error "Cannot publish without first starting the producer. Current state is '#{@state}'"
    return
  end
  validate_destination_config(dest)
  publish_internal(dest, message, connect_headers, safe)
rescue => e
  logger.error "Error occurred while publishing the message: #{e}\n #{e.backtrace.join("\n")}"
  return false
end
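The listing above has two distinct non-success results: a bare return (nil) when the producer has not been started, and false when an exception is rescued. The sketch below mirrors only that control flow. ReturnValueSketch and describe_result are hypothetical names, and the true returned on success is an assumption, since the return value of publish_internal is not shown on this page.

```ruby
# Hypothetical stand-in, NOT the Messagebus class: mirrors the control flow only.
class ReturnValueSketch
  def initialize(started, error = nil)
    @started = started
    @error = error
  end

  def actual_publish(dest, message)
    return unless @started    # not started: returns nil
    raise @error if @error    # simulate a failure inside the publish step
    true                      # assumption: a successful publish is truthy
  rescue StandardError
    false                     # rescued error: returns false
  end
end

# Maps a publish result onto a symbol so callers can tell the cases apart.
def describe_result(result)
  case result
  when nil then :not_started
  when false then :failed
  else :sent
  end
end

not_started = ReturnValueSketch.new(false).actual_publish('jms.queue.demo', 'hi')
failed      = ReturnValueSketch.new(true, RuntimeError.new('boom')).actual_publish('jms.queue.demo', 'hi')
sent        = ReturnValueSketch.new(true).actual_publish('jms.queue.demo', 'hi')
```

Both nil and false are falsy in Ruby, so a caller that only checks truthiness cannot tell "not started" from "failed"; comparing against nil explicitly is one way to separate them.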
#publish(*args) ⇒ Object
This is implemented with *args to work around the historical API requiring a dest_type parameter. That parameter has been removed, but the API has been kept backwards compatible for now.
Historical version
def publish(dest, dest_type, message, connect_headers={}, safe=true)
For the current version see actual_publish.
# File 'lib/messagebus/producer.rb', line 85
def publish(*args)
  if args.size > 2 && args[1].is_a?(String) && (args[1].downcase == 'topic' || args[1].downcase == 'queue')
    logger.warn "Passing dest_type to Producer#publish is deprecated (it isn't needed). Please update your usage."
    args.delete_at(1)
  end
  actual_publish(*args)
end
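To make the argument handling easier to follow, the check from the listing above can be isolated into a pure function. The name strip_dest_type is hypothetical; the condition is the same as the one in publish, applied to a copy of the array so the input is left unchanged.

```ruby
# Hypothetical helper that isolates the dest_type check used by publish.
def strip_dest_type(args)
  args = args.dup
  if args.size > 2 && args[1].is_a?(String) && %w[topic queue].include?(args[1].downcase)
    args.delete_at(1) # drop the deprecated dest_type argument
  end
  args
end

legacy  = strip_dest_type(['jms.queue.demo', 'queue', 'hello'])
current = strip_dest_type(['jms.queue.demo', 'hello'])
mixed   = strip_dest_type(['jms.topic.demo', 'Topic', 'hello', {}, false])
# Edge case: a message body that is literally "queue", sent with headers,
# matches the same condition and is removed.
edge    = strip_dest_type(['jms.queue.demo', 'queue', {}])
```

The last call shows a consequence of the heuristic: when the message itself is the string 'topic' or 'queue' and at least one more argument follows, the message is treated as a dest_type and dropped.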
#start ⇒ Object
Start the producer client
# File 'lib/messagebus/producer.rb', line 60
def start
  @state = STARTED
  logger.info "Starting producer with host_params:#{@host_params}"
  @connection_start_time = Time.new
  @stomp = start_server(@host_params, @options.user, @options.passwd)
rescue => e
  logger.error "Error occurred while starting a connection: #{e}\n #{e.backtrace.join("\n")}"
end
#stop ⇒ Object
Close the producer client
# File 'lib/messagebus/producer.rb', line 70
def stop
  @state = STOPPED
  logger.info "Stopping producer with host_params:#{@host_params}"
  stop_server(@stomp)
rescue => e
  logger.error "Error occurred while stopping a connection: #{e}\n #{e.backtrace.join("\n")}"
end
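A typical start, publish, stop sequence can be wrapped so that stop always runs, even if the work in between raises. The sketch below uses a hypothetical stand-in class (LifecycleSketch) and a hypothetical helper (with_producer); the string values of STARTED and STOPPED and the initial stopped state are assumptions, since this page does not show them.

```ruby
# Hypothetical stand-in, NOT the Messagebus class: tracks state like start/stop above.
class LifecycleSketch
  STARTED = 'STARTED' # assumption: actual constant values are not shown on this page
  STOPPED = 'STOPPED'

  attr_reader :state, :sent

  def initialize
    @state = STOPPED # assumption: a new producer begins stopped
    @sent = []
  end

  def start
    @state = STARTED
  end

  def stop
    @state = STOPPED
  end

  def started?
    @state == STARTED
  end

  # Records the message instead of talking to a broker.
  def publish(dest, message)
    return unless started?
    @sent << [dest, message]
    true
  end
end

# Starts the producer, yields it, and always stops it afterwards.
def with_producer(producer)
  producer.start
  yield producer
ensure
  producer.stop
end

producer = LifecycleSketch.new
result = with_producer(producer) { |p| p.publish('jms.queue.demo', 'hello') }
```

Note that in the start listing above the state is set to STARTED before the connection attempt and the rescue only logs, so started? alone may not prove that a connection exists.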