Class: Messagebus::Connection

Inherits:
Object
  • Object
show all
Includes:
Validations
Defined in:
lib/messagebus/connection.rb

Direct Known Subclasses

Consumer, Producer

Constant Summary collapse

STARTED =
"STARTED"
STOPPED =
"STOPPED"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Validations

#valid_host?, #validate_connection_config, #validate_destination_config

Constructor Details

#initialize(host_params, passed_options = {}) ⇒ Connection

Returns a new instance of Connection.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/messagebus/connection.rb', line 44

# Builds a connection that starts out in the STOPPED state.
#
# @param host_params [Array, Object] host specification(s); a value that is
#   not already an Array is wrapped in a one-element Array
# @param passed_options [Hash] caller-supplied options merged over the
#   defaults listed below (presumably overriding them, as Hash#merge would --
#   confirm against DottableHash#merge)
def initialize(host_params, passed_options = {})
  @host_params = host_params.is_a?(Array) ? host_params : [host_params]

  defaults = {
    :user                                  => '',
    :passwd                                => '',
    :conn_lifetime_sec                     => 300,
    :receipt_wait_timeout_ms               => 5000,
    :destination_name                      => nil,
    :destination_type                      => nil,
    :ack_type                              => Messagebus::ACK_TYPE_AUTO_CLIENT,
    :num_threads_per_server                => 1,
    :enable_dynamic_serverlist_fetch       => false,
    :dynamic_fetch_timeout_ms              => 1000,
    :dynamic_serverlist_fetch_url_override => nil
  }
  @options = DottableHash.new(defaults).merge(passed_options)

  @state = STOPPED
end

Instance Attribute Details

#host_params ⇒ Object

Returns the value of attribute host_params.



40
41
42
# File 'lib/messagebus/connection.rb', line 40

# Reader for the host list held in @host_params.
#
# @return [Object] the current value of @host_params
def host_params; @host_params; end

#options ⇒ Object

Returns the value of attribute options.



40
41
42
# File 'lib/messagebus/connection.rb', line 40

# Reader for the option set held in @options.
#
# @return [Object] the current value of @options
def options; @options; end

Instance Method Details

#do_with_timeout(timeout_ms) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/messagebus/connection.rb', line 68

# Repeatedly yields to the given block until +timeout_ms+ milliseconds have
# elapsed. The deadline is only checked between iterations, so a single slow
# iteration can overrun it; the block may leave early with +break+/+return+.
#
# Elapsed time is measured on the monotonic clock so that adjustments to the
# system wall clock cannot end the loop early or stretch it.
#
# @param timeout_ms [Numeric] how long to keep yielding, in milliseconds
# @yield invoked with no arguments on every iteration
# @return [nil] when the timeout elapses
# @raise [RuntimeError] if no block is given
def do_with_timeout(timeout_ms)
  raise "do_with_timeout expects a block to be run" unless block_given?

  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  while Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - started_at < timeout_ms
    yield
  end
end

#start_server(host_params, user, passwd, subscription_id = nil) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/messagebus/connection.rb', line 79

# Creates a STOMP client against one of the given hosts and marks this
# connection as STARTED.
#
# @param host_params [Array<String>, String] a "host:port" string, or a list
#   of such strings from which one entry is picked at random
# @param user [Object] passed to Stomp::Client.new as its first argument
# @param passwd [Object] passed to Stomp::Client.new as its second argument
# @param subscription_id [Object, nil] when truthy, sent to the client as the
#   "client-id" connect header
# @return [Stomp::Client] the newly created client
# @raise [ArgumentError] if host_params is neither a String nor an Array
#   that yields a host entry (e.g. it is empty or nil)
def start_server(host_params, user, passwd, subscription_id=nil)
  host_param =
    case host_params
    when Array then host_params.sample # nil for an empty list
    when String then host_params
    end

  # Fail with a clear message instead of a NoMethodError on nil.split below.
  unless host_param
    raise ArgumentError,
          "host_params must be a \"host:port\" String or a non-empty Array of them, got #{host_params.inspect}"
  end

  # port stays the String produced by split (nil when there is no ':').
  host, port = host_param.split(':')

  connect_headers = subscription_id ? { "client-id" => subscription_id } : {}

  stomp = Stomp::Client.new(user, passwd, host, port, logger,  connect_headers)
  logger.info "Started client for host_param:#{host_param} stomp-client:#{stomp} user:#{user}"
  @state = STARTED

  stomp
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/messagebus/connection.rb', line 60

# Tells whether the connection state currently equals STARTED.
#
# @return [Boolean] true when @state equals STARTED
def started?
  STARTED == @state
end

#stop_server(stomp) ⇒ Object



99
100
101
102
103
# File 'lib/messagebus/connection.rb', line 99

# Logs the shutdown, closes the given client when one is supplied, and marks
# this connection as STOPPED.
#
# @param stomp [#close, nil] client to close; a nil/false value is skipped
# @return [String] the STOPPED state value
def stop_server(stomp)
  message = "Stopping stomp-client:#{stomp}"
  Client.logger.info(message)

  # Only a truthy client is closed.
  stomp && stomp.close

  @state = STOPPED
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/messagebus/connection.rb', line 64

# Tells whether the connection state currently equals STOPPED.
#
# @return [Boolean] true when @state equals STOPPED
def stopped?
  STOPPED == @state
end