Class: AMQP::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/jessica/amqp_client_connector.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection = nil, channel_id = 1, opts = {}) ⇒ Channel

Returns a new instance of Channel.



55
56
57
58
# File 'lib/jessica/amqp_client_connector.rb', line 55

# Builds a channel wrapper around an AMQP connection.
#
# @param connection [Object, nil] connection to reuse; when nil (or false) a
#   new AMQP::Connection is constructed from +opts+
# @param channel_id [Integer] accepted for callers, but not read here
# @param opts [Hash] stored as @options and handed to AMQP::Connection.new
#   when no connection is supplied
def initialize(connection = nil, channel_id = 1, opts = {})
  @options = opts
  @connection = connection
  # Fall back to a fresh connection only when none was handed in.
  @connection ||= AMQP::Connection.new(@options)
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



54
55
56
# File 'lib/jessica/amqp_client_connector.rb', line 54

# Read-only accessor for the @connection instance variable.
#
# @return [Object] whatever is currently stored in @connection
def connection
  @connection
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



54
55
56
# File 'lib/jessica/amqp_client_connector.rb', line 54

# Read-only accessor for the @options instance variable.
#
# @return [Object] whatever is currently stored in @options
def options
  @options
end

Instance Method Details

#channel_number ⇒ Object



60
61
62
# File 'lib/jessica/amqp_client_connector.rb', line 60

# Reports the channel number of the connection's channel.
#
# @return [Object] the value of @connection.channel.channel_number
def channel_number
  underlying_channel = @connection.channel
  underlying_channel.channel_number
end

#connect ⇒ Object



68
69
70
71
# File 'lib/jessica/amqp_client_connector.rb', line 68

# Asks the connection to connect, unless #connected? already reports true.
#
# @return [Object, nil] the result of @connection.connect, or nil when the
#   call was skipped because the connection was already up
def connect
  @connection.connect unless connected?
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/jessica/amqp_client_connector.rb', line 64

# Reports connection state by delegating to @connection.connected?.
#
# @return [Boolean] the value returned by the connection
def connected?
  @connection.connected?
end

#direct(name = 'amq.direct', opts = {}, &block) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/jessica/amqp_client_connector.rb', line 81

# Finds or declares a 'direct' exchange on the connection.
#
# An exchange returned by @connection.find_exchange(name) is reused as long as
# its match_opts(opts) check passes; when none is found, one is requested via
# @connection.exchange(name, 'direct', opts).
#
# @param name [String] exchange name
# @param opts [Hash] options checked against an existing exchange, or passed
#   along when declaring a new one
# @yield [exchange] optional block invoked with the resolved exchange
# @return [Object] the resolved exchange
# @raise [AMQP::IncompatibleOptionsError] when an existing exchange rejects +opts+
def direct(name = 'amq.direct', opts = {}, &block)
  existing = @connection.find_exchange(name)
  raise AMQP::IncompatibleOptionsError if existing && !existing.match_opts(opts)

  exchange = existing || @connection.exchange(name, 'direct', opts)
  block.call(exchange) unless block.nil?
  exchange
end

#disconnect ⇒ Object



73
74
75
# File 'lib/jessica/amqp_client_connector.rb', line 73

# Delegates to @connection.disconnect.
#
# @return [Object] whatever @connection.disconnect returns
def disconnect
  @connection.disconnect
end

#fanout(name = 'amq.fanout', opts = {}, &block) ⇒ Object



91
92
93
94
95
96
97
98
99
# File 'lib/jessica/amqp_client_connector.rb', line 91

# Finds or declares a 'fanout' exchange on the connection.
#
# An exchange returned by @connection.find_exchange(name) is reused as long as
# its match_opts(opts) check passes; when none is found, one is requested via
# @connection.exchange(name, 'fanout', opts).
#
# @param name [String] exchange name
# @param opts [Hash] options checked against an existing exchange, or passed
#   along when declaring a new one
# @yield [exchange] optional block invoked with the resolved exchange
# @return [Object] the resolved exchange
# @raise [AMQP::IncompatibleOptionsError] when an existing exchange rejects +opts+
def fanout(name = 'amq.fanout', opts = {}, &block)
  existing = @connection.find_exchange(name)
  raise AMQP::IncompatibleOptionsError if existing && !existing.match_opts(opts)

  exchange = existing || @connection.exchange(name, 'fanout', opts)
  block.call(exchange) unless block.nil?
  exchange
end

#headers(name = 'amq.headers', opts = {}, &block) ⇒ Object



101
102
103
104
105
106
107
108
109
# File 'lib/jessica/amqp_client_connector.rb', line 101

# Finds or declares a 'headers' exchange on the connection.
#
# An exchange returned by @connection.find_exchange(name) is reused as long as
# its match_opts(opts) check passes; when none is found, one is requested via
# @connection.exchange(name, 'headers', opts).
#
# @param name [String] exchange name
# @param opts [Hash] options checked against an existing exchange, or passed
#   along when declaring a new one
# @yield [exchange] optional block invoked with the resolved exchange
# @return [Object] the resolved exchange
# @raise [AMQP::IncompatibleOptionsError] when an existing exchange rejects +opts+
def headers(name = 'amq.headers', opts = {}, &block)
  existing = @connection.find_exchange(name)
  raise AMQP::IncompatibleOptionsError if existing && !existing.match_opts(opts)

  exchange = existing || @connection.exchange(name, 'headers', opts)
  block.call(exchange) unless block.nil?
  exchange
end

#prefetch(count, global = false) ⇒ Object



77
78
79
# File 'lib/jessica/amqp_client_connector.rb', line 77

# Forwards a QoS request to the connection's channel.
#
# Calls basic_qos(0, count, global) on @connection.channel; the leading 0 is
# fixed here (presumably the prefetch size — confirm against the client library).
#
# @param count [Integer] passed as the second argument to basic_qos
# @param global [Boolean] passed as the third argument to basic_qos
# @return [Object] whatever basic_qos returns
def prefetch(count, global = false)
  underlying_channel = @connection.channel
  underlying_channel.basic_qos(0, count, global)
end

#queue(name = nil, opts = {}, &block) ⇒ Object

Raises:

  • (ArgumentError)


121
122
123
124
125
126
127
128
129
130
131
# File 'lib/jessica/amqp_client_connector.rb', line 121

# Finds or declares a named queue on the connection.
#
# A queue returned by @connection.find_queue(name) is reused as long as its
# match_opts(opts) check passes; when none is found, one is requested via
# @connection.queue(name, opts).
#
# @param name [String] queue name; must be neither nil nor empty
# @param opts [Hash] options checked against an existing queue, or passed
#   along when declaring a new one
# @yield [queue] optional block invoked with the resolved queue
# @return [Object] the resolved queue
# @raise [ArgumentError] when +name+ is nil or empty
# @raise [AMQP::IncompatibleOptionsError] when an existing queue rejects +opts+
def queue(name = nil, opts = {}, &block)
  raise ArgumentError, "queue name must not be nil" if name.nil?
  raise ArgumentError, "queue name must not be empty" if name.empty?

  existing = @connection.find_queue(name)
  raise AMQP::IncompatibleOptionsError if existing && !existing.match_opts(opts)

  resolved = existing || @connection.queue(name, opts)
  block.call(resolved) unless block.nil?
  resolved
end

#topic(name = 'amq.topic', opts = {}, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
# File 'lib/jessica/amqp_client_connector.rb', line 111

# Finds or declares a 'topic' exchange on the connection.
#
# An exchange returned by @connection.find_exchange(name) is reused as long as
# its match_opts(opts) check passes; when none is found, one is requested via
# @connection.exchange(name, 'topic', opts).
#
# @param name [String] exchange name
# @param opts [Hash] options checked against an existing exchange, or passed
#   along when declaring a new one
# @yield [exchange] optional block invoked with the resolved exchange
# @return [Object] the resolved exchange
# @raise [AMQP::IncompatibleOptionsError] when an existing exchange rejects +opts+
def topic(name = 'amq.topic', opts = {}, &block)
  existing = @connection.find_exchange(name)
  raise AMQP::IncompatibleOptionsError if existing && !existing.match_opts(opts)

  exchange = existing || @connection.exchange(name, 'topic', opts)
  block.call(exchange) unless block.nil?
  exchange
end