Class: CakeDB

Inherits:
Object
  • Object
show all
Defined in:
lib/cakedb.rb

Instance Method Summary collapse

Constructor Details

#initialize(server = "127.0.0.1", port = 8888, lvl = 0) ⇒ CakeDB

Returns a new instance of CakeDB.



6
7
8
9
10
11
12
# File 'lib/cakedb.rb', line 6

def initialize( server="127.0.0.1", port=8888, lvl=0 )
  #puts "Connection Initiated"
  @server = TCPSocket.new(server, port)
  @sids = {}
  @loggingLevel = lvl
  @timeout = 30
end

Instance Method Details

#allSince(stream, ts) ⇒ Object



103
104
105
106
# File 'lib/cakedb.rb', line 103

def allSince(stream, ts)
  #allSince is mode 4
  return read(stream, ts, nil, 4)
end

#data_in(length) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/cakedb.rb', line 43

def data_in(length)
  data = nil
  got = 0
  timeOver = Time.now + @timeout
  while got < length
    data = @server.recv(length-got)
    got += data.bytesize
    if Time.now > timeOver
      raise "Timeout on incomplete read, returning what I had"
	if @loggingLevel == 1
        puts "Timeout Hit. GT #{timeOver}"
	end
      return data
    end
    if @loggingLevel == 2 && got < length
      puts "Incomplete read on payload receive"
      puts "Raw Data: #{data}"
      puts "Length: #{length} bytes"
      puts "Raw Data length: #{data.bytesize}"
    end
  end
  return data
end

#get_sid(stream) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/cakedb.rb', line 13

def get_sid(stream)
  if @sids[stream].nil?
    @server.write [stream.bytesize,5].pack("L>S>")  
    @server.write stream  
    @server.flush
    sid = @server.recv(2).unpack("S>")
    @sids[stream] = sid[0]
    return sid[0]
  else
    return @sids[stream]
  end
end

#get_tasking(msg) ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/cakedb.rb', line 34

def get_tasking(msg)
  details = Hash.new
  details["database"] = msg["region"]
  details["task"]     = msg["tasking"]
  details["date"]     = msg["date"]
  details["rediskey"] = details["database"] + "-" + details["task"]
  return details
end

#lastAt(stream, ts) ⇒ Object



108
109
110
111
# File 'lib/cakedb.rb', line 108

def lastAt(stream, ts)
  #lastAt is mode 6
  return read(stream, ts, nil, 6)
end

#rangeQuery(stream, ts, tsend) ⇒ Object

Perform a range query



113
114
115
# File 'lib/cakedb.rb', line 113

def rangeQuery(stream, ts, tsend)
  return read(stream, ts, tsend, 3)
end

#read(stream, ts, tsend = nil, mode = 4) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/cakedb.rb', line 67

def read(stream, ts, tsend=nil, mode=4)
  #Issue the query
  sid = get_sid(stream)
  if tsend 
    @server.write [18,mode,sid,ts,tsend].pack("L>S>S>Q>Q>")
  else
    @server.write [10,mode,sid,ts].pack("L>S>S>Q>")
  end
  #@server.write [headersz,mode,sid,ts].pack("L>S>S>Q>") 
  @server.flush
  result = Array.new

  #How much data we gotta get?
  recv_total = data_in(4).unpack("L>")[0]
  #puts "total is #{recv_total}"
  if recv_total > 0
    counter = 0
    #Recive the result
    while recv_total > 0
	#Get the header - 12 bytes
	header_whole = data_in(12)
	header = header_whole.unpack("Q>L>")

	#recieve the data, header says how much
	result[counter] = Hash.new
	result[counter]["ts"] = header[0]
      result[counter]["data"] = @server.recv(header[1])
	recv_total = recv_total - header[1] - 12
	counter+=1
    end
  else
    # I SEE NOTHING!
  end
  return result
end

#write(stream, payload) ⇒ Object



26
27
28
29
30
31
32
# File 'lib/cakedb.rb', line 26

def write(stream,payload)
  sid = get_sid(stream)
  #puts sid
  @server.write [(payload.bytesize + 2),2,sid].pack("L>S>S>")
  @server.write payload
  @server.flush
end