Class: Messagebus::Producer

Inherits:
Connection
Defined in:
lib/messagebus/producer.rb

Overview

Producer client class. Provides a simple API for publishing events. Connections are refreshed once per connection lifetime (conn_lifetime_sec), so the user can specify a load balancer and the producer will connect to a different server at each interval, effectively rotating the publish load across all machines.
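The rotation described above amounts to a lifetime check before reusing a connection. The following is a minimal sketch of that check, not the library's implementation: `connection_expired?` and its parameters are hypothetical names, and only the 300-second default is taken from the parameter list on this page.

```ruby
# Hypothetical helper: true once a connection has outlived its lifetime.
# `started_at` and `now` are Time objects; `lifetime_sec` mirrors the
# conn_lifetime_sec option (300 seconds by default).
def connection_expired?(started_at, lifetime_sec = 300, now = Time.now)
  (now - started_at) >= lifetime_sec
end

started_at = Time.at(0)
puts connection_expired?(started_at, 300, Time.at(299)) # connection still fresh
puts connection_expired?(started_at, 300, Time.at(300)) # due for a reconnect
```

A caller following this pattern would reconnect (possibly landing on a different host behind the load balancer) whenever the check returns true.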

Parameters:
dest         (String, required; name of the queue/topic)
host_params  (list<String>, required; e.g. '[localhost:61613]')
options: a hash of optional values.
  user                    (String, default: '')
  passwd                  (String, default: '')
  conn_lifetime_sec       (Int, default: 300 seconds)
  receipt_wait_timeout_ms (Int, optional, default: 5 seconds, i.e. 5000 ms)
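As a rough illustration of how caller-supplied options combine with the documented defaults, the sketch below merges one hash over another. `DEFAULT_OPTIONS` and `with_defaults` are hypothetical names used only here; the library wraps options in a `DottableHash`, and where it actually applies defaults is not shown on this page. The 5000 ms value assumes the documented "5 seconds" default expressed in milliseconds.

```ruby
# Defaults as listed in the parameter documentation above. Applying them
# with a plain merge is an assumption made for this sketch.
DEFAULT_OPTIONS = {
  user: '',
  passwd: '',
  conn_lifetime_sec: 300,
  receipt_wait_timeout_ms: 5000
}.freeze

# Hypothetical helper: caller-supplied values win over the defaults.
def with_defaults(options = {})
  DEFAULT_OPTIONS.merge(options)
end

opts = with_defaults(user: 'guest', conn_lifetime_sec: 60)
puts opts.inspect
```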

Constant Summary

PUBLISH_HEADERS =
{ :suppress_content_length => true, :persistent => true }
SCHEDULED_DELIVERY_TIME_MS_HEADER =
'scheduled_delivery_time'

Constants inherited from Connection

Connection::STARTED, Connection::STOPPED

Instance Attribute Summary

Attributes inherited from Connection

#host_params, #options

Instance Method Summary

Methods inherited from Connection

#do_with_timeout, #start_server, #started?, #stop_server, #stopped?

Methods included from Validations

#valid_host?, #validate_connection_config, #validate_destination_config

Constructor Details

#initialize(host_params, options = {}) ⇒ Producer

Returns a new instance of Producer.



# File 'lib/messagebus/producer.rb', line 53

def initialize(host_params, options={})
  options = DottableHash.new(options)
  super(host_params, options)
  validate_connection_config(@host_params, options)
end

Instance Attribute Details

#state ⇒ Object

Returns the value of attribute state.



# File 'lib/messagebus/producer.rb', line 49

def state
  @state
end

Instance Method Details

#actual_publish(dest, message, connect_headers = {}, safe = true) ⇒ Object

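This is the actual publish method; see publish for why it is designed this way. If the producer has not been started, it logs an error and returns nil. If an exception is raised while publishing, it logs the error and returns false.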



# File 'lib/messagebus/producer.rb', line 94

def actual_publish(dest, message, connect_headers={}, safe=true)
  if !started?
    logger.error "Cannot publish without first starting the producer. Current state is '#{@state}'"
    return
  end
  validate_destination_config(dest)
  publish_internal(dest, message, connect_headers, safe)
rescue => e
  logger.error "Error occurred while publishing the message: #{e}\n #{e.backtrace.join("\n")}"
  return false
end

#publish(*args) ⇒ Object

This is implemented with *args to work around the historical API, which required a dest_type parameter. That parameter has been removed, but the API has been kept backwards compatible for now.

Historical version

def publish(dest, dest_type, message, connect_headers={}, safe=true)

For the current version see actual_publish.



# File 'lib/messagebus/producer.rb', line 85

def publish(*args)
  if args.size > 2 && args[1].is_a?(String) && (args[1].downcase == 'topic' || args[1].downcase == 'queue')
    logger.warn "Passing dest_type to Producer#publish is deprecated (it isn't needed). Please update your usage."
    args.delete_at(1)
  end
  actual_publish(*args)
end
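To make the backwards-compatibility rule easier to see in isolation, here is a standalone sketch of the same argument normalization. `strip_legacy_dest_type` and `LEGACY_DEST_TYPES` are hypothetical names for this example; the condition itself mirrors the one in the listing above.

```ruby
LEGACY_DEST_TYPES = %w[topic queue].freeze

# Hypothetical helper: returns a copy of args with a legacy dest_type
# (the string 'topic' or 'queue', any case, in position 1) removed.
def strip_legacy_dest_type(args)
  legacy = args.size > 2 && args[1].is_a?(String) &&
           LEGACY_DEST_TYPES.include?(args[1].downcase)
  return args.dup unless legacy

  args[0...1] + args[2..-1]
end

# Legacy call shape: dest, dest_type, message
p strip_legacy_dest_type(['jms.topic.example', 'topic', 'hello'])
# Current call shape: dest, message (only two arguments, so left untouched)
p strip_legacy_dest_type(['jms.topic.example', 'queue'])
```

One consequence of this condition: a current-style call whose message body is exactly 'topic' or 'queue' and which also passes headers has more than two arguments, so it would be treated as a legacy call and the message would be dropped.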

#start ⇒ Object

Start the producer client



# File 'lib/messagebus/producer.rb', line 60

def start
  @state = STARTED
  logger.info "Starting producer with host_params:#{@host_params}"
  @connection_start_time = Time.new
  @stomp = start_server(@host_params, @options.user, @options.passwd)
rescue => e
  logger.error "Error occurred while starting a connection: #{e}\n #{e.backtrace.join("\n")}"
end

#stop ⇒ Object

Close the producer client



# File 'lib/messagebus/producer.rb', line 70

def stop
  @state = STOPPED
  logger.info "Stopping producer with host_params:#{@host_params}"
  stop_server(@stomp)
rescue => e
  logger.error "Error occurred while stopping a connection: #{e}\n #{e.backtrace.join("\n")}"
end
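Taken together, the methods above suggest a start / publish / stop lifecycle. The sketch below shows one way a caller might wrap it so that stop always runs. To stay runnable without the gem or a broker, it uses `StubProducer`, a hypothetical stand-in that only mimics the method names and the nil-when-not-started behavior visible in the listings; `with_producer` and the destination name are likewise illustrative, and the `true` returned on success is an assumption, since the return value of `publish_internal` is not shown on this page.

```ruby
# Hypothetical stand-in for Messagebus::Producer; keeps messages in memory.
class StubProducer
  attr_reader :state, :sent

  def initialize
    @state = :stopped
    @sent = []
  end

  def start
    @state = :started
  end

  def stop
    @state = :stopped
  end

  def publish(dest, message)
    return nil unless @state == :started # mirrors the "not started" guard

    @sent << [dest, message]
    true # assumed success value
  end
end

# Illustrative helper: start the producer, yield it, and always stop it.
def with_producer(producer)
  producer.start
  yield producer
ensure
  producer.stop
end

producer = StubProducer.new
result = with_producer(producer) { |p| p.publish('jms.topic.example', 'hello') }
puts result.inspect
puts producer.state.inspect
```

Because actual_publish logs and returns nil or false instead of raising, callers that need delivery guarantees would have to check the return value rather than rely on exceptions.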