Class: Messagebus::Client
- Inherits:
-
Object
- Object
- Messagebus::Client
- Defined in:
- lib/messagebus/client.rb
Defined Under Namespace
Classes: InitializationError, InvalidDestinationError
Instance Attribute Summary collapse
-
#cluster_map ⇒ Object
readonly
Returns the value of attribute cluster_map.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#last_reload_time ⇒ Object
readonly
Returns the value of attribute last_reload_time.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Class Method Summary collapse
Instance Method Summary collapse
- #headers(delay) ⇒ Object
-
#initialize(config_hash) ⇒ Client
constructor
A new instance of Client.
- #publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {}) ⇒ Object
- #reload_config_on_interval(config, interval = 300) ⇒ Object
-
#start ⇒ Object
Starts up all the connections to the bus.
- #stop ⇒ Object
Constructor Details
#initialize(config_hash) ⇒ Client
Returns a new instance of Client.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/messagebus/client.rb', line 56
#
# Builds a client from a configuration hash: selects a logger, deep-copies
# the configuration, applies cluster defaults and the log level, and creates
# the cluster map.
#
# Logger selection order: the object stored under the :logger key, else a
# Logger writing to config_hash["log_file"], else a Logger writing to
# Messagebus::LOG_DEFAULT_FILE.
#
# @param config_hash [Hash] client configuration. NOTE: the :logger entry is
#   removed from the caller's hash (presumably so the Marshal round-trip
#   below does not have to serialize a logger object -- TODO confirm).
# @return [Client] a new instance of Client
def initialize(config_hash)
  @@logger = if (provided_logger = config_hash.delete(:logger))
               provided_logger
             elsif (log_file = config_hash["log_file"])
               Logger.new(log_file)
             else
               Logger.new(Messagebus::LOG_DEFAULT_FILE)
             end
  # The instance-level `logger` reader returns @logger, so it has to be
  # populated as well; with only @@logger assigned, the `logger.debug` call
  # below would be sent to nil.
  @logger = @@logger

  # This is required to do a deep clone of config hash object
  @config = DottableHash.new(Marshal.load(Marshal.dump(config_hash)))
  @config.merge!(@config.cluster_defaults) if @config.cluster_defaults
  @@logger.level = Logger::Severity.const_get(@config.log_level.upcase) if @config.log_level
  @enable_client_logger_thread_debugging = config.enable_client_logger_thread_debugging

  logger.debug "Initializing Messagebus client."
  @cluster_map = ClusterMap.new(@config)
  # added for reloading config on interval
  @last_reload_time = Time.now
end
Instance Attribute Details
#cluster_map ⇒ Object (readonly)
Returns the value of attribute cluster_map.
40 41 42 |
# File 'lib/messagebus/client.rb', line 40
#
# Read-only accessor for the @cluster_map instance variable (assigned in the
# constructor from ClusterMap.new(@config)).
#
# @return [Object] the current value of @cluster_map
def cluster_map
  @cluster_map
end
#config ⇒ Object (readonly)
Returns the value of attribute config.
40 41 42 |
# File 'lib/messagebus/client.rb', line 40
#
# Read-only accessor for the @config instance variable (assigned in the
# constructor as a DottableHash built from a deep copy of the config hash).
#
# @return [Object] the current value of @config
def config
  @config
end
#last_reload_time ⇒ Object (readonly)
Returns the value of attribute last_reload_time.
40 41 42 |
# File 'lib/messagebus/client.rb', line 40
#
# Read-only accessor for the @last_reload_time instance variable (set to
# Time.now in the constructor and refreshed by #reload_config_on_interval).
#
# @return [Object] the current value of @last_reload_time
def last_reload_time
  @last_reload_time
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
40 41 42 |
# File 'lib/messagebus/client.rb', line 40
#
# Read-only accessor for the @logger instance variable.
#
# NOTE(review): verify that the constructor populates @logger (and not only
# the class-level @@logger); otherwise this reader returns nil.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
Class Method Details
.logger ⇒ Object
161 162 163 |
# File 'lib/messagebus/client.rb', line 161
#
# Class-level logger accessor. Returns the shared @@logger, creating a Logger
# that writes to Messagebus::LOG_DEFAULT_FILE the first time it is needed
# (i.e. when @@logger is unset or falsy).
#
# @return [Object] the shared logger
def logger
  @@logger ||= Logger.new(Messagebus::LOG_DEFAULT_FILE)
end
.start(config_hash) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/messagebus/client.rb', line 42
#
# Creates a client from config_hash and starts it. When a block is given, the
# started client is yielded to it and the client is stopped once the block
# finishes, including when the block raises.
#
# @param config_hash [Hash] configuration forwarded to the constructor
# @yield [client] the started client (optional)
# @return [Client] the client that was created
def self.start(config_hash)
  new(config_hash).tap do |client|
    client.start
    next unless block_given?

    begin
      yield client
    ensure
      # Runs on normal completion and when the block raises.
      client.stop
    end
  end
end
Instance Method Details
#headers(delay) ⇒ Object
149 150 151 152 153 154 155 156 157 158 |
# File 'lib/messagebus/client.rb', line 149
#
# Builds the header hash for a (possibly delayed) publish.
#
# For a non-zero delay the result holds a single entry keyed by
# Messagebus::Producer::SCHEDULED_DELIVERY_TIME_MS_HEADER whose value is the
# current time in epoch milliseconds plus the delay, as a String.
#
# @param delay [Numeric, nil] delay to add to the current time; presumably in
#   milliseconds, given the header name. 0 or nil means "no delay".
# @return [Hash] a new hash: empty when there is no delay, otherwise the
#   scheduled-delivery header
def headers(delay)
  return {} if delay.nil? || delay == 0

  # Use the sub-second part of the clock: whole-second truncation could place
  # the scheduled time up to ~1s earlier than now + delay.
  now_ms = (Time.now.to_f * 1000).to_i
  { Messagebus::Producer::SCHEDULED_DELIVERY_TIME_MS_HEADER => (now_ms + delay).to_s }
end
#publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {}) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/messagebus/client.rb', line 99
#
# Publishes an object to a named destination through the producer that the
# cluster map returns for that destination.
#
# NOTE(review): the extracted listing had lost the local variable holding the
# created message and the expression used for message_id in the log lines;
# `message` and `message.message_id` are reconstructions -- confirm against
# lib/messagebus/client.rb.
#
# @param destination_name [String] destination to look up in the cluster map
# @param object [Object] payload handed to Messagebus::Message.create
# @param delay_ms [Numeric] delay passed to #headers (0 = no delay header)
# @param safe [Boolean] forwarded unchanged to the producer's publish call
# @param binary [Boolean] when truthy the message is created with
#   Messagebus::Message.create(object, nil, binary)
# @param headers [Hash] extra headers; merged over the delay headers, so
#   caller-supplied keys win
# @return [Boolean] true when the producer reports success; false when
#   publishing is disabled by config, the producer returns a falsy result, or
#   a StandardError is raised while publishing
# @raise [InvalidDestinationError] when no producer is found for
#   destination_name
def publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {})
  unless @config.enable_auto_init_connections
    logger.warn "Config['enable_auto_init_connections'] is false, not publishing destination_name=#{destination_name}, message_contents=#{object.inspect}"
    return false
  end

  producer = cluster_map.find(destination_name)
  if producer.nil?
    logger.error "Not publishing due to unconfigured destionation name. destination_name=#{destination_name}, message=#{object.inspect}"
    raise InvalidDestinationError, "Destination #{destination_name} not found"
  end

  message = if binary
              Messagebus::Message.create(object, nil, binary)
            else
              Messagebus::Message.create(object)
            end
  logger.info "Publishing to destination_name=#{destination_name}, message_id=#{message.message_id}, message_contents=#{object.inspect}"

  begin
    publish_result = nil
    duration = Benchmark.realtime do
      # `headers(delay_ms)` is the method call; the bare `headers` is the parameter.
      publish_result = producer.publish(destination_name, message, headers(delay_ms).merge(headers), safe)
    end
    duration = (duration * 1_000).round

    if publish_result
      logger.info "Message publishing to #{destination_name} took #{duration} result=success destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms"
      true
    else
      logger.error "Failed to publish message result=fail destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms, message_contents=#{object.inspect}"
      false
    end
  rescue => e
    # Array() guards against a nil backtrace so the error path itself cannot raise.
    logger.error "Failed to publish message result=error destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms, message_contents=#{object.inspect}, error=#{e.inspect}, backtrace=#{Array(e.backtrace).join("|")}"
    false
  end
end
#reload_config_on_interval(config, interval = 300) ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/messagebus/client.rb', line 140
#
# Pushes a new configuration into the cluster map, but only when at least
# `interval` has elapsed since @last_reload_time. The info line is logged
# only when the reload is actually performed, so the log no longer reports a
# reload on calls that are skipped.
#
# @param config [Object] configuration handed to @cluster_map.update_config
# @param interval [Numeric] minimum time between reloads; compared against a
#   Time difference, i.e. seconds
# @return [Time, nil] the new @last_reload_time when a reload happened,
#   nil otherwise
def reload_config_on_interval(config, interval = 300)
  now = Time.now
  return if (now - @last_reload_time) < interval

  logger.info "Relaoding client configs after interval=#{interval}"
  @cluster_map.update_config(config)
  @last_reload_time = now
end
#start ⇒ Object
Starts up all the connections to the bus. This instance method does not itself take a block; the block form is provided by the class method Client.start, which yields the started client to the block and closes the connections after the block finishes.
82 83 84 85 86 87 88 89 |
# File 'lib/messagebus/client.rb', line 82
#
# Starts the cluster map when the configuration enables auto-initialised
# connections; otherwise only logs that nothing will be started.
#
# @return [Object] the result of @cluster_map.start when enabled, otherwise
#   the result of the logger call
def start
  unless @config.enable_auto_init_connections
    return logger.info("Config['enable_auto_init_connections'] is false, will not start any messagebus producers.")
  end

  logger.info("auto enable connections set, starting clusters.")
  @cluster_map.start
end
#stop ⇒ Object
91 92 93 |
# File 'lib/messagebus/client.rb', line 91
#
# Stops the client by delegating to the cluster map's #stop.
#
# @return [Object] whatever cluster_map.stop returns
def stop
  cluster_map.stop
end