Class: Cheezmiz::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/broker.rb

Constant Summary collapse

STX =
"\x02"
ETX =
"\x03"
MIN_THREADS =
5
MAX_THREADS =
15

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Broker

Returns a new instance of Broker.



31
32
33
34
35
36
37
38
# File 'lib/broker.rb', line 31

def initialize(params = {})
  @sequence_number = 1
  @callbacks = Hash.new { |hash, key| hash[key] = [] }
  @pool = ActionPool::Pool.new(
    :min_threads => (params[:min_threads] || MIN_THREADS),
    :max_threads => (params[:max_threads] || MAX_THREADS)
  )
end

Instance Method Details

#callbacksObject



44
45
46
# File 'lib/broker.rb', line 44

def callbacks
  @callbacks
end

#connect(params = {}) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/broker.rb', line 55

def connect(params = {})
  @socket = params[:socket] || begin
    params[:host] ||= 'ctp-a.chikka.com'
    params[:port] ||= 6301
    TCPSocket.open(params[:host], params[:port])
  end
end

#register_callback(operation, options = {:threaded => true}, &proc) ⇒ Object



40
41
42
# File 'lib/broker.rb', line 40

def register_callback(operation, options = {:threaded => true}, &proc)
  @callbacks[operation] << options.merge({ :callback => proc })
end

#send(message, params = {}) ⇒ Object



48
49
50
51
52
53
# File 'lib/broker.rb', line 48

def send(message, params = {})
  raw = encode({ :message => message }.merge(params))
  result = @socket.send(raw, 0)
  process_callbacks(@callbacks[:outgoing], message)
  return raw, result
end

#startObject



63
64
65
66
67
# File 'lib/broker.rb', line 63

def start
  @pool.process do
    @socket.while_reading { |buffer| process_messages(buffer) }
  end
end