Class: Bio::MAF::KyotoIndex
- Inherits:
-
Object
- Object
- Bio::MAF::KyotoIndex
- Includes:
- KVHelpers
- Defined in:
- lib/bio/maf/index.rb
Constant Summary collapse
- COMPRESSION_KEY =
'bio-maf:compression'
- FILE_KEY =
'bio-maf:file'
- FORMAT_VERSION_KEY =
'bio-maf:index-format-version'
- FORMAT_VERSION =
2
- REF_SEQ_KEY =
'bio-maf:reference-sequence'
- MAX_SPECIES =
64
Constants included from KVHelpers
Bio::MAF::KVHelpers::CHROM_BIN_PREFIX_FMT, Bio::MAF::KVHelpers::KEY_FMT, Bio::MAF::KVHelpers::KEY_SCAN_FMT, Bio::MAF::KVHelpers::VAL_FMT, Bio::MAF::KVHelpers::VAL_IDX_OFFSET_FMT, Bio::MAF::KVHelpers::VAL_N_SEQ_FMT, Bio::MAF::KVHelpers::VAL_SPECIES_FMT, Bio::MAF::KVHelpers::VAL_TEXT_SIZE_FMT
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
-
#index_sequences ⇒ Object
Returns the value of attribute index_sequences.
-
#maf_file ⇒ Object
readonly
Returns the value of attribute maf_file.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#ref_only ⇒ Object
readonly
Returns the value of attribute ref_only.
-
#ref_seq ⇒ Object
Returns the value of attribute ref_seq.
-
#species ⇒ Object
readonly
Returns the value of attribute species.
-
#species_max_id ⇒ Object
readonly
Returns the value of attribute species_max_id.
Class Method Summary collapse
-
.build(parser, path, ref_only = true) ⇒ KyotoIndex
Build a new index from the MAF file being parsed by
parser
, and store it in path
. -
.open(path) ⇒ KyotoIndex
Open an existing index for reading.
Instance Method Summary collapse
- #build(parser, ref_only = true) ⇒ Object
- #build_block_value(block) ⇒ Object
-
#close ⇒ Object
Close the underlying Kyoto Cabinet database handle.
- #dump(stream = $stdout) ⇒ Object
- #entries_for(block) ⇒ Object
-
#fetch_list(intervals, filter_spec = {}) ⇒ Object
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects.
-
#find(intervals, parser, filter = {}) {|block| ... } ⇒ Enumerable<Block>
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.
- #index_blocks(blocks) ⇒ Object
-
#initialize(path, db_arg = nil) ⇒ KyotoIndex
constructor
private
KyotoIndex Internals.
- #load_index_sequences ⇒ Object
- #load_species ⇒ Object
- #make_scan_worker(jobs, completed) ⇒ Object
- #overlaps?(gi, i_start, i_end) ⇒ Boolean
-
#reopen ⇒ Object
Reopen the same DB handle read-only.
- #scan_bin(cur, chrom_id, bin, bin_intervals, filters) ⇒ Object
-
#scan_bins(chrom_id, bin_intervals, filters) ⇒ Object
Scan the index for blocks matching the given bins and intervals.
- #scan_bins_parallel(chrom_id, bin_intervals, filters) ⇒ Object
- #seq_id_for(name) ⇒ Object
- #slice(interval, parser, filter = {}) ⇒ Object
- #species_id_for_seq(seq) ⇒ Object
- #with_profiling ⇒ Object
Methods included from KVHelpers
bin_start_prefix, extract_index_offset, extract_n_sequences, extract_species_vec, extract_text_size, unpack_key
Constructor Details
#initialize(path, db_arg = nil) ⇒ KyotoIndex
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
KyotoIndex Internals
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 |
# File 'lib/bio/maf/index.rb', line 436 def initialize(path, db_arg=nil) @species = {} @species_max_id = -1 @max_sid = -1 if db_arg || ((path.size > 1) and File.exist?(path)) mode = KyotoCabinet::DB::OREADER else mode = KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE end @db = db_arg || KyotoCabinet::DB.new @path = path unless db_arg || db.open(path.to_s, mode) raise "Could not open DB file!" end if mode == KyotoCabinet::DB::OREADER @maf_file = db[FILE_KEY] self.ref_seq = db[REF_SEQ_KEY] load_index_sequences load_species end end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
272 273 274 |
# File 'lib/bio/maf/index.rb', line 272 def db @db end |
#index_sequences ⇒ Object
Returns the value of attribute index_sequences.
274 275 276 |
# File 'lib/bio/maf/index.rb', line 274 def index_sequences @index_sequences end |
#maf_file ⇒ Object (readonly)
Returns the value of attribute maf_file.
273 274 275 |
# File 'lib/bio/maf/index.rb', line 273 def maf_file @maf_file end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
272 273 274 |
# File 'lib/bio/maf/index.rb', line 272 def path @path end |
#ref_only ⇒ Object (readonly)
Returns the value of attribute ref_only.
272 273 274 |
# File 'lib/bio/maf/index.rb', line 272 def ref_only @ref_only end |
#ref_seq ⇒ Object
Returns the value of attribute ref_seq.
274 275 276 |
# File 'lib/bio/maf/index.rb', line 274 def ref_seq @ref_seq end |
#species ⇒ Object (readonly)
Returns the value of attribute species.
272 273 274 |
# File 'lib/bio/maf/index.rb', line 272 def species @species end |
#species_max_id ⇒ Object (readonly)
Returns the value of attribute species_max_id.
272 273 274 |
# File 'lib/bio/maf/index.rb', line 272 def species_max_id @species_max_id end |
Class Method Details
.build(parser, path, ref_only = true) ⇒ KyotoIndex
Build a new index from the MAF file being parsed by parser
,
and store it in path
.
366 367 368 369 370 |
# File 'lib/bio/maf/index.rb', line 366 def self.build(parser, path, ref_only=true) idx = self.new(path) idx.build(parser, ref_only) return idx end |
.open(path) ⇒ KyotoIndex
Open an existing index for reading.
357 358 359 |
# File 'lib/bio/maf/index.rb', line 357 def self.open(path) return KyotoIndex.new(path) end |
Instance Method Details
#build(parser, ref_only = true) ⇒ Object
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 |
# File 'lib/bio/maf/index.rb', line 679 def build(parser, ref_only=true) db[FILE_KEY] = File.basename(parser.file_spec) @maf_file = db[FILE_KEY] if parser.compression db[COMPRESSION_KEY] = parser.compression.to_s end first_block = parser.parse_block self.ref_seq = first_block.sequences.first.source @ref_only = ref_only db[REF_SEQ_KEY] = ref_seq db[FORMAT_VERSION_KEY] = FORMAT_VERSION @index_sequences = {} index_blocks([first_block]) n = 0 parser.each_block.each_slice(1000).each do |blocks| index_blocks(blocks) n += blocks.size end LOG.debug { "Created index for #{n} blocks and #{@index_sequences.size} sequences." } db.synchronize(true) end |
#build_block_value(block) ⇒ Object
761 762 763 764 765 766 767 768 769 |
# File 'lib/bio/maf/index.rb', line 761 def build_block_value(block) bits = block.sequences.collect {|s| 1 << species_id_for_seq(s.source) } vec = bits.reduce(0, :|) return [block.offset, block.size, block.text_size, block.sequences.size, vec].pack(VAL_FMT) end |
#close ⇒ Object
Close the underlying Kyoto Cabinet database handle.
419 420 421 |
# File 'lib/bio/maf/index.rb', line 419 def close db.close end |
#dump(stream = $stdout) ⇒ Object
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 |
# File 'lib/bio/maf/index.rb', line 463 def dump(stream=$stdout) bgzf = (db[COMPRESSION_KEY] == 'bgzf') stream.puts "KyotoIndex dump: #{@path}" stream.puts if db.count == 0 stream.puts "Empty database!" return end db.cursor_process do |cur| stream.puts "== Metadata ==" cur.jump('') while true k, v = cur.get(false) raise "unexpected end of records!" unless k break if k[0] == "\xff" stream.puts "#{k}: #{v}" unless cur.step raise "could not advance cursor!" end end stream.puts "== Index records ==" while pair = cur.get(true) _, chr, bin, s_start, s_end = pair[0].unpack(KEY_FMT) offset, len, text_size, n_seq, species_vec = pair[1].unpack(VAL_FMT) stream.puts "#{chr} [bin #{bin}] #{s_start}:#{s_end}" stream.puts " offset #{offset}, length #{len}" if bgzf block = Bio::BGZF.vo_block_offset(offset) data = Bio::BGZF.vo_data_offset(offset) stream.puts " BGZF block offset #{block}, data offset #{data}" end stream.puts " text size: #{text_size}" stream.puts " sequences in block: #{n_seq}" stream.printf(" species vector: %016x\n", species_vec) end end end |
#entries_for(block) ⇒ Object
771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 |
# File 'lib/bio/maf/index.rb', line 771 def entries_for(block) unless block.ref_seq.source == @ref_seq raise "Inconsistent reference sequence: expected #{@ref_seq}, got #{block.ref_seq.source}" end h = {} val = build_block_value(block) to_index = ref_only ? [block.sequences.first] : block.sequences to_index.each do |seq| seq_id = seq_id_for(seq.source) seq_end = seq.start + seq.size bin = Bio::Ucsc::UcscBin.bin_from_range(seq.start, seq_end) key = [255, seq_id, bin, seq.start, seq_end].pack(KEY_FMT) h[key] = val end return h end |
#fetch_list(intervals, filter_spec = {}) ⇒ Object
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 |
# File 'lib/bio/maf/index.rb', line 523 def fetch_list(intervals, filter_spec={}) filter_spec ||= {} filters = Filters.build(filter_spec, self) chrom = intervals.first.chrom chrom_id = index_sequences[chrom] unless chrom_id raise "chromosome #{chrom} not indexed!" end if intervals.find { |i| i.chrom != chrom } raise "all intervals must be for the same chromosome!" end # for each bin, build a list of the intervals to look for there bin_intervals = Hash.new { |h, k| h[k] = [] } intervals.each do |i| i.bin_all.each do |bin| bin_intervals[bin] << (i.zero_start...i.zero_end) end end bin_intervals.values.each do |intervals| intervals.sort_by! {|i| i.begin} end matches = if RUBY_PLATFORM == 'java' && bin_intervals.size > 4 scan_bins_parallel(chrom_id, bin_intervals, filters) else scan_bins(chrom_id, bin_intervals, filters) end matches.sort_by! { |e| e[0] } # sort by offset in file end |
#find(intervals, parser, filter = {}) {|block| ... } ⇒ Enumerable<Block>
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.
An optional Hash of filters may be passed in. The following keys are used:
:with_all_species => ["sp1", "sp2", ...]
Only match alignment blocks containing all given species.
:at_least_n_sequences => n
Only match alignment blocks with at least N sequences.
:min_size => n
Only match alignment blocks with text size at least N.
:max_size => n
Only match alignment blocks with text size at most N.
403 404 405 406 407 408 409 410 411 412 413 414 415 416 |
# File 'lib/bio/maf/index.rb', line 403 def find(intervals, parser, filter={}, &blk) start = Time.now fl = fetch_list(intervals, filter) LOG.debug { sprintf("Built fetch list of %d items in %.3fs.\n", fl.size, Time.now - start) } if ! fl.empty? parser.fetch_blocks(fl, &blk) else if ! block_given? [] end end end |
#index_blocks(blocks) ⇒ Object
701 702 703 704 |
# File 'lib/bio/maf/index.rb', line 701 def index_blocks(blocks) h = blocks.map { |b| entries_for(b) }.reduce(:merge!) db.set_bulk(h, false) end |
#load_index_sequences ⇒ Object
706 707 708 709 710 711 712 713 714 715 |
# File 'lib/bio/maf/index.rb', line 706 def load_index_sequences h = {} db.match_prefix("sequence:").each do |key| _, name = key.split(':', 2) id = db[key].to_i h[name] = id end @index_sequences = h @max_sid = @index_sequences.values.max end |
#load_species ⇒ Object
728 729 730 731 732 733 734 735 |
# File 'lib/bio/maf/index.rb', line 728 def load_species db.match_prefix("species:").each do |key| _, name = key.split(':', 2) id = db[key].to_i @species[name] = id end @species_max_id = @species.values.sort.last || -1 end |
#make_scan_worker(jobs, completed) ⇒ Object
611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 |
# File 'lib/bio/maf/index.rb', line 611 def make_scan_worker(jobs, completed) Thread.new do with_profiling do db.cursor_process do |cur| while true req = jobs.poll break unless req begin result = yield(cur, req) completed.put(result) rescue Exception => e completed.put(e) LOG.error "Worker failing: #{e.class}: #{e}" LOG.error e raise e end end end end end end |
#overlaps?(gi, i_start, i_end) ⇒ Boolean
672 673 674 675 676 677 |
# File 'lib/bio/maf/index.rb', line 672 def overlaps?(gi, i_start, i_end) g_start = gi.begin (i_start <= g_start && g_start < i_end) \ || gi.include?(i_start) end |
#reopen ⇒ Object
Reopen the same DB handle read-only. Only useful for unit tests.
459 460 461 |
# File 'lib/bio/maf/index.rb', line 459 def reopen KyotoIndex.new(@path, @db) end |
#scan_bin(cur, chrom_id, bin, bin_intervals, filters) ⇒ Object
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 |
# File 'lib/bio/maf/index.rb', line 633 def scan_bin(cur, chrom_id, bin, bin_intervals, filters) # bin_intervals is sorted by zero_start # compute the start and end of all intervals of interest spanning_start = bin_intervals.first.begin spanning_end = bin_intervals.map {|i| i.end}.max # scan from the start of the bin cur.jump(bin_start_prefix(chrom_id, bin)) matches = [] while pair = cur.get(true) c_chr, c_bin, c_start, c_end = pair[0].unpack(KEY_SCAN_FMT) if (c_chr != chrom_id) \ || (c_bin != bin) \ || c_start >= spanning_end # we've hit the next bin, or chromosome, or gone past # the spanning interval, so we're done with this bin break end if c_end >= spanning_start # possible overlap # any intervals that end before the start of the current # block are no longer relevant while bin_intervals.first.end < c_start bin_intervals.shift end bin_intervals.each do |i| i_start = i.begin break if i_start > c_end if ((c_start <= i_start && i_start < c_end) \ || i.include?(c_start)) \ && filters.match(pair) # match matches << extract_index_offset(pair) break end end end end matches end |
#scan_bins(chrom_id, bin_intervals, filters) ⇒ Object
Scan the index for blocks matching the given bins and intervals.
553 554 555 556 557 558 559 560 561 562 |
# File 'lib/bio/maf/index.rb', line 553 def scan_bins(chrom_id, bin_intervals, filters) to_fetch = [] db.cursor_process do |cur| bin_intervals.each do |bin, bin_intervals_raw| matches = scan_bin(cur, chrom_id, bin, bin_intervals_raw, filters) to_fetch.concat(matches) end end to_fetch end |
#scan_bins_parallel(chrom_id, bin_intervals, filters) ⇒ Object
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 |
# File 'lib/bio/maf/index.rb', line 578 def scan_bins_parallel(chrom_id, bin_intervals, filters) start = Time.now n_threads = ENV['profile'] ? 1 : java.lang.Runtime.runtime.availableProcessors jobs = java.util.concurrent.ConcurrentLinkedQueue.new(bin_intervals.to_a) completed = java.util.concurrent.LinkedBlockingQueue.new(128) threads = [] n_threads.times do threads << make_scan_worker(jobs, completed) do |cur, req| bin, intervals = req scan_bin(cur, chrom_id, bin, intervals, filters) end end n_completed = 0 to_fetch = [] while (n_completed < bin_intervals.size) c = completed.poll(5, java.util.concurrent.TimeUnit::SECONDS) if c.nil? if threads.find { |t| t.alive? } next else raise "No threads alive, completed #{n_completed}/#{bin_intervals.size} jobs!" end end raise "worker failed: #{c}" if c.is_a? Exception to_fetch.concat(c) n_completed += 1 end threads.each { |t| t.join } LOG.debug { sprintf("Matched %d index records with %d threads in %.3f seconds.\n", to_fetch.size, n_threads, Time.now - start) } to_fetch end |
#seq_id_for(name) ⇒ Object
717 718 719 720 721 722 723 724 725 726 |
# File 'lib/bio/maf/index.rb', line 717 def seq_id_for(name) sid = index_sequences[name] if ! sid @max_sid += 1 sid = @max_sid db.set("sequence:#{name}", sid.to_s) index_sequences[name] = sid end return sid end |
#slice(interval, parser, filter = {}) ⇒ Object
423 424 425 426 427 428 429 430 431 |
# File 'lib/bio/maf/index.rb', line 423 def slice(interval, parser, filter={}) if block_given? find([interval], parser, filter) do |block| yield block.slice(interval) end else enum_for(:slice, interval, parser, filter) end end |
#species_id_for_seq(seq) ⇒ Object
737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 |
# File 'lib/bio/maf/index.rb', line 737 def species_id_for_seq(seq) # NB can have multiple dots # example: otoGar1.scaffold_104707.1-93001 parts = seq.split('.', 2) if parts.size == 2 species_name = parts[0] if species.has_key? species_name return species[species_name] else species_id = @species_max_id + 1 if species_id >= MAX_SPECIES raise "cannot index MAF file with more than #{MAX_SPECIES} species" end species[species_name] = species_id db["species:#{species_name}"] = species_id @species_max_id = species_id return species_id end else # not in species.sequence format, apparently return nil end end |
#with_profiling ⇒ Object
564 565 566 567 568 569 570 571 572 573 574 575 576 |
# File 'lib/bio/maf/index.rb', line 564 def with_profiling if RUBY_PLATFORM == 'java' && ENV['profile'] rv = nil pdata = JRuby::Profiler.profile do rv = yield end printer = JRuby::Profiler::FlatProfilePrinter.new(pdata) printer.printProfile(STDERR) return rv else yield end end |