Class: Messagebus::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/messagebus/client.rb

Defined Under Namespace

Classes: InitializationError, InvalidDestinationError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config_hash) ⇒ Client

Returns a new instance of Client.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/messagebus/client.rb', line 56

def initialize(config_hash)

  @@logger = if provided_logger = config_hash.delete(:logger)
    provided_logger
  elsif log_file = config_hash["log_file"]
    Logger.new(log_file)
  else
    Logger.new(Messagebus::LOG_DEFAULT_FILE)
  end

  # This is required to do a deep clone of config hash object
  @config = DottableHash.new(Marshal.load(Marshal.dump(config_hash)))
  @config.merge!(@config.cluster_defaults) if @config.cluster_defaults
  @@logger.level = Logger::Severity.const_get(@config.log_level.upcase) if @config.log_level

  @enable_client_logger_thread_debugging = config.enable_client_logger_thread_debugging

  logger.debug "Initializing Messagebus client."
  @cluster_map = ClusterMap.new(@config)
  #added for reloading config on interval
  @last_reload_time = Time.now
end

Instance Attribute Details

#cluster_mapObject (readonly)

Returns the value of attribute cluster_map.



40
41
42
# File 'lib/messagebus/client.rb', line 40

def cluster_map
  @cluster_map
end

#configObject (readonly)

Returns the value of attribute config.



40
41
42
# File 'lib/messagebus/client.rb', line 40

def config
  @config
end

#last_reload_timeObject (readonly)

Returns the value of attribute last_reload_time.



40
41
42
# File 'lib/messagebus/client.rb', line 40

def last_reload_time
  @last_reload_time
end

#loggerObject (readonly)

Returns the value of attribute logger.



40
41
42
# File 'lib/messagebus/client.rb', line 40

def logger
  @logger
end

Class Method Details

.loggerObject



161
162
163
# File 'lib/messagebus/client.rb', line 161

def logger
  @@logger ||= Logger.new(Messagebus::LOG_DEFAULT_FILE)
end

.start(config_hash) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/messagebus/client.rb', line 42

def self.start(config_hash)
  client = new(config_hash)
  client.start

  if block_given?
    begin
      yield client
    ensure
      client.stop
    end
  end
  client
end

Instance Method Details

#headers(delay) ⇒ Object



149
150
151
152
153
154
155
156
157
158
# File 'lib/messagebus/client.rb', line 149

def headers(delay)
  headers = {}
  unless delay == 0
    schedule_time = (Time.now.to_i * 1000 + delay).to_s
    headers.merge!({
      Messagebus::Producer::SCHEDULED_DELIVERY_TIME_MS_HEADER => schedule_time
    })
  end
  headers
end

#publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {}) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/messagebus/client.rb', line 99

def publish(destination_name, object, delay_ms = 0, safe = true, binary = false, headers = {})

  if !(@config.enable_auto_init_connections)
    logger.warn "Config['enable_auto_init_connections'] is false, not publishing destination_name=#{destination_name}, message_contents=#{object.inspect}"
    false
  else
    producer = cluster_map.find(destination_name)
    if producer.nil?
      logger.error "Not publishing due to unconfigured destionation name. destination_name=#{destination_name}, message=#{object.inspect}"
      raise InvalidDestinationError, "Destination #{destination_name} not found"
    end

    if binary
      message = Messagebus::Message.create(object, nil, binary)
    else
      message = Messagebus::Message.create(object)
    end

    logger.info "Publishing to destination_name=#{destination_name}, message_id=#{message.message_id}, message_contents=#{object.inspect}"

    begin
      publish_result = nil
      duration = Benchmark.realtime do
        publish_result = producer.publish(destination_name, message, headers(delay_ms).merge(headers), safe)
      end
      duration = (duration * 1_000).round

      if publish_result
        logger.info "Message publishing to #{destination_name} took #{duration} result=success destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms"
        true
      else
        logger.error "Failed to publish message result=fail destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms, message_contents=#{object.inspect}"
        false
      end
    rescue => e
      logger.error "Failed to publish message result=error destination_name=#{destination_name}, message_id=#{message.message_id}, duration=#{duration}ms, message_contents=#{object.inspect}, error=#{e.inspect}, backtrace=#{e.backtrace.join("|")}"
      false
    end
  end
end

#reload_config_on_interval(config, interval = 300) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/messagebus/client.rb', line 140

def reload_config_on_interval(config, interval = 300)
  logger.info "Relaoding client configs after interval=#{interval}"
  now = Time.now
  if(now - @last_reload_time) >= interval
    @cluster_map.update_config(config)
    @last_reload_time = now
  end
end

#startObject

Starts up all the connections to the bus. Optionally takes a block to which it yields self. When the block is passed, it will auto close the connections after the block finishes.



82
83
84
85
86
87
88
89
# File 'lib/messagebus/client.rb', line 82

def start
  if @config.enable_auto_init_connections
    logger.info "auto enable connections set, starting clusters."
    @cluster_map.start
  else
    logger.info "Config['enable_auto_init_connections'] is false, will not start any messagebus producers."
  end
end

#stopObject



91
92
93
# File 'lib/messagebus/client.rb', line 91

def stop
  cluster_map.stop
end