Class: CakeDB

Inherits:
Object
  • Object
show all
Defined in:
lib/cakedb.rb

Instance Method Summary collapse

Constructor Details

#initialize(server = "127.0.0.1", port = 8888, lvl = 0) ⇒ CakeDB

Returns a new instance of CakeDB.



6
7
8
9
10
11
12
13
14
# File 'lib/cakedb.rb', line 6

def initialize( server="127.0.0.1", port=8888, lvl=0 )
  #puts "Connection Initiated"
  @server = TCPSocket.new(server, port)
  @s = server
  @p = port
  @sids = {}
  @loggingLevel = lvl
  @timeout = 30
end

Instance Method Details

#allSince(stream, ts) ⇒ Object



108
109
110
111
# File 'lib/cakedb.rb', line 108

def allSince(stream, ts)
  #allSince is mode 4
  return read(stream, ts, nil, 4)
end

#data_in(length) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/cakedb.rb', line 45

def data_in(length)
  data = @server.recv(length)
  got = data.bytesize
  timeOver = Time.now + @timeout
  while got < length
    data += @server.recv(length-got)
    got = data.bytesize
    if Time.now > timeOver
	if @loggingLevel == 1
        puts "Timeout Hit. GT #{timeOver}"
	end
	@server.close()
	@sids = {}
	@server = TCPSocket.new(@s, @p)
	raise "Timeout on incomplete read, returning what I had"
      #return data
    end
    if @loggingLevel == 2 && got < length
      puts "Incomplete read on payload receive"
      puts "Raw Data: #{data}"
      puts "Length: #{length} bytes"
      puts "Raw Data length: #{data.bytesize}"
    end
  end
  return data
end

#get_sid(stream) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/cakedb.rb', line 15

def get_sid(stream)
  if @sids[stream].nil?
    @server.write [stream.bytesize,5].pack("L>S>")  
    @server.write stream  
    @server.flush
    sid = data_in(2).unpack("S>")
    @sids[stream] = sid[0]
    return sid[0]
  else
    return @sids[stream]
  end
end

#get_tasking(msg) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/cakedb.rb', line 36

def get_tasking(msg)
  details = Hash.new
  details["database"] = msg["region"]
  details["task"]     = msg["tasking"]
  details["date"]     = msg["date"]
  details["rediskey"] = details["database"] + "-" + details["task"]
  return details
end

#lastAt(stream, ts) ⇒ Object



113
114
115
116
# File 'lib/cakedb.rb', line 113

def lastAt(stream, ts)
  #lastAt is mode 6
  return read(stream, ts, nil, 6)
end

#rangeQuery(stream, ts, tsend) ⇒ Object

Perform a range query



118
119
120
# File 'lib/cakedb.rb', line 118

def rangeQuery(stream, ts, tsend)
  return read(stream, ts, tsend, 3)
end

#read(stream, ts, tsend = nil, mode = 4) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/cakedb.rb', line 72

def read(stream, ts, tsend=nil, mode=4)
  #Issue the query
  sid = get_sid(stream)
  if tsend 
    @server.write [18,mode,sid,ts,tsend].pack("L>S>S>Q>Q>")
  else
    @server.write [10,mode,sid,ts].pack("L>S>S>Q>")
  end
  #@server.write [headersz,mode,sid,ts].pack("L>S>S>Q>") 
  @server.flush
  result = Array.new

  #How much data we gotta get?
  recv_total = data_in(4).unpack("L>")[0]
  #puts "total is #{recv_total}"
  if recv_total > 0
    counter = 0
    #Recive the result
    while recv_total > 0
	#Get the header - 12 bytes
	header_whole = data_in(12)
	header = header_whole.unpack("Q>L>")

	#recieve the data, header says how much
	result[counter] = Hash.new
	result[counter]["ts"] = header[0]
      result[counter]["data"] = data_in(header[1])
	recv_total = recv_total - header[1] - 12
	counter+=1
    end
  else
    # I SEE NOTHING!
  end
  return result
end

#write(stream, payload) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/cakedb.rb', line 28

def write(stream,payload)
  sid = get_sid(stream)
  #puts sid
  @server.write [(payload.bytesize + 2),2,sid].pack("L>S>S>")
  @server.write payload
  @server.flush
end