Class: Bio::MAF::KyotoIndex
- Inherits:
-
Object
- Object
- Bio::MAF::KyotoIndex
- Includes:
- KVHelpers
- Defined in:
- lib/bio/maf/index.rb
Constant Summary collapse
- COMPRESSION_KEY =
'bio-maf:compression'
- FILE_KEY =
'bio-maf:file'
- FORMAT_VERSION_KEY =
'bio-maf:index-format-version'
- FORMAT_VERSION =
2
- REF_SEQ_KEY =
'bio-maf:reference-sequence'
- MAX_SPECIES =
64
- CHUNK_THRESHOLD_BYTES =
50 * 1024 * 1024
- CHUNK_THRESHOLD_BLOCKS =
1000
Constants included from KVHelpers
Bio::MAF::KVHelpers::CHROM_BIN_PREFIX_FMT, Bio::MAF::KVHelpers::KEY_FMT, Bio::MAF::KVHelpers::KEY_SCAN_FMT, Bio::MAF::KVHelpers::VAL_FMT, Bio::MAF::KVHelpers::VAL_IDX_OFFSET_FMT, Bio::MAF::KVHelpers::VAL_N_SEQ_FMT, Bio::MAF::KVHelpers::VAL_SPECIES_FMT, Bio::MAF::KVHelpers::VAL_TEXT_SIZE_FMT
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
-
#index_sequences ⇒ Object
Returns the value of attribute index_sequences.
-
#maf_file ⇒ Object
readonly
Returns the value of attribute maf_file.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#ref_only ⇒ Object
readonly
Returns the value of attribute ref_only.
-
#ref_seq ⇒ Object
Returns the value of attribute ref_seq.
-
#species ⇒ Object
readonly
Returns the value of attribute species.
-
#species_max_id ⇒ Object
readonly
Returns the value of attribute species_max_id.
Class Method Summary collapse
-
.build(parser, path, ref_only = true) ⇒ KyotoIndex
Build a new index from the MAF file being parsed by
parser
, and store it inpath
. -
.open(path) ⇒ KyotoIndex
Open an existing index for reading.
Instance Method Summary collapse
- #build(parser, ref_only = true) ⇒ Object
- #build_block_value(block) ⇒ Object
-
#close ⇒ Object
Close the underlying Kyoto Cabinet database handle.
- #dump(stream = $stdout) ⇒ Object
- #entries_for(block) ⇒ Object
-
#fetch_list(intervals, filter_spec = {}) ⇒ Object
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects.
-
#find(intervals, parser, filter = {}) {|block| ... } ⇒ Enumerable<Block>
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.
- #index_blocks(blocks) ⇒ Object
-
#initialize(path, db_arg = nil) ⇒ KyotoIndex
constructor
private
KyotoIndex Internals.
- #load_index_sequences ⇒ Object
- #load_species ⇒ Object
- #make_scan_worker(jobs, completed) ⇒ Object
- #overlaps?(gi, i_start, i_end) ⇒ Boolean
- #prep(file_spec, compression, ref_only) ⇒ Object
-
#reopen ⇒ Object
Reopen the same DB handle read-only.
- #scan_bin(cur, chrom_id, bin, bin_intervals, filters) ⇒ Object
-
#scan_bins(chrom_id, bin_intervals, filters) ⇒ Object
Scan the index for blocks matching the given bins and intervals.
- #scan_bins_parallel(chrom_id, bin_intervals, filters) ⇒ Object
- #seq_id_for(name) ⇒ Object
- #slice(interval, parser, filter = {}) ⇒ Object
- #species_id_for_seq(seq) ⇒ Object
- #to_s ⇒ Object
- #with_profiling ⇒ Object
Methods included from KVHelpers
bin_start_prefix, extract_index_offset, extract_n_sequences, extract_species_vec, extract_text_size, unpack_key
Constructor Details
#initialize(path, db_arg = nil) ⇒ KyotoIndex
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
KyotoIndex Internals
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 |
# File 'lib/bio/maf/index.rb', line 469 def initialize(path, db_arg=nil) @species = {} @species_max_id = -1 @index_sequences = {} @max_sid = -1 if db_arg || ((path.size > 1) and File.exist?(path)) mode = KyotoCabinet::DB::OREADER else mode = KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE end @db = db_arg || KyotoCabinet::DB.new @path = path path_str = "#{path.to_s}#opts=ls#dfunit=100000" unless db_arg || db.open(path_str, mode) raise "Could not open DB file!" end if mode == KyotoCabinet::DB::OREADER version = db[FORMAT_VERSION_KEY].to_i if version != FORMAT_VERSION raise "Index #{path} is version #{version}, expecting version #{FORMAT_VERSION}!" end @maf_file = db[FILE_KEY] self.ref_seq = db[REF_SEQ_KEY] load_index_sequences load_species end @mutex = Mutex.new end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
304 305 306 |
# File 'lib/bio/maf/index.rb', line 304 def db @db end |
#index_sequences ⇒ Object
Returns the value of attribute index_sequences.
306 307 308 |
# File 'lib/bio/maf/index.rb', line 306 def index_sequences @index_sequences end |
#maf_file ⇒ Object (readonly)
Returns the value of attribute maf_file.
305 306 307 |
# File 'lib/bio/maf/index.rb', line 305 def maf_file @maf_file end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
304 305 306 |
# File 'lib/bio/maf/index.rb', line 304 def path @path end |
#ref_only ⇒ Object (readonly)
Returns the value of attribute ref_only.
304 305 306 |
# File 'lib/bio/maf/index.rb', line 304 def ref_only @ref_only end |
#ref_seq ⇒ Object
Returns the value of attribute ref_seq.
306 307 308 |
# File 'lib/bio/maf/index.rb', line 306 def ref_seq @ref_seq end |
#species ⇒ Object (readonly)
Returns the value of attribute species.
304 305 306 |
# File 'lib/bio/maf/index.rb', line 304 def species @species end |
#species_max_id ⇒ Object (readonly)
Returns the value of attribute species_max_id.
304 305 306 |
# File 'lib/bio/maf/index.rb', line 304 def species_max_id @species_max_id end |
Class Method Details
.build(parser, path, ref_only = true) ⇒ KyotoIndex
Build a new index from the MAF file being parsed by parser
,
and store it in path
.
398 399 400 401 402 |
# File 'lib/bio/maf/index.rb', line 398 def self.build(parser, path, ref_only=true) idx = self.new(path) idx.build(parser, ref_only) return idx end |
.open(path) ⇒ KyotoIndex
Open an existing index for reading.
389 390 391 |
# File 'lib/bio/maf/index.rb', line 389 def self.open(path) return KyotoIndex.new(path) end |
Instance Method Details
#build(parser, ref_only = true) ⇒ Object
742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 |
# File 'lib/bio/maf/index.rb', line 742 def build(parser, ref_only=true) prep(parser.file_spec, parser.compression, ref_only) n = 0 acc = [] acc_bytes = 0 parser.each_block do |block| acc << block acc_bytes += block.size if acc_bytes > CHUNK_THRESHOLD_BYTES \ || acc.size > CHUNK_THRESHOLD_BLOCKS index_blocks(acc) acc = [] acc_bytes = 0 end n += 1 end index_blocks(acc) LOG.debug { "Created index for #{n} blocks and #{@index_sequences.size} sequences." } db.synchronize(true) end |
#build_block_value(block) ⇒ Object
840 841 842 843 844 845 846 847 848 |
# File 'lib/bio/maf/index.rb', line 840 def build_block_value(block) bits = block.sequences.collect {|s| 1 << species_id_for_seq(s.source) } vec = bits.reduce(0, :|) return [block.offset, block.size, block.text_size, block.sequences.size, vec].pack(VAL_FMT) end |
#close ⇒ Object
Close the underlying Kyoto Cabinet database handle.
451 452 453 |
# File 'lib/bio/maf/index.rb', line 451 def close db.close end |
#dump(stream = $stdout) ⇒ Object
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 |
# File 'lib/bio/maf/index.rb', line 507 def dump(stream=$stdout) bgzf = (db[COMPRESSION_KEY] == 'bgzf') stream.puts "KyotoIndex dump: #{@path}" stream.puts if db.count == 0 stream.puts "Empty database!" return end db.cursor_process do |cur| stream.puts "== Metadata ==" cur.jump('') while true k, v = cur.get(false) raise "unexpected end of records!" unless k break if k[0] == "\xff" stream.puts "#{k}: #{v}" unless cur.step raise "could not advance cursor!" end end stream.puts "== Index records ==" while pair = cur.get(true) _, chr, bin, s_start, s_end = pair[0].unpack(KEY_FMT) offset, len, text_size, n_seq, species_vec = pair[1].unpack(VAL_FMT) stream.puts "#{chr} [bin #{bin}] #{s_start}:#{s_end}" stream.puts " offset #{offset}, length #{len}" if bgzf block = Bio::BGZF.vo_block_offset(offset) data = Bio::BGZF.vo_data_offset(offset) stream.puts " BGZF block offset #{block}, data offset #{data}" end stream.puts " text size: #{text_size}" stream.puts " sequences in block: #{n_seq}" stream.printf(" species vector: %016x\n", species_vec) end end end |
#entries_for(block) ⇒ Object
850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 |
# File 'lib/bio/maf/index.rb', line 850 def entries_for(block) begin unless block.ref_seq.source == @ref_seq raise "Inconsistent reference sequence: expected #{@ref_seq}, got #{block.ref_seq.source}" end h = {} val = build_block_value(block) to_index = ref_only ? [block.sequences.first] : block.sequences to_index.each do |seq| seq_id = seq_id_for(seq.source) # size 0 occurs in e.g. upstream1000.maf.gz next if seq.size == 0 seq_end = seq.start + seq.size bin = Bio::Ucsc::UcscBin.bin_from_range(seq.start, seq_end) key = [255, seq_id, bin, seq.start, seq_end].pack(KEY_FMT) h[key] = val end return h rescue Exception => e LOG.error "Failed to index block at offset #{block.offset}:\n#{block}" raise e end end |
#fetch_list(intervals, filter_spec = {}) ⇒ Object
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects
567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 |
# File 'lib/bio/maf/index.rb', line 567 def fetch_list(intervals, filter_spec={}) filter_spec ||= {} filters = Filters.build(filter_spec, self) chrom = intervals.first.chrom chrom_id = index_sequences[chrom] unless chrom_id raise "chromosome #{chrom} not indexed!" end if intervals.find { |i| i.chrom != chrom } raise "all intervals must be for the same chromosome!" end # for each bin, build a list of the intervals to look for there bin_intervals = Hash.new { |h, k| h[k] = [] } intervals.each do |i| i.bin_all.each do |bin| bin_intervals[bin] << (i.zero_start...i.zero_end) end end bin_intervals.values.each do |intervals| intervals.sort_by! {|i| i.begin} end matches = if RUBY_PLATFORM == 'java' && bin_intervals.size > 4 scan_bins_parallel(chrom_id, bin_intervals, filters) else scan_bins(chrom_id, bin_intervals, filters) end matches.sort_by! { |e| e[0] } # sort by offset in file end |
#find(intervals, parser, filter = {}) {|block| ... } ⇒ Enumerable<Block>
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.
An optional Hash of filters may be passed in. The following keys are used:
:with_all_species => ["sp1", "sp2", ...]
Only match alignment blocks containing all given species.
:at_least_n_sequences => n
Only match alignment blocks with at least N sequences.
:min_size => n
Only match alignment blocks with text size at least N.
:max_size => n
Only match alignment blocks with text size at most N.
435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# File 'lib/bio/maf/index.rb', line 435 def find(intervals, parser, filter={}, &blk) start = Time.now fl = fetch_list(intervals, filter) LOG.debug { sprintf("Built fetch list of %d items in %.3fs.", fl.size, Time.now - start) } if ! fl.empty? parser.fetch_blocks(fl, &blk) else if ! block_given? [] end end end |
#index_blocks(blocks) ⇒ Object
766 767 768 769 770 771 772 773 774 775 776 777 778 |
# File 'lib/bio/maf/index.rb', line 766 def index_blocks(blocks) h = @mutex.synchronize do if ! @seen_first # set the reference sequence from the first block first_block = blocks.first self.ref_seq = first_block.sequences.first.source db[REF_SEQ_KEY] = ref_seq @seen_first = true end blocks.map { |b| entries_for(b) }.reduce(:merge!) end db.set_bulk(h, false) end |
#load_index_sequences ⇒ Object
780 781 782 783 784 785 786 787 788 789 |
# File 'lib/bio/maf/index.rb', line 780 def load_index_sequences h = {} db.match_prefix("sequence:").each do |key| _, name = key.split(':', 2) id = db[key].to_i h[name] = id end @index_sequences = h @max_sid = @index_sequences.values.max end |
#load_species ⇒ Object
805 806 807 808 809 810 811 812 |
# File 'lib/bio/maf/index.rb', line 805 def load_species db.match_prefix("species:").each do |key| _, name = key.split(':', 2) id = db[key].to_i @species[name] = id end @species_max_id = @species.values.sort.last || -1 end |
#make_scan_worker(jobs, completed) ⇒ Object
660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 |
# File 'lib/bio/maf/index.rb', line 660 def make_scan_worker(jobs, completed) Thread.new do with_profiling do db.cursor_process do |cur| while true req = jobs.poll break unless req begin result = yield(cur, req) completed.put(result) rescue Exception => e completed.put(e) LOG.error "Worker failing: #{e.class}: #{e}" LOG.error e raise e end end end end end end |
#overlaps?(gi, i_start, i_end) ⇒ Boolean
721 722 723 724 725 726 |
# File 'lib/bio/maf/index.rb', line 721 def overlaps?(gi, i_start, i_end) g_start = gi.begin (i_start <= g_start && g_start < i_end) \ || gi.include?(i_start) end |
#prep(file_spec, compression, ref_only) ⇒ Object
731 732 733 734 735 736 737 738 739 740 |
# File 'lib/bio/maf/index.rb', line 731 def prep(file_spec, compression, ref_only) db[FORMAT_VERSION_KEY] = FORMAT_VERSION db[FILE_KEY] = File.basename(file_spec) @maf_file = db[FILE_KEY] if compression db[COMPRESSION_KEY] = compression.to_s end @ref_only = ref_only @seen_first = false end |
#reopen ⇒ Object
Reopen the same DB handle read-only. Only useful for unit tests.
503 504 505 |
# File 'lib/bio/maf/index.rb', line 503 def reopen KyotoIndex.new(@path, @db) end |
#scan_bin(cur, chrom_id, bin, bin_intervals, filters) ⇒ Object
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 |
# File 'lib/bio/maf/index.rb', line 682 def scan_bin(cur, chrom_id, bin, bin_intervals, filters) # bin_intervals is sorted by zero_start # compute the start and end of all intervals of interest spanning_start = bin_intervals.first.begin spanning_end = bin_intervals.map {|i| i.end}.max # scan from the start of the bin cur.jump(bin_start_prefix(chrom_id, bin)) matches = [] while pair = cur.get(true) c_chr, c_bin, c_start, c_end = pair[0].unpack(KEY_SCAN_FMT) if (c_chr != chrom_id) \ || (c_bin != bin) \ || c_start >= spanning_end # we've hit the next bin, or chromosome, or gone past # the spanning interval, so we're done with this bin break end if c_end >= spanning_start # possible overlap # any intervals that end before the start of the current # block are no longer relevant while bin_intervals.first.end < c_start bin_intervals.shift end bin_intervals.each do |i| i_start = i.begin break if i_start > c_end if ((c_start <= i_start && i_start < c_end) \ || i.include?(c_start)) \ && filters.match(pair) # match matches << extract_index_offset(pair) break end end end end matches end |
#scan_bins(chrom_id, bin_intervals, filters) ⇒ Object
Scan the index for blocks matching the given bins and intervals.
597 598 599 600 601 602 603 604 605 606 |
# File 'lib/bio/maf/index.rb', line 597 def scan_bins(chrom_id, bin_intervals, filters) to_fetch = [] db.cursor_process do |cur| bin_intervals.each do |bin, bin_intervals_raw| matches = scan_bin(cur, chrom_id, bin, bin_intervals_raw, filters) to_fetch.concat(matches) end end to_fetch end |
#scan_bins_parallel(chrom_id, bin_intervals, filters) ⇒ Object
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 |
# File 'lib/bio/maf/index.rb', line 622 def scan_bins_parallel(chrom_id, bin_intervals, filters) LOG.debug { sprintf("Beginning scan of %d bin intervals %s filters.", bin_intervals.size, filters.empty? ? "without" : "with") } start = Time.now n_threads = ENV['profile'] ? 1 : java.lang.Runtime.runtime.availableProcessors jobs = java.util.concurrent.ConcurrentLinkedQueue.new(bin_intervals.to_a) completed = java.util.concurrent.LinkedBlockingQueue.new(128) threads = [] n_threads.times do threads << make_scan_worker(jobs, completed) do |cur, req| bin, intervals = req scan_bin(cur, chrom_id, bin, intervals, filters) end end n_completed = 0 to_fetch = [] while (n_completed < bin_intervals.size) c = completed.poll(5, java.util.concurrent.TimeUnit::SECONDS) if c.nil? if threads.find { |t| t.alive? } next else raise "No threads alive, completed #{n_completed}/#{bin_intervals.size} jobs!" end end raise "worker failed: #{c}" if c.is_a? Exception to_fetch.concat(c) n_completed += 1 end threads.each { |t| t.join } LOG.debug { sprintf("Matched %d index records with %d threads in %.3f seconds.", to_fetch.size, n_threads, Time.now - start) } to_fetch end |
#seq_id_for(name) ⇒ Object
791 792 793 794 795 796 797 798 799 800 801 802 803 |
# File 'lib/bio/maf/index.rb', line 791 def seq_id_for(name) sid = index_sequences[name] if ! sid @max_sid += 1 sid = @max_sid # "" << foo is hideous but apparently what it takes to get a # non-shared copy of a string on JRuby... name_copy = "" << name db.set("sequence:#{name_copy}", sid.to_s) index_sequences[name_copy] = sid end return sid end |
#slice(interval, parser, filter = {}) ⇒ Object
455 456 457 458 459 460 461 462 463 464 |
# File 'lib/bio/maf/index.rb', line 455 def slice(interval, parser, filter={}) if block_given? find([interval], parser, filter) do |block| yield block.slice(interval) end else LOG.debug { "accumulating results of #slice" } enum_for(:slice, interval, parser, filter) end end |
#species_id_for_seq(seq) ⇒ Object
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 |
# File 'lib/bio/maf/index.rb', line 814 def species_id_for_seq(seq) # NB can have multiple dots # example: otoGar1.scaffold_104707.1-93001 parts = seq.split('.', 2) if parts.size == 2 # "" << foo is hideous but apparently what it takes to get a # non-shared copy of a string on JRuby... species_name = "" << parts[0] else # not in species.sequence format, apparently species_name = "" << seq end if species.has_key? species_name return species[species_name] else species_id = @species_max_id + 1 if species_id >= MAX_SPECIES raise "cannot index MAF file with more than #{MAX_SPECIES} species" end species[species_name] = species_id db["species:#{species_name}"] = species_id @species_max_id = species_id return species_id end end |
#to_s ⇒ Object
498 499 500 |
# File 'lib/bio/maf/index.rb', line 498 def to_s "#<KyotoIndex path=#{path}>" end |
#with_profiling ⇒ Object
608 609 610 611 612 613 614 615 616 617 618 619 620 |
# File 'lib/bio/maf/index.rb', line 608 def with_profiling if RUBY_PLATFORM == 'java' && ENV['profile'] rv = nil pdata = JRuby::Profiler.profile do rv = yield end printer = JRuby::Profiler::FlatProfilePrinter.new(pdata) printer.printProfile(STDERR) return rv else yield end end |