Class: Rhoconnect::RedisImpl
- Inherits:
-
Object
- Object
- Rhoconnect::RedisImpl
- Defined in:
- lib/rhoconnect/store.rb
Constant Summary collapse
- RESERVED_ATTRIB_NAMES =
["attrib_type", "id"]
Instance Method Summary collapse
-
#clone(srckey, dstkey) ⇒ Object
Create a copy of srckey in dstkey.
- #create(server = nil) ⇒ Object
-
#db ⇒ Object
This method should never be accessed by anything except specs.
- #decr(dockey) ⇒ Object
-
#delete_data(dockey, data = {}) ⇒ Object
Deletes data from a given doctype,source,user.
-
#delete_objects(dockey, data = []) ⇒ Object
Removes objects from a given doctype,source,user.
- #delete_value(dockey) ⇒ Object
- #doc_type(dockey) ⇒ Object
- #execute_transaction ⇒ Object
- #exists?(key) ⇒ Boolean
- #flush_all ⇒ Object
-
#flush_data(keymask) ⇒ Object
(also: #flash_data)
Deletes all keys matching a given mask.
-
#flush_zdata(dockey) ⇒ Object
Deletes all keys and their hashes from the Redis DB.
-
#get_data(dockey, type = Hash) ⇒ Object
Retrieves set for given dockey,source,user.
- #get_db_doc(dockey) ⇒ Object
-
#get_diff_data(src_dockey, dst_dockey, p_size = nil) ⇒ Object
Retrieves diff data hash between two sets each entry is in the form of DIFF_OBJ_ELEMENT => [OBJ_KEY, OBJ_DATA_PAIRS].
-
#get_diff_data_bruteforce(src_dockey, dst_dockey, p_size = nil) ⇒ Object
Retrieves diff data hash between two sets by using BruteForce approach => download both sets from Redis and compute diffs inside of Ruby worst-cast scenario - it is much slower than doing Redis sdiff but : it allows Redis clustering each entry is in the form of DIFF_OBJ_ELEMENT => [OBJ_KEY, OBJ_DATA_PAIRS].
- #get_inserts_deletes(inserts_elements_map, deletes_elements_map) ⇒ Object
- #get_list(dockey) ⇒ Object
- #get_lock(dockey, timeout = 0, raise_on_expire = false) ⇒ Object
- #get_object(dockey, key) ⇒ Object
- #get_objects(dockey, keys) ⇒ Object
-
#get_value(dockey) ⇒ Object
Retrieves value for a given key.
-
#get_zdata(dockey) ⇒ Object
Retrieves set for given dockey,associated key (client_id), obj_hashes.
- #incr(dockey) ⇒ Object
- #keys(pattern) ⇒ Object
-
#lock(dockey, timeout = 0, raise_on_expire = false, &block) ⇒ Object
Lock a given key and release when provided block is finished.
-
#put_data(dockey, data = {}, append = false, ttl = 0) ⇒ Object
(also: #set_data)
if it exists or appends data to the existing set if append flag set to true if ttl > 0 - sets expriration time on the keys.
- #put_list(dockey, data = [], append = false, ttl = 0) ⇒ Object
- #put_object(dockey, key, data = {}) ⇒ Object
-
#put_tmp_data(dockey, data = {}, append = false) ⇒ Object
Same as above, but sets TTL on every key.
-
#put_value(dockey, value) ⇒ Object
(also: #set_value)
Adds a simple key/value pair.
- #put_zdata(dockey, assoc_key, data = [], append = false) ⇒ Object
- #reconnect ⇒ Object
-
#release_lock(dockey, lock, raise_on_expire = false) ⇒ Object
Due to redis bug #140, setnx always returns true so this doesn’t work def get_lock(dockey,timeout=0) lock_key = _lock_key(dockey) until @db.setnx(lock_key,1) do sleep(1) end @db.expire(lock_key,timeout+1) Time.now.to_i+timeout+1 end.
-
#rename(srckey, dstkey, make_persist = false) ⇒ Object
Rename srckey to dstkey also, removes TTL if ordered (normally - it is not necessary).
-
#rename_tmp_data(srckey, dstkey) ⇒ Object
Rename temp doc srckey to persist dstkey.
- #set_db_doc(dockey, data, append = false) ⇒ Object
- #start_transaction ⇒ Object
- #update_count(dockey, count) ⇒ Object
- #update_elements(dockey, inserts_elements_map, deletes_elements_map) ⇒ Object
-
#update_objects(dockey, data = {}) ⇒ Object
updates objects for a given doctype, source, user create new objects if necessary.
-
#zadd(dockey, score, value) ⇒ Object
low-level operations with sorted sets.
- #zrange(dockey, start, stop) ⇒ Object
- #zrem(dockey, value) ⇒ Object
- #zremrangebyscore(dockey, min_elem, max_elem) ⇒ Object
- #zrevrange(dockey, start, stop) ⇒ Object
- #zscore(dockey, value) ⇒ Object
Instance Method Details
#clone(srckey, dstkey) ⇒ Object
Create a copy of srckey in dstkey
734 735 736 737 738 739 740 741 742 743 744 745 746 |
# File 'lib/rhoconnect/store.rb', line 734 def clone(srckey,dstkey) buckets = _get_bucket_indices(srckey) if buckets.size @db.pipelined do buckets.each do |bucket_index| _add_bucket_index(dstkey, bucket_index) @db.sdiffstore("#{dstkey}:#{bucket_index}", "#{srckey}:#{bucket_index}", '') end end else @db.sdiffstore(dstkey,srckey,'') end end |
#create(server = nil) ⇒ Object
270 271 272 273 274 |
# File 'lib/rhoconnect/store.rb', line 270 def create(server=nil) @db ||= _get_redis(server) raise "Error connecting to Redis store." unless @db and (@db.is_a?(Redis) or @db.is_a?(Redis::Client) or @db.is_a?(ConnectionPool::Wrapper)) end |
#db ⇒ Object
This method should never be accessed by anything except specs
870 871 872 |
# File 'lib/rhoconnect/store.rb', line 870 def db return @db end |
#decr(dockey) ⇒ Object
444 445 446 |
# File 'lib/rhoconnect/store.rb', line 444 def decr(dockey) @db.decr(dockey) end |
#delete_data(dockey, data = {}) ⇒ Object
Deletes data from a given doctype,source,user
413 414 415 416 417 418 |
# File 'lib/rhoconnect/store.rb', line 413 def delete_data(dockey,data={}) if dockey and data _delete_objects(dockey, data) end true end |
#delete_objects(dockey, data = []) ⇒ Object
Removes objects from a given doctype,source,user
405 406 407 408 409 410 |
# File 'lib/rhoconnect/store.rb', line 405 def delete_objects(dockey,data=[]) return 0 unless dockey and data objs = get_objects(dockey, data) _delete_objects(dockey, objs) end |
#delete_value(dockey) ⇒ Object
436 437 438 |
# File 'lib/rhoconnect/store.rb', line 436 def delete_value(dockey) @db.del(dockey) end |
#doc_type(dockey) ⇒ Object
292 293 294 |
# File 'lib/rhoconnect/store.rb', line 292 def doc_type(dockey) @db.type(dockey) if dockey end |
#execute_transaction ⇒ Object
288 289 290 |
# File 'lib/rhoconnect/store.rb', line 288 def execute_transaction @db.exec end |
#exists?(key) ⇒ Boolean
837 838 839 |
# File 'lib/rhoconnect/store.rb', line 837 def exists?(key) @db.exists(key) || @db.exists("#{key}:indices") end |
#flush_all ⇒ Object
280 281 282 |
# File 'lib/rhoconnect/store.rb', line 280 def flush_all @db.flushdb end |
#flush_data(keymask) ⇒ Object Also known as: flash_data
Deletes all keys matching a given mask
659 660 661 662 663 664 665 666 667 668 669 670 671 672 |
# File 'lib/rhoconnect/store.rb', line 659 def flush_data(keymask) if keymask.to_s[/[*\[\]?]/] # If the keymask contains any pattern matching characters # Use keys command to find all keys matching pattern (this is extremely expensive) # Then delete matches keys(keymask).each do |key| _delete_doc(key) end else # The keymask doesn't contain pattern matching characters # A delete call is all that is needed _delete_doc(keymask) end end |
#flush_zdata(dockey) ⇒ Object
Deletes all keys and their hashes from the Redis DB
829 830 831 832 833 834 835 |
# File 'lib/rhoconnect/store.rb', line 829 def flush_zdata(dockey) data = @db.zrange(dockey, 0, -1) data.each do |zsetkey| _delete_doc("#{dockey}:#{zsetkey}") end @db.zremrangebyrank(dockey, 0, -1) end |
#get_data(dockey, type = Hash) ⇒ Object
Retrieves set for given dockey,source,user
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 |
# File 'lib/rhoconnect/store.rb', line 462 def get_data(dockey,type=Hash) res = type == Hash ? {} : [] if dockey if type == Hash buckets = _get_buckets(dockey) members = @db.pipelined do buckets.each do |bucket| @db.smembers(bucket) end if buckets end members.each do |elements| elements.each do |element| key,obj = get_obj_element(element) res[key] = obj #res[key].merge!({attrib => value}) end if elements end if members else res = get_list(dockey) end end res end |
#get_db_doc(dockey) ⇒ Object
304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/rhoconnect/store.rb', line 304 def get_db_doc(dockey) doctype = doc_type(dockey) if doctype == 'string' get_value(dockey) elsif doctype == 'list' get_data(dockey, Array).to_json elsif doctype == 'zset' get_zdata(dockey).to_json else get_data(dockey).to_json end end |
#get_diff_data(src_dockey, dst_dockey, p_size = nil) ⇒ Object
Retrieves diff data hash between two sets each entry is in the form of DIFF_OBJ_ELEMENT => [OBJ_KEY, OBJ_DATA_PAIRS]
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 |
# File 'lib/rhoconnect/store.rb', line 496 def get_diff_data(src_dockey,dst_dockey,p_size=nil) res = {} return res if p_size == 0 # return immediately if p_size == 0 # NOTE: 0 and nil are different, nil means - return all diffs if src_dockey and dst_dockey # obtain combined indices indices = @db.hgetall("#{dst_dockey}:indices") indices.keys.each do |index| dst_bucket_name = "#{dst_dockey}:#{index}" src_bucket_name = "#{src_dockey}:#{index}" diff_elements = @db.sdiff(dst_bucket_name,src_bucket_name) diff_elements.each do |element| keypairs = get_obj_key_and_pairs(element) next unless keypairs res[element] = keypairs return res if p_size and (res.size >= p_size) end end end res end |
#get_diff_data_bruteforce(src_dockey, dst_dockey, p_size = nil) ⇒ Object
Retrieves diff data hash between two sets by using BruteForce approach
> download both sets from Redis and compute diffs inside of Ruby
worst-cast scenario - it is much slower than doing Redis sdiff but : it allows Redis clustering each entry is in the form of DIFF_OBJ_ELEMENT => [OBJ_KEY, OBJ_DATA_PAIRS]
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 |
# File 'lib/rhoconnect/store.rb', line 524 def get_diff_data_bruteforce(src_dockey,dst_dockey,p_size=nil) inserts = {} deletes = {} # return immediately if p_size == 0 # NOTE: 0 and nil are different, nil means - return all diffs return res if p_size == 0 if src_dockey and dst_dockey # obtain combined indices indices = @db.hgetall("#{dst_dockey}:indices") indices.merge!(@db.hgetall("#{src_dockey}:indices")) indices.keys.each do |index| dst_bucket_name = "#{dst_dockey}:#{index}" src_bucket_name = "#{src_dockey}:#{index}" src_elements = Set.new(@db.smembers(src_bucket_name)) dst_elements = Set.new(@db.smembers(dst_bucket_name)) insert_diff_elements = dst_elements.dup.subtract(src_elements) unless p_size and (inserts.size >= p_size) delete_diff_elements = src_elements.dup.subtract(dst_elements) unless p_size and (deletes.size >= p_size) insert_diff_elements.each do |element| keypairs = get_obj_key_and_pairs(element) next unless keypairs inserts[element] = keypairs break if p_size and (inserts.size >= p_size) end if insert_diff_elements delete_diff_elements.each do |element| keypairs = get_obj_key_and_pairs(element) next unless keypairs deletes[element] = keypairs break if p_size and (deletes.size >= p_size) end if delete_diff_elements break if p_size and (inserts.size >= p_size) and (deletes.size >= p_size) end end [inserts, deletes] end |
#get_inserts_deletes(inserts_elements_map, deletes_elements_map) ⇒ Object
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 |
# File 'lib/rhoconnect/store.rb', line 563 def get_inserts_deletes(inserts_elements_map, deletes_elements_map) inserts_obj_hash = {} inserts_elements_map.each do |element,keypairs| key,obj_pairs = keypairs[0],keypairs[1] next unless (key and obj_pairs) inserts_obj_hash[key] = Set.new(obj_pairs) end deletes_obj_hash = {} deletes_elements_map.each do |element,keypairs| key,obj_pairs = keypairs[0],keypairs[1] next unless (key and obj_pairs) deletes_obj_hash[key] = Set.new(obj_pairs) end # modified attributes inserts = {} deletes = {} inserts_obj_hash.each do |key, obj_set| deletes_pairs = nil inserts_pairs = nil if deletes_obj_hash.has_key?(key) deletes_pairs = deletes_obj_hash[key].dup.subtract(obj_set).to_a inserts_pairs = obj_set.dup.subtract(deletes_obj_hash[key]).to_a # remove the key from the deletes set - we already processed it deletes_obj_hash.delete(key) else # if object is not in the deletes set - then, it's all inserts inserts_pairs = obj_set.to_a end # split resulting pairs if inserts_pairs and inserts_pairs.size > 0 inserts[key] = split_obj_pairs(inserts_pairs) end if deletes_pairs and deletes_pairs.size > 0 deletes[key] = split_obj_pairs(deletes_pairs) end end # after we analyzed the inserts__obj_hash # => deletes_obj_hash should contain only the unmatched deletes deletes_obj_hash.each do |key, obj_set| if obj_set.size > 0 deletes[key] = split_obj_pairs(obj_set.to_a) end end [inserts, deletes] end |
#get_list(dockey) ⇒ Object
486 487 488 489 490 491 492 |
# File 'lib/rhoconnect/store.rb', line 486 def get_list(dockey) res = [] if dockey res = @db.lrange(dockey, 0, -1) end res end |
#get_lock(dockey, timeout = 0, raise_on_expire = false) ⇒ Object
683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 |
# File 'lib/rhoconnect/store.rb', line 683 def get_lock(dockey,timeout=0,raise_on_expire=false) lock_key = _lock_key(dockey) current_time = Time.now.to_i ts = current_time+(Rhoconnect.lock_duration || timeout)+1 loop do if not @db.setnx(lock_key,ts) current_lock = @db.get(lock_key) # ensure lock wasn't released between the setnx and get calls if current_lock current_lock_timeout = current_lock.to_i if raise_on_expire or Rhoconnect.raise_on_expired_lock if current_lock_timeout <= current_time # lock expired before operation which set it up completed # this process cannot continue without corrupting locked data raise StoreLockException, "Lock \"#{lock_key}\" expired before it was released" end else if current_lock_timeout <= current_time and @db.getset(lock_key,ts).to_i <= current_time # previous lock expired and we replaced it with our own break end end # lock was released between setnx and get - try to acquire it again elsif @db.setnx(lock_key,ts) break end sleep(1) current_time = Time.now.to_i else break #no lock was set, so we set ours and leaving end end return ts end |
#get_object(dockey, key) ⇒ Object
452 453 454 455 |
# File 'lib/rhoconnect/store.rb', line 452 def get_object(dockey, key) res = _get_objects(dockey, [key]) (res and res.size > 0) ? res.values[0] : nil end |
#get_objects(dockey, keys) ⇒ Object
457 458 459 |
# File 'lib/rhoconnect/store.rb', line 457 def get_objects(dockey, keys) _get_objects(dockey, keys) end |
#get_value(dockey) ⇒ Object
Retrieves value for a given key
432 433 434 |
# File 'lib/rhoconnect/store.rb', line 432 def get_value(dockey) @db.get(dockey) if dockey end |
#get_zdata(dockey) ⇒ Object
Retrieves set for given dockey,associated key (client_id), obj_hashes
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 |
# File 'lib/rhoconnect/store.rb', line 802 def get_zdata(dockey) data = @db.zrange(dockey, 0, -1) ret = [] assoc_keys = [] scores = [] data.each do |zsetkey| obj_entries = [] obj_hash = get_data "#{dockey}:#{zsetkey}" obj_hash.each do |obj_key_with_index, objdata| index,objkey = obj_key_with_index.split('_', 2) obj_entries[index.to_i] = [objkey, objdata] end score,assoc_key,source_id_with_index = getelement(zsetkey) source_id, source_index = source_id_with_index.split(':', 2) if scores[-1] != score ret << [[source_id, obj_entries]] assoc_keys << assoc_key scores << score else ret[-1] << [source_id, obj_entries] end end if data [ret, assoc_keys] end |
#incr(dockey) ⇒ Object
440 441 442 |
# File 'lib/rhoconnect/store.rb', line 440 def incr(dockey) @db.incr(dockey) end |
#keys(pattern) ⇒ Object
654 655 656 |
# File 'lib/rhoconnect/store.rb', line 654 def keys(pattern) @db.keys(pattern) end |
#lock(dockey, timeout = 0, raise_on_expire = false, &block) ⇒ Object
Lock a given key and release when provided block is finished
676 677 678 679 680 681 |
# File 'lib/rhoconnect/store.rb', line 676 def lock(dockey,timeout=0,raise_on_expire=false, &block) m_lock = get_lock(dockey,timeout,raise_on_expire) res = yield release_lock(dockey,m_lock) res end |
#put_data(dockey, data = {}, append = false, ttl = 0) ⇒ Object Also known as: set_data
if it exists or appends data to the existing set if append flag set to true if ttl > 0 - sets expriration time on the keys
330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/rhoconnect/store.rb', line 330 def put_data(dockey,data={},append=false, ttl=0) if dockey and data flush_data(dockey) unless append # Inserts a hash or array if data.is_a?Hash _put_objects(dockey, data, ttl) else put_list(dockey,data,append, ttl) end end true end |
#put_list(dockey, data = [], append = false, ttl = 0) ⇒ Object
343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/rhoconnect/store.rb', line 343 def put_list(dockey, data=[], append=false, ttl=0) if dockey and data flush_data(dockey) unless append @db.pipelined do data.each do |element| @db.rpush(dockey, element) end @db.expire(dockey, ttl) if ttl > 0 end end true end |
#put_object(dockey, key, data = {}) ⇒ Object
317 318 319 |
# File 'lib/rhoconnect/store.rb', line 317 def put_object(dockey, key, data={}) _put_objects(dockey, {key => data}) end |
#put_tmp_data(dockey, data = {}, append = false) ⇒ Object
Same as above, but sets TTL on every key
322 323 324 |
# File 'lib/rhoconnect/store.rb', line 322 def put_tmp_data(dockey,data={},append=false) put_data(dockey, data, append, Rhoconnect.store_key_ttl) end |
#put_value(dockey, value) ⇒ Object Also known as: set_value
Adds a simple key/value pair
421 422 423 424 425 426 427 428 429 |
# File 'lib/rhoconnect/store.rb', line 421 def put_value(dockey,value) if dockey if value @db.set(dockey,value.to_s) else @db.del(dockey) end end end |
#put_zdata(dockey, assoc_key, data = [], append = false) ⇒ Object
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 |
# File 'lib/rhoconnect/store.rb', line 779 def put_zdata(dockey,assoc_key,data=[],append=false) return true unless (dockey and assoc_key and data) flush_zdata(dockey) unless append current_score = 0 current_score_data = @db.zrevrange(dockey,0,0,:with_scores => true) current_score = current_score_data[-1][1].to_i if current_score_data and current_score_data[-1] current_score += 1 data.each_with_index do |source_queue_entry, source_entry_index| source_id = source_queue_entry[0] source_id_with_index = "#{source_id}:#{source_entry_index}" source_entry_data = source_queue_entry[1] source_entry_docname = setelement(current_score,assoc_key, source_id_with_index) source_entry_data.each_with_index do |obj_entry, obj_index| obj_key_with_index = "#{obj_index}_#{obj_entry[0]}" put_data("#{dockey}:#{source_entry_docname}",{obj_key_with_index => obj_entry[1]},true) end if source_entry_data @db.zadd(dockey, current_score, source_entry_docname) end if data true end |
#reconnect ⇒ Object
276 277 278 |
# File 'lib/rhoconnect/store.rb', line 276 def reconnect @db.client.reconnect end |
#release_lock(dockey, lock, raise_on_expire = false) ⇒ Object
Due to redis bug #140, setnx always returns true so this doesn’t work def get_lock(dockey,timeout=0)
lock_key = _lock_key(dockey)
until @db.setnx(lock_key,1) do
sleep(1)
end
@db.expire(lock_key,timeout+1)
Time.now.to_i+timeout+1
end
729 730 731 |
# File 'lib/rhoconnect/store.rb', line 729 def release_lock(dockey,lock,raise_on_expire=false) @db.del(_lock_key(dockey)) if raise_on_expire or Rhoconnect.raise_on_expired_lock or (lock >= Time.now.to_i) end |
#rename(srckey, dstkey, make_persist = false) ⇒ Object
Rename srckey to dstkey also, removes TTL if ordered (normally - it is not necessary)
755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 |
# File 'lib/rhoconnect/store.rb', line 755 def rename(srckey,dstkey,make_persist=false) buckets = _get_bucket_indices(srckey) if buckets.size @db.pipelined do @db.del("#{srckey}:indices") buckets.each do |bucket_index| _add_bucket_index(dstkey, bucket_index) @db.rename("#{srckey}:#{bucket_index}", "#{dstkey}:#{bucket_index}") end if make_persist @db.persist("#{dstkey}:indices") buckets.each do |bucket_index| @db.persist("#{dstkey}:#{bucket_index}") end end end else if @db.exists(srckey) @db.rename(srckey,dstkey) @db.persist(dstkey) if make_persist end end end |
#rename_tmp_data(srckey, dstkey) ⇒ Object
Rename temp doc srckey to persist dstkey
749 750 751 |
# File 'lib/rhoconnect/store.rb', line 749 def rename_tmp_data(srckey,dstkey) rename(srckey,dstkey,true) end |
#set_db_doc(dockey, data, append = false) ⇒ Object
296 297 298 299 300 301 302 |
# File 'lib/rhoconnect/store.rb', line 296 def set_db_doc(dockey, data, append=false) if data.is_a?(String) put_value(dockey, data) else put_data(dockey, data, append) end end |
#start_transaction ⇒ Object
284 285 286 |
# File 'lib/rhoconnect/store.rb', line 284 def start_transaction @db.multi end |
#update_count(dockey, count) ⇒ Object
448 449 450 |
# File 'lib/rhoconnect/store.rb', line 448 def update_count(dockey, count) @db.incrby(dockey, count) end |
#update_elements(dockey, inserts_elements_map, deletes_elements_map) ⇒ Object
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 |
# File 'lib/rhoconnect/store.rb', line 612 def update_elements(dockey, inserts_elements_map, deletes_elements_map) indices_to_cleanup = Set.new @db.pipelined do collected_adds = {} collected_rems = {} inserts_elements_map.each do |element,keypairs| key = keypairs[0] next if not key or not element or element.size == 0 obj_bucket_index = _create_obj_index(key) bucket_name = "#{dockey}:#{obj_bucket_index}" _add_bucket_index(dockey, obj_bucket_index) collected_adds[bucket_name] ||= [] collected_adds[bucket_name] << element end deletes_elements_map.each do |element,keypairs| key = keypairs[0] next if not key or not element or element.size == 0 obj_bucket_index = _create_obj_index(key) bucket_name = "#{dockey}:#{obj_bucket_index}" indices_to_cleanup << bucket_name collected_rems[bucket_name] ||= [] collected_rems[bucket_name] << element end # now, perform SREM first, then SADD collected_rems.each do |bucket, bucket_data| @db.srem(bucket, bucket_data) end collected_adds.each do |bucket,bucket_data| @db.sadd(bucket, bucket_data) end end # now, cleanup buckets if necessary _cleanup_buckets(dockey, indices_to_cleanup.to_a) end |
#update_objects(dockey, data = {}) ⇒ Object
updates objects for a given doctype, source, user create new objects if necessary
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/rhoconnect/store.rb', line 358 def update_objects(dockey, data={}) return 0 unless dockey and data new_object_count = 0 objs = get_objects(dockey, data.keys) || {} collected_adds = {} collected_rems = {} # my_bucket = nil @db.pipelined do data.each do |key,obj| is_create = objs[key].nil? new_object_count += 1 if is_create obj_bucket = _add_bucket_index(dockey, "#{_create_obj_index(key)}") # collect SREM (if object exists in DB) unless is_create old_element = set_obj_element(key,objs[key]) collected_rems[obj_bucket] ||= [] collected_rems[obj_bucket] << old_element end # update the object and collect SADD objs[key] ||= {} objs[key].merge!(obj) new_element = set_obj_element(key,objs[key]) collected_adds[obj_bucket] ||= [] collected_adds[obj_bucket] << new_element end # process all SADD and SREM commands as one # SREM must go first collected_rems.each do |bucket, bucket_data| @db.srem(bucket, bucket_data) end collected_adds.each do |bucket, bucket_data| @db.sadd(bucket, bucket_data) end end #data1 = @db.smembers(my_bucket) #puts "data1 is #{data1.inspect}" new_object_count end |
#zadd(dockey, score, value) ⇒ Object
low-level operations with sorted sets
842 843 844 |
# File 'lib/rhoconnect/store.rb', line 842 def zadd(dockey, score, value) @db.zadd(dockey, score, value) end |
#zrange(dockey, start, stop) ⇒ Object
862 863 864 |
# File 'lib/rhoconnect/store.rb', line 862 def zrange(dockey, start, stop) @db.zrange(dockey, start, stop) end |
#zrem(dockey, value) ⇒ Object
846 847 848 |
# File 'lib/rhoconnect/store.rb', line 846 def zrem(dockey, value) @db.zrem(dockey, value) end |
#zremrangebyscore(dockey, min_elem, max_elem) ⇒ Object
850 851 852 |
# File 'lib/rhoconnect/store.rb', line 850 def zremrangebyscore(dockey, min_elem, max_elem) @db.zremrangebyscore(dockey, min_elem, max_elem) end |
#zrevrange(dockey, start, stop) ⇒ Object
858 859 860 |
# File 'lib/rhoconnect/store.rb', line 858 def zrevrange(dockey, start, stop) @db.zrevrange(dockey, start, stop) end |
#zscore(dockey, value) ⇒ Object
854 855 856 |
# File 'lib/rhoconnect/store.rb', line 854 def zscore(dockey, value) @db.zscore(dockey, value) end |