Class: Splash::Transports::Rabbitmq::Client

Inherits:
Object
  • Object
show all
Includes:
Config, Loggers, Splash::Transports
Defined in:
lib/splash/transports/rabbitmq.rb

Constant Summary

Constants included from Constants

Constants::AUTHOR, Constants::BACKENDS_STRUCT, Constants::CONFIG_FILE, Constants::COPYRIGHT, Constants::DAEMON_LOGMON_SCHEDULING, Constants::DAEMON_PID_FILE, Constants::DAEMON_PID_PATH, Constants::DAEMON_PROCESS_NAME, Constants::DAEMON_STDERR_TRACE, Constants::DAEMON_STDOUT_TRACE, Constants::EMAIL, Constants::EXECUTION_TEMPLATE, Constants::EXECUTION_TEMPLATE_TOKENS_LIST, Constants::LICENSE, Constants::LOGGERS_STRUCT, Constants::PROMETHEUS_PUSHGATEWAY_HOST, Constants::PROMETHEUS_PUSHGATEWAY_PORT, Constants::TRACE_PATH, Constants::TRANSPORTS_STRUCT, Constants::VERSION

Instance Method Summary collapse

Methods included from Loggers

#change_logger, #get_logger

Methods included from Config

#get_config

Methods included from ConfigUtilities

#checkconfig, #setupsplash

Methods included from Helpers

#daemonize, #get_process, #group_root, #install_file, #is_root?, #make_folder, #make_link, #run_as_root, #search_file_in_gem, #user_root, #verify_file, #verify_folder, #verify_link, #verify_service

Methods included from Splash::Transports

#get_default_client, #get_default_subscriber

Constructor Details

#initializeClient

Returns a new instance of Client.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/splash/transports/rabbitmq.rb', line 41

def initialize
  @config = get_config.transports
  host = @config[:rabbitmq][:host]
  port = @config[:rabbitmq][:port]
  vhost = (@config[:rabbitmq][:vhost])? @config[:rabbitmq][:vhost] : '/'
  passwd = (@config[:rabbitmq][:passwd])? @config[:rabbitmq][:passwd] : 'guest'
  user = (@config[:rabbitmq][:user])? @config[:rabbitmq][:user] : 'guest'
  conf  = { :host => host, :vhost => vhost, :user => user, :password => passwd, :port => port.to_i}

  begin
    @connection = Bunny.new conf 
    @connection.start
    @channel = @connection.create_channel
  rescue Bunny::Exception
    splash_exit  case: :service_dependence_missing, more: "RabbitMQ Transport not available."
  end
end

Instance Method Details

#ack(ack) ⇒ Object



68
69
70
# File 'lib/splash/transports/rabbitmq.rb', line 68

def ack(ack)
  return @channel.acknowledge(ack, false)
end

#closeObject



97
98
99
# File 'lib/splash/transports/rabbitmq.rb', line 97

def close
  @connection.close
end

#execute(order) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/splash/transports/rabbitmq.rb', line 72

def execute(order)
  queue = order[:return_to]
  lock = Mutex.new
  res = nil
  condition = ConditionVariable.new
  get_default_subscriber(queue: queue).subscribe do |delivery_info, properties, payload|
    res = YAML::load(payload)

    lock.synchronize { condition.signal }
  end
  get_logger.send "Verb : #{order[:verb].to_s} to queue : #{order[:queue]}."
  get_default_client.publish queue: order[:queue], message: order.to_yaml
  lock.synchronize { condition.wait(lock) }
  return res
end

#get(options = {}) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/splash/transports/rabbitmq.rb', line 88

def get(options ={})
  queue = @channel.queue(options[:queue])
  opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
  delivery_info, properties, payload = queue.pop
  res = {:message => payload}
  res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
  return res
end

#publish(options = {}) ⇒ Object



64
65
66
# File 'lib/splash/transports/rabbitmq.rb', line 64

def publish(options ={})
  return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end

#purge(options) ⇒ Object



60
61
62
# File 'lib/splash/transports/rabbitmq.rb', line 60

def purge(options)
  @channel.queue(options[:queue]).purge
end