Class: Fog::AWS::Kinesis::Mock
- Inherits: Object
- Hierarchy: Object → Fog::AWS::Kinesis::Mock
- Defined in:
- lib/fog/aws/kinesis.rb,
lib/fog/aws/requests/kinesis/put_record.rb,
lib/fog/aws/requests/kinesis/get_records.rb,
lib/fog/aws/requests/kinesis/put_records.rb,
lib/fog/aws/requests/kinesis/split_shard.rb,
lib/fog/aws/requests/kinesis/list_streams.rb,
lib/fog/aws/requests/kinesis/merge_shards.rb,
lib/fog/aws/requests/kinesis/create_stream.rb,
lib/fog/aws/requests/kinesis/delete_stream.rb,
lib/fog/aws/requests/kinesis/describe_stream.rb,
lib/fog/aws/requests/kinesis/add_tags_to_stream.rb,
lib/fog/aws/requests/kinesis/get_shard_iterator.rb,
lib/fog/aws/requests/kinesis/list_tags_for_stream.rb,
lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb
Class Method Summary
- .data ⇒ Object
- .next_sequence_number ⇒ Object
- .next_shard_id ⇒ Object
- .reset ⇒ Object
Instance Method Summary collapse
- #add_tags_to_stream(options = {}) ⇒ Object
- #create_stream(options = {}) ⇒ Object
- #data ⇒ Object
- #delete_stream(options = {}) ⇒ Object
- #describe_stream(options = {}) ⇒ Object
- #get_records(options = {}) ⇒ Object
- #get_shard_iterator(options = {}) ⇒ Object
- #initialize(options = {}) ⇒ Mock (constructor): A new instance of Mock.
- #list_streams(options = {}) ⇒ Object
- #list_tags_for_stream(options = {}) ⇒ Object
- #merge_shards(options = {}) ⇒ Object
- #next_sequence_number ⇒ Object
- #next_shard_id ⇒ Object
- #put_record(options = {}) ⇒ Object
- #put_records(options = {}) ⇒ Object
- #remove_tags_from_stream(options = {}) ⇒ Object
- #reset_data ⇒ Object
- #split_shard(options = {}) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Mock
Returns a new instance of Mock.
150 151 152 153 154 155 156 |
# File 'lib/fog/aws/kinesis.rb', line 150
#
# Builds a mock Kinesis connection.
#
# @param options [Hash] connection settings; reads :aws_access_key_id and
#   :region (falls back to 'us-east-1' when :region is absent)
def initialize(options = {})
  @account_id        = Fog::AWS::Mock.owner_id
  @aws_access_key_id = options[:aws_access_key_id]
  @region            = options[:region] || 'us-east-1'
  # Region checking is delegated to Fog::AWS.validate_region!.
  Fog::AWS.validate_region!(@region)
end
Class Method Details
.data ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/fog/aws/kinesis.rb', line 130
#
# Lazily builds the shared mock store, a two-level Hash whose missing keys
# are filled in on first access (first level keyed by region), and yields
# it to the caller's block while holding @mutex.
#
# Each leaf starts with an empty Array of streams, because the request
# mocks search it with Array-style calls (detect / map / delete).
#
# @yield [Hash] the whole store
# @return [Object, nil] the block's result, or nil when no block is given
def self.data
  @mutex.synchronize do
    @data ||= Hash.new do |by_region, region|
      by_region[region] = Hash.new do |by_key, key|
        by_key[key] = { :kinesis_streams => [] }
      end
    end

    yield @data if block_given?
  end
end
.next_sequence_number ⇒ Object
170 171 172 173 174 175 176 |
# File 'lib/fog/aws/kinesis.rb', line 170
#
# Hands out the next value of a class-wide counter as a String.
# The counter starts at -1, so the first value returned is "0".
#
# @return [String] the incremented counter
def self.next_sequence_number
  @mutex.synchronize do
    @sequence_number ||= -1
    @sequence_number += 1
    @sequence_number.to_s
  end
end
.next_shard_id ⇒ Object
180 181 182 183 184 185 186 |
# File 'lib/fog/aws/kinesis.rb', line 180
#
# Hands out the next shard identifier from a class-wide counter.
# The counter starts at -1 and is zero-padded to 12 digits, so the first
# identifier is "shardId-000000000000".
#
# @return [String] the new shard id
def self.next_shard_id
  @mutex.synchronize do
    @shard_id ||= -1
    @shard_id += 1
    "shardId-#{@shard_id.to_s.rjust(12, "0")}"
  end
end
.reset ⇒ Object
144 145 146 147 148 |
# File 'lib/fog/aws/kinesis.rb', line 144
#
# Drops the shared mock store while holding @mutex. Only @data is cleared
# here; nothing else is touched.
#
# @return [nil]
def self.reset
  @mutex.synchronize do
    @data = nil
  end
end
Instance Method Details
#add_tags_to_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fog/aws/requests/kinesis/add_tags_to_stream.rb', line 30
#
# Merges the given tags into the stored tags of a mock stream.
#
# @param options [Hash] reads (and removes) "StreamName" and "Tags"
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def add_tags_to_stream(options = {})
  stream_name = options.delete("StreamName")
  tags = options.delete("Tags")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # Keys already present are overwritten by the incoming values.
  stream["Tags"] = stream["Tags"].merge(tags)

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#create_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fog/aws/requests/kinesis/create_stream.rb', line 30
#
# Registers a new mock stream with the requested number of shards.
#
# Every shard is given the same full hash-key range and an empty "Records"
# array. The new stream is appended to the ones already stored, so creating
# a second stream no longer discards the first.
#
# @param options [Hash] reads (and removes) "StreamName" and "ShardCount"
#   (defaults to 1)
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceInUse] when the name is already taken
def create_stream(options = {})
  stream_name = options.delete("StreamName")
  shard_count = options.delete("ShardCount") || 1
  stream_arn  = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"

  if data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.")
  end

  shards = (0...shard_count).map do
    {
      "HashKeyRange" => {
        "EndingHashKey"   => "340282366920938463463374607431768211455",
        "StartingHashKey" => "0"
      },
      "SequenceNumberRange" => {
        "StartingSequenceNumber" => next_sequence_number
      },
      "ShardId" => next_shard_id,
      "Records" => []
    }
  end

  new_stream = {
    "HasMoreShards" => false,
    "StreamARN"     => stream_arn,
    "StreamName"    => stream_name,
    "StreamStatus"  => "ACTIVE",
    "Shards"        => shards,
    "Tags"          => {}
  }

  # to_a keeps an existing Array as-is and turns an empty Hash placeholder
  # into [], so the append works whichever container the store holds.
  data[:kinesis_streams] = data[:kinesis_streams].to_a << new_stream

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#data ⇒ Object
158 159 160 161 162 |
# File 'lib/fog/aws/kinesis.rb', line 158
#
# Looks up this connection's slice of the class-level mock store, indexed
# first by @region and then by @aws_access_key_id.
#
# @return [Object] whatever the class-level store holds for that pair
def data
  self.class.data do |store|
    store[@region][@aws_access_key_id]
  end
end
#delete_stream(options = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fog/aws/requests/kinesis/delete_stream.rb', line 28
#
# Removes a mock stream from the store.
#
# @param options [Hash] reads (and removes) "StreamName"
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def delete_stream(options = {})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  data[:kinesis_streams].delete(stream)

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#describe_stream(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fog/aws/requests/kinesis/describe_stream.rb', line 36
#
# Returns the stored description of a mock stream.
#
# "Records" is the mock's internal per-shard storage, so it is removed from
# every shard in the response. The stored shards are left untouched.
#
# @param options [Hash] reads (and removes) "StreamName"
# @return [Excon::Response] status 200; body is
#   { "StreamDescription" => <stream hash with "Records"-free shards> }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def describe_stream(options = {})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # "Shards" is an Array of Hashes, so the key filter must be applied to
  # each shard; filtering the Array itself would never match "Records".
  shards = stream["Shards"].map do |shard|
    shard.reject { |key, _| key == "Records" }
  end

  response = Excon::Response.new
  response.status = 200
  # Hash#merge returns a new Hash, leaving the stored stream unchanged.
  response.body = { "StreamDescription" => stream.merge("Shards" => shards) }
  response
end
#get_records(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fog/aws/requests/kinesis/get_records.rb', line 32
#
# Reads records from one shard of a mock stream.
#
# The shard iterator is the JSON document produced by #get_shard_iterator
# (or a previous call's "NextShardIterator"). It names the stream and shard,
# and optionally carries "StartingSequenceNumber" (defaults to 1).
#
# @param options [Hash] reads (and removes) "ShardIterator" and "Limit";
#   without a "Limit" all matching records are returned
# @return [Excon::Response] status 200; body holds "MillisBehindLatest"
#   (always 0), "NextShardIterator" and "Records"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or the shard
#   cannot be found
def get_records(options = {})
  shard_iterator = Fog::JSON.decode(options.delete("ShardIterator"))
  # -1 can never equal a record count, so it means "no limit".
  limit = options.delete("Limit") || -1
  stream_name = shard_iterator["StreamName"]
  shard_id = shard_iterator["ShardId"]
  starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_id }
  unless shard
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  records = []
  # A shard without a "Records" entry is treated as empty.
  (shard["Records"] || []).each do |record|
    next if record["SequenceNumber"].to_i < starting_sequence_number
    records << record
    break if records.size == limit
  end

  # Advance the cursor just past the last record returned; keep it where it
  # was when nothing matched.
  shard_iterator["StartingSequenceNumber"] =
    if records.empty?
      starting_sequence_number.to_s
    else
      (records.last["SequenceNumber"].to_i + 1).to_s
    end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "MillisBehindLatest" => 0,
    "NextShardIterator"  => Fog::JSON.encode(shard_iterator),
    "Records"            => records
  }
  response
end
#get_shard_iterator(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fog/aws/requests/kinesis/get_shard_iterator.rb', line 36
#
# Produces a shard iterator for a mock stream.
#
# The iterator is simply the request options serialised as JSON;
# #get_records decodes it again. "StreamName" is therefore read without
# being removed from the options.
#
# @param options [Hash] request parameters; "StreamName" must name an
#   existing stream
# @return [Excon::Response] status 200; body is { "ShardIterator" => String }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def get_shard_iterator(options = {})
  stream_name = options["StreamName"]

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    # just encode the options that were given, we decode them in get_records
    "ShardIterator" => Fog::JSON.encode(options)
  }
  response
end
#list_streams(options = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/fog/aws/requests/kinesis/list_streams.rb', line 28
#
# Lists the names of all mock streams in the store.
#
# @param options [Hash] accepted for interface parity; not read here
# @return [Excon::Response] status 200; body holds "HasMoreStreams"
#   (always false) and "StreamNames"
def list_streams(options = {})
  stream_names = data[:kinesis_streams].map { |stream| stream["StreamName"] }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "HasMoreStreams" => false,
    "StreamNames"    => stream_names
  }
  response
end
#list_tags_for_stream(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fog/aws/requests/kinesis/list_tags_for_stream.rb', line 36
#
# Lists the tags stored on a mock stream.
#
# @param options [Hash] reads (and removes) "StreamName"
# @return [Excon::Response] status 200; body holds "HasMoreTags" (always
#   false) and "Tags", an Array of { "Key" => ..., "Value" => ... } hashes
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def list_tags_for_stream(options = {})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  tag_list = stream["Tags"].map { |key, value| { "Key" => key, "Value" => value } }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "HasMoreTags" => false,
    "Tags"        => tag_list
  }
  response
end
#merge_shards(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fog/aws/requests/kinesis/merge_shards.rb', line 32
#
# Merges two shards of a mock stream into a new shard.
#
# Both parents are closed by setting an "EndingSequenceNumber". The new
# shard covers the lowest starting to the highest ending hash key of the
# two, and records both parents' ids. It is created with an empty "Records"
# array so that records can be stored on it like on any other shard.
#
# @param options [Hash] reads (and removes) "StreamName", "ShardToMerge"
#   and "AdjacentShardToMerge"
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or either
#   shard cannot be found
def merge_shards(options = {})
  stream_name = options.delete("StreamName")
  shard_to_merge_id = options.delete("ShardToMerge")
  adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard_to_merge = stream["Shards"].detect { |shard| shard["ShardId"] == shard_to_merge_id }
  unless shard_to_merge
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  adjacent_shard_to_merge = stream["Shards"].detect { |shard| shard["ShardId"] == adjacent_shard_to_merge_id }
  unless adjacent_shard_to_merge
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close shards (set an EndingSequenceNumber on them)
  shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
  adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  parents = [shard_to_merge, adjacent_shard_to_merge]
  # Hash keys are stored as decimal Strings; compare them numerically.
  new_starting_hash_key = parents.map { |shard| shard["HashKeyRange"]["StartingHashKey"].to_i }.min.to_s
  new_ending_hash_key = parents.map { |shard| shard["HashKeyRange"]["EndingHashKey"].to_i }.max.to_s

  # create a new shard with ParentShardId and AdjacentParentShardID
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey"   => new_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_to_merge_id,
    "AdjacentParentShardId" => adjacent_shard_to_merge_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#next_sequence_number ⇒ Object
178 |
# File 'lib/fog/aws/kinesis.rb', line 178
#
# Instance-level shortcut for the class-level sequence number generator.
#
# @return [Object] whatever the class method returns
def next_sequence_number
  self.class.next_sequence_number
end
#next_shard_id ⇒ Object
188 |
# File 'lib/fog/aws/kinesis.rb', line 188
#
# Instance-level shortcut for the class-level shard id generator.
#
# @return [Object] whatever the class method returns
def next_shard_id
  self.class.next_shard_id
end
#put_record(options = {}) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fog/aws/requests/kinesis/put_record.rb', line 38
#
# Stores one record on a mock stream.
#
# The target shard is picked at random from the stream's shards; the
# partition key is stored with the record but does not influence the choice.
#
# @param options [Hash] reads (and removes) "StreamName", "Data" and
#   "PartitionKey"
# @return [Excon::Response] status 200; body holds the assigned
#   "SequenceNumber" and the "ShardId" the record was stored on
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def put_record(options = {})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  sequence_number = next_sequence_number
  # Named record_data so the local does not hide the #data store accessor.
  record_data = options.delete("Data")
  partition_key = options.delete("PartitionKey")

  shard = stream["Shards"].sample
  shard_id = shard["ShardId"]

  # store the record on the shard; a shard that has no "Records" array yet
  # gets one on first write
  (shard["Records"] ||= []) << {
    "SequenceNumber" => sequence_number,
    "Data"           => record_data,
    "PartitionKey"   => partition_key
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "SequenceNumber" => sequence_number,
    "ShardId"        => shard_id
  }
  response
end
#put_records(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/fog/aws/requests/kinesis/put_records.rb', line 36
#
# Stores several records on a mock stream.
#
# Each record gets its own sequence number and is stored on a shard picked
# at random from the stream's shards.
#
# @param options [Hash] reads (and removes) "StreamName" and "Records"
#   (an Array of record hashes)
# @return [Excon::Response] status 200; body holds "FailedRecordCount"
#   (always 0) and "Records", one { "SequenceNumber", "ShardId" } hash per
#   input record, in input order
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def put_records(options = {})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  records = options.delete("Records")
  record_results = records.map do |record|
    sequence_number = next_sequence_number

    shard = stream["Shards"].sample
    # store the record on the shard; a shard that has no "Records" array
    # yet gets one on first write
    (shard["Records"] ||= []) << record.merge("SequenceNumber" => sequence_number)

    {
      "SequenceNumber" => sequence_number,
      "ShardId"        => shard["ShardId"]
    }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "FailedRecordCount" => 0,
    "Records"           => record_results
  }
  response
end
#remove_tags_from_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb', line 30
#
# Removes the named tags from a mock stream.
#
# @param options [Hash] reads (and removes) "StreamName" and "TagKeys"
#   (the keys to drop)
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when no stream has that name
def remove_tags_from_stream(options = {})
  stream_name = options.delete("StreamName")
  tag_keys = options.delete("TagKeys")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # delete_if edits the stored Hash in place and returns it.
  stream["Tags"] = stream["Tags"].delete_if { |key, _| tag_keys.include?(key) }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#reset_data ⇒ Object
164 165 166 167 168 |
# File 'lib/fog/aws/kinesis.rb', line 164
#
# Removes this connection's entry (keyed by @aws_access_key_id) from the
# @region slice of the class-level mock store.
#
# @return [Object] whatever the class-level `data` call returns for the block
def reset_data
  self.class.data do |store|
    store[@region].delete(@aws_access_key_id)
  end
end
#split_shard(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fog/aws/requests/kinesis/split_shard.rb', line 32
#
# Splits one shard of a mock stream into two child shards.
#
# The parent is closed by setting an "EndingSequenceNumber". The first child
# covers the parent's starting hash key up to NewStartingHashKey - 1, the
# second covers NewStartingHashKey up to the parent's ending hash key. Both
# children are created with an empty "Records" array so that records can be
# stored on them like on any other shard.
#
# @param options [Hash] reads (and removes) "StreamName", "ShardToSplit"
#   and "NewStartingHashKey"
# @return [Excon::Response] status 200 with an empty String body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] when the stream or the shard
#   cannot be found
def split_shard(options = {})
  stream_name = options.delete("StreamName")
  shard_id = options.delete("ShardToSplit")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_id }
  unless shard
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close original shard (set an EndingSequenceNumber on it)
  shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Calculate new shard ranges
  parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
  parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
  new_starting_hash_key = options.delete("NewStartingHashKey")

  # Create two new shards using contiguous hash space based on the original shard
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey"   => (new_starting_hash_key.to_i - 1).to_s,
      "StartingHashKey" => parent_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey"   => parent_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end