Class: Fog::AWS::Kinesis::Mock

Inherits:
Object
  • Object
show all
Defined in:
lib/fog/aws/kinesis.rb,
lib/fog/aws/requests/kinesis/put_record.rb,
lib/fog/aws/requests/kinesis/get_records.rb,
lib/fog/aws/requests/kinesis/put_records.rb,
lib/fog/aws/requests/kinesis/split_shard.rb,
lib/fog/aws/requests/kinesis/list_streams.rb,
lib/fog/aws/requests/kinesis/merge_shards.rb,
lib/fog/aws/requests/kinesis/create_stream.rb,
lib/fog/aws/requests/kinesis/delete_stream.rb,
lib/fog/aws/requests/kinesis/describe_stream.rb,
lib/fog/aws/requests/kinesis/add_tags_to_stream.rb,
lib/fog/aws/requests/kinesis/get_shard_iterator.rb,
lib/fog/aws/requests/kinesis/list_tags_for_stream.rb,
lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Mock

Returns a new instance of Mock.



147
148
149
150
151
152
153
# File 'lib/fog/aws/kinesis.rb', line 147

# Sets up a mock Kinesis connection.
#
# Stores the mock owner id reported by Fog::AWS::Mock, the caller's access key
# id and the target region, then asks Fog::AWS to validate the region.
#
# @param options [Hash] connection settings
# @option options [String] :aws_access_key_id access key id; read back by #data
#   and #reset_data to select this connection's slice of the mock store
# @option options [String] :region region name; 'us-east-1' when absent
def initialize(options = {})
  @region            = options[:region] || 'us-east-1'
  @aws_access_key_id = options[:aws_access_key_id]
  @account_id        = Fog::AWS::Mock.owner_id

  # Validation runs last, after all instance state has been assigned.
  Fog::AWS.validate_region!(@region)
end

Class Method Details

.data ⇒ Object



133
134
135
136
137
138
139
140
141
# File 'lib/fog/aws/kinesis.rb', line 133

# Returns the class-wide mock data store, building it on first use.
#
# The store is a two-level auto-vivifying Hash: the first key is a region and
# the second key is looked up inside that region (instances use their access
# key id). Each leaf is created on first access.
#
# @return [Hash] region => key => { :kinesis_streams => Array }
def self.data
  @data ||= Hash.new do |hash, region|
    hash[region] = Hash.new do |region_hash, key|
      region_hash[key] = {
        # The request methods search this collection with a one-argument
        # block over stream hashes and remove entries with Array#delete, so
        # it must start out as an Array rather than a Hash.
        :kinesis_streams => []
      }
    end
  end
end

.mutex ⇒ Object



128
129
130
# File 'lib/fog/aws/kinesis.rb', line 128

# Returns the class-level Mutex, creating it on the first call and handing
# back the same object afterwards.
#
# @return [Mutex]
def self.mutex
  return @mutex if @mutex

  @mutex = Mutex.new
end

.next_sequence_number ⇒ Object



163
164
165
166
167
168
169
# File 'lib/fog/aws/kinesis.rb', line 163

# Hands out the next value of a class-wide counter as a String.
#
# The counter starts so that the first value returned is "0" and grows by one
# per call. The update runs inside mutex.synchronize.
#
# @return [String] decimal representation of the new counter value
def self.next_sequence_number
  mutex.synchronize do
    @sequence_number = (@sequence_number || -1) + 1
    @sequence_number.to_s
  end
end

.next_shard_id ⇒ Object



172
173
174
175
176
177
178
# File 'lib/fog/aws/kinesis.rb', line 172

# Hands out the next shard identifier from a class-wide counter.
#
# Identifiers have the form "shardId-" followed by the counter value
# left-padded with zeros to 12 characters; the first one is
# "shardId-000000000000". The update runs inside mutex.synchronize.
#
# @return [String] the new shard id
def self.next_shard_id
  mutex.synchronize do
    @shard_id = (@shard_id || -1) + 1
    padded = @shard_id.to_s.rjust(12, "0")
    "shardId-#{padded}"
  end
end

.reset ⇒ Object



143
144
145
# File 'lib/fog/aws/kinesis.rb', line 143

# Discards the memoized class-level mock data store by setting @data to nil.
# Only @data is cleared here.
#
# @return [nil]
def self.reset
  @data = nil
end

Instance Method Details

#add_tags_to_stream(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fog/aws/requests/kinesis/add_tags_to_stream.rb', line 30

# Mock implementation of the Kinesis AddTagsToStream request.
#
# Merges the supplied tags into the stream's existing "Tags" hash; a supplied
# key that already exists takes the new value.
#
# @param options [Hash] request parameters; "StreamName" and "Tags" are
#   removed from the hash as they are read
# @option options [String] "StreamName" name of the stream to tag
# @option options [Hash] "Tags" tag key/value pairs to add
# @return [Excon::Response] status 200 with an empty string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def add_tags_to_stream(options = {})
  name = options.delete("StreamName")
  new_tags = options.delete("Tags")

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  stream["Tags"] = stream["Tags"].merge(new_tags)

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = ""
  end
end

#create_stream(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fog/aws/requests/kinesis/create_stream.rb', line 30

# Mock implementation of the Kinesis CreateStream request.
#
# Registers a new stream with status "ACTIVE", no tags and "ShardCount" shards
# (1 when omitted) in the mock data store, alongside any streams that are
# already registered.
#
# @param options [Hash] request parameters; "StreamName" and "ShardCount" are
#   removed from the hash as they are read
# @option options [String] "StreamName" name of the stream to create
# @option options [Integer] "ShardCount" number of shards to create
# @return [Excon::Response] status 200 with an empty string body
# @raise [Fog::AWS::Kinesis::ResourceInUse] if a stream with that name exists
def create_stream(options={})
  stream_name = options.delete("StreamName")
  shard_count = options.delete("ShardCount") || 1
  stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"

  # Make sure the store is an Array (it may have been seeded with a different
  # empty container) so the new stream can be appended. Assigning a fresh
  # one-element Array here would silently discard every existing stream.
  streams = data[:kinesis_streams]
  streams = data[:kinesis_streams] = [] unless streams.is_a?(Array)

  if streams.detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.")
  end

  shards = (0...shard_count).map do
    {
      # Every shard is given the same, full hash key range.
      "HashKeyRange"=>{
        "EndingHashKey"=>"340282366920938463463374607431768211455",
        "StartingHashKey"=>"0"
      },
      "SequenceNumberRange"=>{
        "StartingSequenceNumber"=> next_sequence_number
      },
      "ShardId"=>next_shard_id,
      # Mock-internal list of the records written to this shard.
      "Records" => []
    }
  end

  streams << {
    "HasMoreShards" => false,
    "StreamARN" => stream_arn,
    "StreamName" => stream_name,
    "StreamStatus" => "ACTIVE",
    "Shards" => shards,
    "Tags" => {}
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end

#data ⇒ Object



155
156
157
# File 'lib/fog/aws/kinesis.rb', line 155

# Returns this connection's slice of the class-level mock data store, selected
# first by @region and then by @aws_access_key_id.
#
# @return [Object] whatever the class-level store holds under those two keys
def data
  region_store = self.class.data[@region]
  region_store[@aws_access_key_id]
end

#delete_stream(options = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fog/aws/requests/kinesis/delete_stream.rb', line 28

# Mock implementation of the Kinesis DeleteStream request.
#
# Looks the stream up by name and removes it from the mock data store.
#
# @param options [Hash] request parameters; "StreamName" is removed from the
#   hash as it is read
# @option options [String] "StreamName" name of the stream to delete
# @return [Excon::Response] status 200 with an empty string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def delete_stream(options = {})
  name = options.delete("StreamName")
  streams = data[:kinesis_streams]

  target = streams.find { |candidate| candidate["StreamName"] == name }
  unless target
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  streams.delete(target)

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = ""
  end
end

#describe_stream(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fog/aws/requests/kinesis/describe_stream.rb', line 36

# Mock implementation of the Kinesis DescribeStream request.
#
# Returns the stored stream description with the mock-internal "Records" list
# removed from every shard. The stored stream and its shards are not modified.
#
# @param options [Hash] request parameters; "StreamName" is removed from the
#   hash as it is read
# @option options [String] "StreamName" name of the stream to describe
# @return [Excon::Response] status 200; body is
#   { "StreamDescription" => Hash }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def describe_stream(options={})
  stream_name = options.delete("StreamName")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # "Shards" is an Array of shard hashes, so the "Records" key has to be
  # dropped from each shard individually. Rejecting on the Array itself
  # compares whole shard hashes to "Records" and never removes anything.
  shards = stream["Shards"].map do |shard|
    shard.reject { |key, _| key == "Records" }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = { "StreamDescription" => stream.merge("Shards" => shards) }
  response
end

#get_records(options = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fog/aws/requests/kinesis/get_records.rb', line 32

# Mock implementation of the Kinesis GetRecords request.
#
# The shard iterator is the JSON document produced by #get_shard_iterator. It
# names the stream and shard and may carry a "StartingSequenceNumber" (1 when
# absent). Records of that shard whose sequence number is at or above the
# starting number are returned in stored order, up to "Limit" when given.
#
# @param options [Hash] request parameters; "ShardIterator" and "Limit" are
#   removed from the hash as they are read
# @option options [String] "ShardIterator" JSON-encoded iterator
# @option options [Integer] "Limit" maximum number of records to return
# @return [Excon::Response] status 200; body has "MillisBehindLatest" (always
#   0), "NextShardIterator" (JSON string) and "Records" (Array)
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if the stream or the shard
#   cannot be found
def get_records(options={})
  shard_iterator = Fog::JSON.decode(options.delete("ShardIterator"))
  limit = options.delete("Limit") || -1 # -1 never equals a record count, i.e. unlimited
  stream_name = shard_iterator["StreamName"]
  shard_id = shard_iterator["ShardId"]
  starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard = stream["Shards"].detect{ |candidate| candidate["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  records = []
  # A shard that has no "Records" list is read as empty instead of failing on nil.
  (shard["Records"] || []).each do |record|
    next if record["SequenceNumber"].to_i < starting_sequence_number
    records << record
    break if records.size == limit
  end

  # The next iterator resumes right after the last record returned, or stays
  # where it was when nothing was returned.
  shard_iterator["StartingSequenceNumber"] = if records.empty?
    starting_sequence_number.to_s
  else
    (records.last["SequenceNumber"].to_i + 1).to_s
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "MillisBehindLatest"=> 0,
    "NextShardIterator"=> Fog::JSON.encode(shard_iterator),
    "Records"=> records
  }
  response
end

#get_shard_iterator(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fog/aws/requests/kinesis/get_shard_iterator.rb', line 36

# Mock implementation of the Kinesis GetShardIterator request.
#
# Checks only that the named stream exists. The iterator handed back is the
# request options serialised as JSON, which #get_records later decodes.
#
# @param options [Hash] request parameters; the hash is left untouched
# @option options [String] "StreamName" name of the stream to read from
# @return [Excon::Response] status 200; body is { "ShardIterator" => String }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def get_shard_iterator(options = {})
  name = options["StreamName"]

  known = data[:kinesis_streams].any? { |candidate| candidate["StreamName"] == name }
  unless known
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  # The encoded options are the iterator; nothing else is stored.
  iterator = Fog::JSON.encode(options)

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = { "ShardIterator" => iterator }
  end
end

#list_streams(options = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/fog/aws/requests/kinesis/list_streams.rb', line 28

# Mock implementation of the Kinesis ListStreams request.
#
# Reports the names of all streams in the mock data store in a single page.
#
# @param options [Hash] accepted for interface compatibility; not read
# @return [Excon::Response] status 200; body has "HasMoreStreams" (always
#   false) and "StreamNames" (Array of names)
def list_streams(options = {})
  names = data[:kinesis_streams].map { |entry| entry["StreamName"] }

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = { "HasMoreStreams" => false, "StreamNames" => names }
  end
end

#list_tags_for_stream(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fog/aws/requests/kinesis/list_tags_for_stream.rb', line 36

# Mock implementation of the Kinesis ListTagsForStream request.
#
# Converts the stream's "Tags" hash into a list of
# { "Key" => ..., "Value" => ... } entries, returned in a single page.
#
# @param options [Hash] request parameters; "StreamName" is removed from the
#   hash as it is read
# @option options [String] "StreamName" name of the stream to inspect
# @return [Excon::Response] status 200; body has "HasMoreTags" (always false)
#   and "Tags" (Array of Hashes)
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def list_tags_for_stream(options = {})
  name = options.delete("StreamName")

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  tag_list = stream["Tags"].map do |tag_key, tag_value|
    { "Key" => tag_key, "Value" => tag_value }
  end

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = { "HasMoreTags" => false, "Tags" => tag_list }
  end
end

#merge_shards(options = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fog/aws/requests/kinesis/merge_shards.rb', line 32

# Mock implementation of the Kinesis MergeShards request.
#
# Closes both source shards by giving them an "EndingSequenceNumber" and
# appends a new shard whose hash key range runs from the lower of the two
# starting keys to the higher of the two ending keys.
#
# @param options [Hash] request parameters; the three keys below are removed
#   from the hash as they are read
# @option options [String] "StreamName" name of the stream
# @option options [String] "ShardToMerge" id of the first shard
# @option options [String] "AdjacentShardToMerge" id of the second shard
# @return [Excon::Response] status 200 with an empty string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if the stream or either shard
#   cannot be found
def merge_shards(options={})
  stream_name = options.delete("StreamName")
  shard_to_merge_id = options.delete("ShardToMerge")
  adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  unless adjacent_shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == adjacent_shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close shards (set an EndingSequenceNumber on them)
  shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
  adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Hash keys are stored as decimal strings; compare them numerically.
  new_starting_hash_key = [
    shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i
  ].min.to_s

  new_ending_hash_key = [
    shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i
  ].max.to_s

  # create a new shard with ParentShardId and AdjacentParentShardID
  stream["Shards"] << {
    "HashKeyRange"=> {
      "EndingHashKey" => new_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_to_merge_id,
    "AdjacentParentShardId" => adjacent_shard_to_merge_id,
    # Give the merged shard its own record list so records can be written to
    # and read from it like any other shard.
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end

#mutex ⇒ Object



131
# File 'lib/fog/aws/kinesis.rb', line 131

# Instance-level shortcut that returns whatever the class-level .mutex returns.
def mutex
  self.class.mutex
end

#next_sequence_number ⇒ Object



170
# File 'lib/fog/aws/kinesis.rb', line 170

# Instance-level shortcut that returns whatever the class-level
# .next_sequence_number returns.
def next_sequence_number
  self.class.next_sequence_number
end

#next_shard_id ⇒ Object



179
# File 'lib/fog/aws/kinesis.rb', line 179

# Instance-level shortcut that returns whatever the class-level .next_shard_id
# returns.
def next_shard_id
  self.class.next_shard_id
end

#put_record(options = {}) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fog/aws/requests/kinesis/put_record.rb', line 38

# Mock implementation of the Kinesis PutRecord request.
#
# Assigns the record the next sequence number and stores it on one randomly
# chosen open shard of the stream. A shard counts as open when its
# "SequenceNumberRange" has no "EndingSequenceNumber".
#
# @param options [Hash] request parameters; the three keys below are removed
#   from the hash as they are read
# @option options [String] "StreamName" name of the target stream
# @option options [Object] "Data" record payload, stored as given
# @option options [String] "PartitionKey" stored with the record; not used to
#   choose the shard
# @return [Excon::Response] status 200; body has "SequenceNumber" and "ShardId"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def put_record(options={})
  stream_name = options.delete("StreamName")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  sequence_number = next_sequence_number
  # Named record_data so the local does not shadow the #data store accessor.
  record_data = options.delete("Data")
  partition_key = options.delete("PartitionKey")

  # Shards closed by a split or merge carry an EndingSequenceNumber and must
  # not receive new records.
  open_shards = stream["Shards"].reject do |candidate|
    (candidate["SequenceNumberRange"] || {})["EndingSequenceNumber"]
  end

  # Array#sample is called Array#choice on Ruby 1.8.7.
  sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample
  shard = open_shards.send(sample_method)
  shard_id = shard["ShardId"]

  # store the record on the shard, creating its record list if it has none
  (shard["Records"] ||= []) << {
    "SequenceNumber" => sequence_number,
    "Data" => record_data,
    "PartitionKey" => partition_key
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "SequenceNumber" => sequence_number,
    "ShardId" => shard_id
  }
  response
end

#put_records(options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fog/aws/requests/kinesis/put_records.rb', line 36

# Mock implementation of the Kinesis PutRecords request.
#
# Each submitted record gets the next sequence number and is stored, merged
# with that "SequenceNumber", on a randomly chosen open shard of the stream.
# A shard counts as open when its "SequenceNumberRange" has no
# "EndingSequenceNumber".
#
# @param options [Hash] request parameters; "StreamName" and "Records" are
#   removed from the hash as they are read
# @option options [String] "StreamName" name of the target stream
# @option options [Array<Hash>] "Records" records to store
# @return [Excon::Response] status 200; body has "FailedRecordCount" (always
#   0) and "Records", one { "SequenceNumber", "ShardId" } hash per input
#   record in input order
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def put_records(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # Array#sample is called Array#choice on Ruby 1.8.7.
  sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample

  # Shards closed by a split or merge carry an EndingSequenceNumber and must
  # not receive new records.
  open_shards = stream["Shards"].reject do |candidate|
    (candidate["SequenceNumberRange"] || {})["EndingSequenceNumber"]
  end

  records = options.delete("Records")
  record_results = records.map do |record|
    sequence_number = next_sequence_number

    shard = open_shards.send(sample_method)
    shard_id = shard["ShardId"]
    # store the record on the shard, creating its record list if it has none
    (shard["Records"] ||= []) << record.merge("SequenceNumber" => sequence_number)
    {
      "SequenceNumber" => sequence_number,
      "ShardId" => shard_id
    }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "FailedRecordCount" => 0,
    "Records" => record_results
  }
  response
end

#remove_tags_from_stream(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb', line 30

# Mock implementation of the Kinesis RemoveTagsFromStream request.
#
# Deletes every tag whose key is included in "TagKeys" from the stream's
# "Tags" hash.
#
# @param options [Hash] request parameters; "StreamName" and "TagKeys" are
#   removed from the hash as they are read
# @option options [String] "StreamName" name of the stream to modify
# @option options [Array<String>] "TagKeys" keys of the tags to remove
# @return [Excon::Response] status 200 with an empty string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def remove_tags_from_stream(options = {})
  name = options.delete("StreamName")
  doomed_keys = options.delete("TagKeys")

  stream = data[:kinesis_streams].find { |candidate| candidate["StreamName"] == name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound, "Stream #{name} under account #{@account_id} not found."
  end

  # delete_if prunes the existing hash in place and returns that same hash.
  remaining = stream["Tags"].delete_if { |tag_key, _value| doomed_keys.include?(tag_key) }
  stream["Tags"] = remaining

  Excon::Response.new.tap do |response|
    response.status = 200
    response.body = ""
  end
end

#reset_data ⇒ Object



159
160
161
# File 'lib/fog/aws/kinesis.rb', line 159

# Drops this connection's slice of the class-level mock data store: the entry
# stored under @aws_access_key_id inside the @region entry.
#
# @return [Object, nil] the value the delete call returns for that key
def reset_data
  region_store = self.class.data[@region]
  region_store.delete(@aws_access_key_id)
end

#split_shard(options = {}) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fog/aws/requests/kinesis/split_shard.rb', line 32

# Mock implementation of the Kinesis SplitShard request.
#
# Closes the shard being split by giving it an "EndingSequenceNumber" and
# appends two child shards that divide its hash key range at
# "NewStartingHashKey". The first child ends one key below that value and the
# second child starts at it.
#
# @param options [Hash] request parameters; the three keys below are removed
#   from the hash as they are read
# @option options [String] "StreamName" name of the stream
# @option options [String] "ShardToSplit" id of the shard to split
# @option options [String] "NewStartingHashKey" first hash key of the second
#   child shard
# @return [Excon::Response] status 200 with an empty string body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if the stream or the shard
#   cannot be found
def split_shard(options={})
  stream_name = options.delete("StreamName")
  shard_id = options.delete("ShardToSplit")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard = stream["Shards"].detect{ |candidate| candidate["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close original shard (set an EndingSequenceNumber on it)
  shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Calculate new shard ranges
  parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
  parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
  new_starting_hash_key = options.delete("NewStartingHashKey")

  # Create two new shards using contiguous hash space based on the original
  # shard. Each child gets its own record list so records can be written to
  # and read from it like any other shard.
  stream["Shards"] << {
    "HashKeyRange"=> {
      "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s,
      "StartingHashKey" => parent_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => parent_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" =>{
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end