Class: Splash::Transports::Rabbitmq::Client
Constant Summary
Constants included
from Constants
Constants::AUTHOR, Constants::BACKENDS_STRUCT, Constants::CONFIG_FILE, Constants::COPYRIGHT, Constants::DAEMON_LOGMON_SCHEDULING, Constants::DAEMON_PID_FILE, Constants::DAEMON_PID_PATH, Constants::DAEMON_PROCESS_NAME, Constants::DAEMON_STDERR_TRACE, Constants::DAEMON_STDOUT_TRACE, Constants::EMAIL, Constants::EXECUTION_TEMPLATE, Constants::EXECUTION_TEMPLATE_TOKENS_LIST, Constants::LICENSE, Constants::LOGGERS_STRUCT, Constants::PROMETHEUS_PUSHGATEWAY_HOST, Constants::PROMETHEUS_PUSHGATEWAY_PORT, Constants::TRACE_PATH, Constants::TRANSPORTS_STRUCT, Constants::VERSION
Instance Method Summary
collapse
Methods included from Loggers
#change_logger, #get_logger
Methods included from Config
#get_config
#checkconfig, #setupsplash
Methods included from Helpers
#daemonize, #get_process, #group_root, #install_file, #is_root?, #make_folder, #make_link, #run_as_root, #search_file_in_gem, #user_root, #verify_file, #verify_folder, #verify_link, #verify_service
#get_default_client, #get_default_subscriber
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/splash/transports/rabbitmq.rb', line 41
def initialize
@config = get_config.transports
host = @config[:rabbitmq][:host]
port = @config[:rabbitmq][:port]
vhost = (@config[:rabbitmq][:vhost])? @config[:rabbitmq][:vhost] : '/'
passwd = (@config[:rabbitmq][:passwd])? @config[:rabbitmq][:passwd] : 'guest'
user = (@config[:rabbitmq][:user])? @config[:rabbitmq][:user] : 'guest'
conf = { :host => host, :vhost => vhost, :user => user, :password => passwd, :port => port.to_i}
begin
@connection = Bunny.new conf
@connection.start
@channel = @connection.create_channel
rescue Bunny::Exception
splash_exit case: :service_dependence_missing, more: "RabbitMQ Transport not available."
end
end
|
Instance Method Details
#ack(ack) ⇒ Object
68
69
70
|
# File 'lib/splash/transports/rabbitmq.rb', line 68
def ack(ack)
return @channel.acknowledge(ack, false)
end
|
#close ⇒ Object
97
98
99
|
# File 'lib/splash/transports/rabbitmq.rb', line 97
def close
@connection.close
end
|
#execute(order) ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/splash/transports/rabbitmq.rb', line 72
def execute(order)
queue = order[:return_to]
lock = Mutex.new
res = nil
condition = ConditionVariable.new
get_default_subscriber(queue: queue).subscribe do |delivery_info, properties, payload|
res = YAML::load(payload)
lock.synchronize { condition.signal }
end
get_logger.send "Verb : #{order[:verb].to_s} to queue : #{order[:queue]}."
get_default_client.publish queue: order[:queue], message: order.to_yaml
lock.synchronize { condition.wait(lock) }
return res
end
|
#get(options = {}) ⇒ Object
88
89
90
91
92
93
94
95
|
# File 'lib/splash/transports/rabbitmq.rb', line 88
def get(options ={})
queue = @channel.queue(options[:queue])
opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
delivery_info, properties, payload = queue.pop
res = {:message => payload}
res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
return res
end
|
#publish(options = {}) ⇒ Object
64
65
66
|
# File 'lib/splash/transports/rabbitmq.rb', line 64
def publish(options ={})
return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end
|
#purge(options) ⇒ Object
60
61
62
|
# File 'lib/splash/transports/rabbitmq.rb', line 60
def purge(options)
@channel.queue(options[:queue]).purge
end
|