Class: Chef::Queue

Inherits:
Object
  • Object
show all
Extended by:
Mixin::ParamsValidate
Defined in:
lib/chef/queue.rb

Class Method Summary collapse

Methods included from Mixin::ParamsValidate

set_or_return, validate

Class Method Details

.connect ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/chef/queue.rb', line 33

# Opens a Stomp connection using the queue_* settings in Chef::Config and
# stores it in @client.
#
# Connection-refused and timeout failures are logged, followed by a sleep of
# @queue_retry_delay, and the attempt is repeated while the attempt counter
# stays below @queue_retry_count. Both instance variables are expected to be
# set outside this method (NOTE(review): not visible here - confirm).
#
# Returns 1 on success (the value of the trailing +else+ clause).
# Raises Errno::ECONNREFUSED or Timeout::Error once retries are exhausted.
def connect
  # `retry` re-runs the method body, so only initialise the counter once.
  attempt ||= 1
  login    = Chef::Config[:queue_user]
  passcode = Chef::Config[:queue_password]
  host     = Chef::Config[:queue_host]
  port     = Chef::Config[:queue_port]

  # Connection.open(login = "", passcode = "", host='localhost', port=61613, reliable=FALSE, reconnectDelay=5)
  @client = Stomp::Connection.open(login, passcode, host, port, false)
rescue Errno::ECONNREFUSED, Timeout::Error => e
  # Re-raise the base class that was rescued, not e.class, so subclasses of
  # Timeout::Error still surface as a plain Timeout::Error.
  error_class, problem =
    if e.is_a?(Errno::ECONNREFUSED)
      [Errno::ECONNREFUSED, "Connection refused"]
    else
      [Timeout::Error, "Timeout"]
    end
  where = "stomp queue at #{host}:#{port}"

  Chef::Log.error("#{problem} connecting to #{where}, retry #{attempt}/#{@queue_retry_count}")
  sleep(@queue_retry_delay)
  attempt += 1
  retry if attempt < @queue_retry_count
  raise error_class, "#{problem} connecting to #{where}, giving up"
else
  # Reset the counter on success; this assignment is also the return value.
  attempt = 1
end

.disconnect ⇒ Object

Raises:

  • (ArgumentError)


139
140
141
142
# File 'lib/chef/queue.rb', line 139

# Closes the Stomp connection held in @client and forgets the handle.
#
# The messaging methods in this class reconnect lazily only when @client is
# nil. Leaving the closed connection in @client would make them reuse a dead
# handle, so it is cleared once the disconnect has succeeded. If the
# underlying disconnect raises, @client is left untouched.
#
# Returns whatever @client.disconnect returns.
# Raises ArgumentError when there is no connection to close.
def disconnect
  raise ArgumentError, "You must call connect before you can disconnect!" unless @client
  result = @client.disconnect
  @client = nil
  result
end

.make_url(type, name) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/chef/queue.rb', line 57

# Builds the Stomp destination path for a queue or topic.
#
# type - destination kind; its symbol form is validated against :topic/:queue.
# name - destination name; its symbol form is validated as a String or Symbol.
#
# Returns "/<type>/<prefix>/chef/<name>" when Chef::Config[:queue_prefix] is
# set, otherwise "/<type>/chef/<name>".
# Validation failures surface from +validate+ (Mixin::ParamsValidate).
def make_url(type, name)
  validate(
    { :queue_type => type.to_sym, :queue_name => name.to_sym },
    {
      :queue_type => { :equal_to => [ :topic, :queue ] },
      :queue_name => { :kind_of => [ String, Symbol ] },
    }
  )

  prefix = Chef::Config[:queue_prefix]
  return "/#{type}/#{prefix}/chef/#{name}" if prefix

  "/#{type}/chef/#{name}"
end

.poll_msg ⇒ Object



129
130
131
132
133
134
135
136
137
# File 'lib/chef/queue.rb', line 129

# Polls the Stomp client once for a message, connecting first if no client
# exists yet.
#
# Returns the JSON-parsed body of the polled message, or nil when the poll
# yields nothing.
def poll_msg
  connect if @client.nil?
  frame = @client.poll
  frame ? JSON.parse(frame.body) : nil
end

.receive_msg ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/chef/queue.rb', line 116

# Receives the next message from the Stomp client, connecting first if no
# client exists yet.
#
# Any StandardError raised while receiving or logging the frame is logged at
# debug level and the receive is attempted again, with no retry limit. That
# includes the NoMethodError triggered by a nil frame.
#
# Returns a two-element array: the JSON-parsed body and the frame headers.
def receive_msg
  connect if @client.nil?

  frame = nil
  delivered = false
  until delivered
    begin
      frame = @client.receive
      Chef::Log.debug("Received Message from #{frame.headers["destination"]} containing: #{frame.body}")
      delivered = true
    rescue
      Chef::Log.debug("Received nil message from stomp, retrying")
    end
  end

  parsed = JSON.parse(frame.body)
  return parsed, frame.headers
end

.send_msg(type, name, msg) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/chef/queue.rb', line 88

# Serialises +msg+ to JSON and sends it to the destination built from +type+
# and +name+, connecting first if no client exists yet.
#
# type - destination kind, passed to make_url.
# name - destination name, passed to make_url.
# msg  - payload; validated to respond to to_json.
#
# On Errno::EPIPE the connection is re-established and the send repeated
# while the attempt counter stays below @queue_retry_count.
#
# Returns 1 on success (the value of the +else+ clause).
# Raises Errno::EPIPE once retries are exhausted.
def send_msg(type, name, msg)
  attempts = 1
  validate({ :message => msg }, { :message => { :respond_to => :to_json } })

  destination = make_url(type, name)
  payload = msg.to_json
  connect if @client.nil?
  Chef::Log.debug("Sending to #{destination}: #{payload}")

  begin
    @client.send(destination, payload)
  rescue Errno::EPIPE
    Chef::Log.debug("Lost connection to stomp queue, reconnecting")
    connect
    attempts += 1
    retry if attempts < @queue_retry_count
    raise Errno::EPIPE, "Lost connection to stomp queue, giving up"
  else
    # Reset the counter on success; this assignment is also the return value.
    attempts = 1
  end
end

.subscribe(type, name) ⇒ Object



81
82
83
84
85
86
# File 'lib/chef/queue.rb', line 81

# Subscribes the Stomp client to the destination built from +type+ and
# +name+, connecting first if no client exists yet.
#
# type - destination kind, passed to make_url.
# name - destination name, passed to make_url.
#
# Returns whatever @client.subscribe returns.
def subscribe(type, name)
  destination = make_url(type, name)
  Chef::Log.debug("Subscribing to #{destination}")
  connect if @client.nil?
  @client.subscribe(destination)
end