Class: Bio::MAF::Parser
- Inherits: Object
- Includes: MAFParsing
- Defined in: lib/bio/maf/parser.rb
Overview
MAF parser, used for sequential and random-access parsing.
Options:
- :parse_extended: whether to parse 'i' and 'q' lines
- :parse_empty: whether to parse 'e' lines
- :remove_gaps: remove gaps left after filtering sequences
- :join_blocks: join blocks where possible
- :upcase: fold sequence data to upper case
- :chunk_size: read the MAF file in chunks of this many bytes
- :random_chunk_size: as above, but for random access (#fetch_blocks)
- :merge_max: merge up to this many bytes of blocks for random access
- :threads: number of threads to use for parallel parsing of random-access fetches (#fetch_blocks); only useful under JRuby
- :strict: abort on unparseable lines instead of continuing with a warning
Constant Summary
- SEQ_CHUNK_SIZE = 131072
- RANDOM_CHUNK_SIZE = 4096
- MERGE_MAX = SEQ_CHUNK_SIZE
- DEFAULT_OPTS = { :chunk_size => SEQ_CHUNK_SIZE, :random_chunk_size => RANDOM_CHUNK_SIZE, :merge_max => MERGE_MAX, :parse_extended => false, :parse_empty => false, :readahead_thread => true, :seq_parse_thread => true }
- WRAP_OPTS = [:as_bio_alignment, :join_blocks, :remove_gaps, :upcase]
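The constructor builds its option hash as DEFAULT_OPTS.merge(parse_opts), so caller-supplied keys win and omitted keys keep their defaults. Wrapper options from WRAP_OPTS, such as :upcase, are not in DEFAULT_OPTS and therefore read as nil (off) unless passed. A standalone sketch, copying the constant values listed above; effective_opts is a hypothetical helper, not part of the library:

```ruby
# Copies of the constants listed above (values taken from the Constant Summary).
SEQ_CHUNK_SIZE = 131072
RANDOM_CHUNK_SIZE = 4096
MERGE_MAX = SEQ_CHUNK_SIZE
DEFAULT_OPTS = {
  :chunk_size => SEQ_CHUNK_SIZE,
  :random_chunk_size => RANDOM_CHUNK_SIZE,
  :merge_max => MERGE_MAX,
  :parse_extended => false,
  :parse_empty => false,
  :readahead_thread => true,
  :seq_parse_thread => true
}.freeze

# Hypothetical helper: caller options override defaults, as in Parser#initialize.
def effective_opts(parse_opts = {})
  DEFAULT_OPTS.merge(parse_opts)
end

opts = effective_opts(:parse_extended => true, :upcase => true)
p opts[:parse_extended]  # => true    (overridden)
p opts[:chunk_size]      # => 131072  (default kept)
p opts[:upcase]          # => true    (wrapper option, off unless given)
```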
Constants included from MAFParsing
MAFParsing::BLOCK_START, MAFParsing::BLOCK_START_OR_EOS, MAFParsing::COMMENT, MAFParsing::E, MAFParsing::EOL_OR_EOF, MAFParsing::I, MAFParsing::JRUBY_P, MAFParsing::Q, MAFParsing::S, MAFParsing::STRAND_SYM
Instance Attribute Summary
- #at_end ⇒ Boolean (readonly): Whether EOF has been reached.
- #base_reader ⇒ Class (readonly): ChunkReader class to use for random access.
- #chunk_start ⇒ Integer (readonly): Starting offset of the current chunk.
- #compression ⇒ Symbol (readonly): Compression method used for this file, or nil.
- #cr ⇒ ChunkReader (readonly): ChunkReader used for sequential reading.
- #f ⇒ IO (readonly): File handle for the MAF file.
- #file_spec ⇒ String (readonly): Path of the MAF file being parsed.
- #header ⇒ Header (readonly): Header of the MAF file being parsed.
- #last_block_pos ⇒ Integer (readonly): Offset of the last block start in this chunk.
- #opts ⇒ Hash (readonly): Parser options.
- #parse_empty ⇒ Object
- #parse_extended ⇒ Object (private)
- #phys_f ⇒ IO (readonly, private): Underlying physical file handle; its contents may be gzip-compressed.
- #s ⇒ StringScanner (readonly): Scanner for parsing.
Instance Method Summary
- #_merge_bgzf_fetch_list(orig_fl) ⇒ Object: Build a merged fetch list in a BGZF-aware way.
- #_merge_fetch_list(orig_fl) ⇒ Object
- #_parse_header ⇒ Object: Parse the header of the MAF file.
- #_wrap(options, fun, &blk) ⇒ Object: options should be [:outer, ..., :inner].
- #block_joiner(options, fun) {|prev| ... } ⇒ Object
- #close ⇒ Object
- #context(chunk_size) ⇒ ParseContext (private): Create a ParseContext for random access, using the given chunk size.
- #conv_map(options, search, fun) ⇒ Object
- #conv_send(options, search, sym, always_yield_block = false) ⇒ Object
- #each_block {|block| ... } ⇒ Enumerator<Block> (also: #parse_blocks): Parse all alignment blocks until EOF.
- #each_block_seq ⇒ Object
- #fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>: Fetch and parse blocks given by fetch_list.
- #fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>: Fetch and parse the blocks given by the merged fetch list.
- #fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>: Fetch and parse the blocks given by the merged fetch list, in parallel.
- #filter_seq_count(fun) ⇒ Object
- #initialize(file_spec, parse_opts = {}) ⇒ Parser (constructor): Create a new parser instance.
- #make_worker(jobs, ct) ⇒ Object: Create a worker thread for parallel parsing.
- #merge_fetch_list(orig_fl) ⇒ Object: Merge contiguous blocks in the given fetch list, up to :merge_max bytes.
- #parse_block ⇒ Object
- #parse_blocks_parallel ⇒ Object (private): Parse alignment blocks with a worker thread.
- #sequence_filter ⇒ Hash: Sequence filter to apply.
- #sequence_filter=(filter) ⇒ Object: Set the sequence filter.
- #with_context(chunk_size) ⇒ Object (private): Execute the given block with a ParseContext using the given chunk_size as an argument.
- #wrap_block_seq(fun, &blk) ⇒ Object
Methods included from MAFParsing
#_parse_block, #gather_leading_fragment, #parse_block_data, #parse_empty_line, #parse_error, #parse_maf_vars, #parse_seq_line, #parse_trailing_fragment, #seq_filter_ok?, #set_last_block_pos!, #trailing_nl?
Constructor Details
#initialize(file_spec, parse_opts = {}) ⇒ Parser
Create a new parser instance.
    # File 'lib/bio/maf/parser.rb', line 587
    def initialize(file_spec, parse_opts={})
      opts = DEFAULT_OPTS.merge(parse_opts)
      @opts = opts
      @random_access_chunk_size = opts[:random_chunk_size]
      @merge_max = opts[:merge_max]
      @parse_extended = opts[:parse_extended]
      @parse_empty = opts[:parse_empty]
      @chunk_start = 0
      if file_spec.respond_to? :flush
        # an IO object
        # guess what, Pathnames respond to :read...
        @f = file_spec
        @file_spec = @f.path if @f.respond_to?(:path)
        # TODO: test for gzip?
      else
        # a pathname (or Pathname)
        @file_spec = file_spec
        @phys_f = File.open(file_spec)
        if file_spec.to_s.end_with?(".maf.gz")
          @f = Zlib::GzipReader.new(@phys_f)
          @compression = :gzip
        else
          @f = @phys_f
        end
      end
      if @file_spec.to_s =~ /\.bgzf?$/
        @base_reader = BGZFChunkReader
        @compression = :bgzf
      else
        @base_reader = ChunkReader
      end
      @cr = base_reader.new(@f, opts[:chunk_size])
      if JRUBY_P && opts[:readahead_thread]
        LOG.debug "Using ThreadedChunkReaderWrapper."
        @cr = ThreadedChunkReaderWrapper.new(@cr)
      end
      @s = StringScanner.new(cr.read_chunk())
      set_last_block_pos!
      @at_end = false
      _parse_header()
    end
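The constructor picks the decompression path and the chunk-reader class from the file name alone. The sketch below isolates that suffix logic; detect_compression is a hypothetical helper, not a library method. Note that for an IO argument the constructor skips the gzip test (see its TODO) and applies only the BGZF test, using the IO's path when it has one.

```ruby
require "pathname"

# Hypothetical helper reproducing the suffix tests in Parser#initialize:
#   "*.maf.gz"       => :gzip (read through Zlib::GzipReader)
#   "*.bgz"/"*.bgzf" => :bgzf (BGZFChunkReader is used)
#   anything else    => nil   (uncompressed)
def detect_compression(file_spec)
  name = file_spec.to_s          # accepts String or Pathname
  if name =~ /\.bgzf?$/
    :bgzf
  elsif name.end_with?(".maf.gz")
    :gzip
  end
end

detect_compression("chr22.maf.gz")                  # => :gzip
detect_compression(Pathname.new("chr22.maf.bgzf"))  # => :bgzf
detect_compression("chr22.maf")                     # => nil
```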
Instance Attribute Details
#at_end ⇒ Boolean (readonly)
Returns whether EOF has been reached.
    # File 'lib/bio/maf/parser.rb', line 551
    def at_end
      @at_end
    end
#base_reader ⇒ Class (readonly)
Returns ChunkReader class to use for random access.
    # File 'lib/bio/maf/parser.rb', line 549
    def base_reader
      @base_reader
    end
#chunk_start ⇒ Integer (readonly)
Returns starting offset of the current chunk.
    # File 'lib/bio/maf/parser.rb', line 555
    def chunk_start
      @chunk_start
    end
#compression ⇒ Symbol (readonly)
Returns compression method used for this file, or nil.
    # File 'lib/bio/maf/parser.rb', line 559
    def compression
      @compression
    end
#cr ⇒ ChunkReader (readonly)
Returns the ChunkReader used for sequential reading.
    # File 'lib/bio/maf/parser.rb', line 546
    def cr
      @cr
    end
#f ⇒ IO (readonly)
Returns file handle for MAF file.
    # File 'lib/bio/maf/parser.rb', line 538
    def f
      @f
    end
#file_spec ⇒ String (readonly)
Returns path of MAF file being parsed.
    # File 'lib/bio/maf/parser.rb', line 536
    def file_spec
      @file_spec
    end
#header ⇒ Header (readonly)
Returns header of the MAF file being parsed.
    # File 'lib/bio/maf/parser.rb', line 534
    def header
      @header
    end
#last_block_pos ⇒ Integer (readonly)
Returns offset of the last block start in this chunk.
    # File 'lib/bio/maf/parser.rb', line 557
    def last_block_pos
      @last_block_pos
    end
#opts ⇒ Hash (readonly)
Returns parser options.
    # File 'lib/bio/maf/parser.rb', line 553
    def opts
      @opts
    end
#parse_empty ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 563
    def parse_empty
      @parse_empty
    end
#parse_extended ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/bio/maf/parser.rb', line 562
    def parse_extended
      @parse_extended
    end
#phys_f ⇒ IO (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
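Returns the underlying physical file handle, which is only set when a path (not an IO) is given; the file it reads may be gzip-compressed.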
    # File 'lib/bio/maf/parser.rb', line 542
    def phys_f
      @phys_f
    end
#s ⇒ StringScanner (readonly)
Returns scanner for parsing.
    # File 'lib/bio/maf/parser.rb', line 544
    def s
      @s
    end
Instance Method Details
#_merge_bgzf_fetch_list(orig_fl) ⇒ Object
Build a merged fetch list in a BGZF-aware way. This will group together all MAF blocks from a single BGZF block. These MAF blocks may not be consecutive.
    # File 'lib/bio/maf/parser.rb', line 820
    def _merge_bgzf_fetch_list(orig_fl)
      block_e = orig_fl.chunk { |entry|
        Bio::BGZF::vo_block_offset(entry[0])
      }
      block_e.collect do |bgzf_block, fl|
        # text size to read from disk, from the start of the first
        # block to the end of the last block
        text_size = fl.last[0] + fl.last[1] - fl.first[0]
        offsets = fl.collect { |e| e[0] }
        [fl.first[0], text_size, offsets]
      end
    end
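A BGZF virtual offset, as defined in the SAM/BAM specification, stores the compressed block's file offset in its upper 48 bits and the position within the uncompressed block in its lower 16 bits. Assuming Bio::BGZF::vo_block_offset returns that upper part, the grouping can be sketched without the library; vo_block_offset, make_vo and merge_bgzf below are hypothetical stand-ins. Since Enumerable#chunk groups only consecutive entries with equal keys, entries from one BGZF block are merged only when they are adjacent in the fetch list, which suggests the list is expected to be sorted by offset.

```ruby
# Assumption: standard BGZF virtual offsets, (block_offset << 16) | within_block.
def vo_block_offset(vo)
  vo >> 16
end

def make_vo(block_offset, within_block)
  (block_offset << 16) | within_block
end

# Group [virtual_offset, length] entries by BGZF block, returning
# [first_offset, text_size, [offsets...]] like #_merge_bgzf_fetch_list.
def merge_bgzf(fetch_list)
  fetch_list.chunk { |entry| vo_block_offset(entry[0]) }.map do |_block, fl|
    text_size = fl.last[0] + fl.last[1] - fl.first[0]
    [fl.first[0], text_size, fl.map { |e| e[0] }]
  end
end

fl = [[make_vo(0, 100), 50], [make_vo(0, 400), 30], [make_vo(9000, 10), 20]]
merge_bgzf(fl)
# => [[100, 330, [100, 400]], [589824010, 20, [589824010]]]
```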
#_merge_fetch_list(orig_fl) ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 797
    def _merge_fetch_list(orig_fl)
      fl = orig_fl.dup
      r = []
      until fl.empty? do
        cur = fl.shift
        if r.last \
          && (r.last[0] + r.last[1]) == cur[0] \
          && (r.last[1] + cur[1]) <= @merge_max
          # contiguous with the previous one
          # add to length and increment count
          r.last[1] += cur[1]
          r.last[2] << cur[0]
        else
          cur << [cur[0]]
          r << cur
        end
      end
      return r
    end
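An entry extends the previous run only if it starts exactly where that run ends and the combined length stays within :merge_max. This standalone sketch (merge_contiguous is a hypothetical name) applies the same rule but builds fresh tuples; the method above duplicates the list only shallowly, so it appends to and resizes the caller's [offset, length] arrays in place.

```ruby
# Non-mutating sketch of the contiguous-merge rule.
# Input: [offset, length] pairs sorted by offset.
# Output: [offset, total_length, [offset1, offset2, ...]] tuples.
def merge_contiguous(fetch_list, merge_max)
  fetch_list.each_with_object([]) do |(offset, length), merged|
    last = merged.last
    if last && last[0] + last[1] == offset && last[1] + length <= merge_max
      # Starts where the previous run ends and still fits: extend that run.
      last[1] += length
      last[2] << offset
    else
      merged << [offset, length, [offset]]
    end
  end
end

merge_contiguous([[0, 100], [100, 50], [150, 10], [300, 20]], 155)
# => [[0, 150, [0, 100]], [150, 10, [150]], [300, 20, [300]]]
```

The third entry is contiguous but would push the run to 160 bytes, so it starts a new run; the fourth is not contiguous at all.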
#_parse_header ⇒ Object
Parse the header of the MAF file.
    # File 'lib/bio/maf/parser.rb', line 834
    def _parse_header
      parse_error("not a MAF file") unless s.scan(/##maf\s*/)
      vars = parse_maf_vars()
      align_params = nil
      while s.scan(/^#\s*(.+?)\n/)
        if align_params == nil
          align_params = s[1]
        else
          align_params << ' ' << s[1]
        end
      end
      @header = Header.new(vars, align_params)
      if ! s.skip_until(BLOCK_START)
        @at_end = true
      end
    end
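A MAF file begins with a "##maf" line carrying key=value variables, optionally followed by "#" comment lines that the parser joins with spaces into one alignment-parameters string. The sketch below reproduces that flow with StringScanner. The key=value loop is a hypothetical stand-in for MAFParsing#parse_maf_vars, whose implementation is not shown here, and parse_header and ToyHeader are illustrative names only.

```ruby
require "strscan"

ToyHeader = Struct.new(:vars, :align_params)

def parse_header(text)
  s = StringScanner.new(text)
  raise ArgumentError, "not a MAF file" unless s.scan(/##maf\s*/)
  # Stand-in for parse_maf_vars: key=value pairs on the rest of the ##maf line.
  vars = {}
  while s.scan(/(\w+)=(\S*)[ \t]*/)
    vars[s[1].to_sym] = s[2]
  end
  s.skip(/\n/)
  # Consecutive "#" lines are joined into a single parameter string.
  align_params = nil
  while s.scan(/#\s*(.+?)\n/)
    align_params = align_params ? "#{align_params} #{s[1]}" : s[1]
  end
  ToyHeader.new(vars, align_params)
end

h = parse_header("##maf version=1 scoring=tba.v8\n# tba.v8 (human chimp)\n# second\n\na score=1.0\n")
h.vars[:scoring]  # => "tba.v8"
h.align_params    # => "tba.v8 (human chimp) second"
```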
#_wrap(options, fun, &blk) ⇒ Object
options should be [:outer, ..., :inner]
    # File 'lib/bio/maf/parser.rb', line 898
    def _wrap(options, fun, &blk)
      first = options.shift
      case first
      when nil
        fun.call(&blk)
      when :sequence_filter
        conv_map(options,
                 fun,
                 lambda { |b| b if b.sequences.size > 1 },
                 &blk)
      when :join_blocks
        block_joiner(options, fun, &blk)
      when :as_bio_alignment
        conv_send(options,
                  fun,
                  :to_bio_alignment,
                  &blk)
      when :upcase
        conv_send(options,
                  fun,
                  :upcase!,
                  true,
                  &blk)
      when :remove_gaps
        conv_map(options,
                 fun,
                 lambda { |b| b.remove_gaps! if b.filtered?; b },
                 &blk)
      else
        raise "unhandled wrapper mode: #{first}"
      end
    end
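Each mode re-enters _wrap with the remaining options and a block that post-processes whatever the inner level yields, so the innermost mode sees each block first. The toy version below uses strings instead of alignment blocks; the modes :upcase and :drop_short and the producer lambda are hypothetical. It copies the options array before shifting, whereas the original shifts its argument in place (harmless there, since wrap_block_seq builds a fresh array per call).

```ruby
# Toy outer-to-inner wrapper chain in the style of #_wrap.
def wrap(options, fun, &blk)
  options = options.dup        # avoid mutating the caller's array
  first = options.shift
  case first
  when nil
    fun.call(&blk)             # innermost level: the raw producer
  when :upcase
    wrap(options, fun) { |s| blk.call(s.upcase) }
  when :drop_short
    wrap(options, fun) { |s| blk.call(s) if s.size > 2 }
  else
    raise ArgumentError, "unhandled wrapper mode: #{first}"
  end
end

producer = lambda { |&b| %w[acgt tt ggc].each { |s| b.call(s) } }
out = []
wrap([:upcase, :drop_short], producer) { |s| out << s }
out  # => ["ACGT", "GGC"]
```

Here :drop_short (inner) filters first, and :upcase (outer) transforms what survives.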
#block_joiner(options, fun) {|prev| ... } ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 937
    def block_joiner(options, fun)
      prev = nil
      _wrap(options, fun) do |cur|
        if prev && (prev.filtered? || cur.filtered?) \
          && prev.joinable_with?(cur)
          prev = prev.join(cur)
        else
          yield prev if prev
          prev = cur
        end
      end
      yield prev if prev
    end
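The joiner holds one block back, folds each following block into it while joining is allowed, and yields only when a run ends, with a final flush after the loop. In this sketch, integer ranges stand in for blocks and adjacency stands in for Block#joinable_with?; join_adjacent is a hypothetical name, and the original's extra requirement that at least one of the two blocks be filtered is left out.

```ruby
# Look-behind joining: emit an item only once the next one cannot be merged into it.
def join_adjacent(items)
  prev = nil
  items.each do |cur|
    if prev && prev.end == cur.begin
      prev = (prev.begin...cur.end)   # join the two ranges
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev                  # flush the final run
end

out = []
join_adjacent([0...5, 5...9, 12...15, 15...20]) { |r| out << r }
out  # => [0...9, 12...20]
```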
#close ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 629
    def close
      f.close
    end
#context(chunk_size) ⇒ ParseContext
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a Bio::MAF::ParseContext for random access, using the given chunk size.
    # File 'lib/bio/maf/parser.rb', line 638
    def context(chunk_size)
      # IO#dup calls dup(2) internally, but seems broken on JRuby...
      if file_spec
        fd = File.open(file_spec)
      else
        fd = f.dup
      end
      ParseContext.new(fd, chunk_size, self)
    end
#conv_map(options, search, fun) ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 951
    def conv_map(options, search, fun)
      _wrap(options, search) do |block|
        v = fun.call(block)
        yield v if v
      end
    end
#conv_send(options, search, sym, always_yield_block = false) ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 958
    def conv_send(options, search, sym, always_yield_block=false)
      _wrap(options, search) do |block|
        v = block.send(sym)
        if always_yield_block
          yield block
        else
          yield v if v
        end
      end
    end
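The always_yield_block flag matters for bang methods. In plain Ruby, String#upcase! returns nil when the string was already upper case, so yielding the return value would silently drop such items; the :upcase wrapper passes true, presumably for the same reason with Block#upcase!. A sketch with strings (send_each is a hypothetical name):

```ruby
# Mirrors conv_send's two yield modes over a plain array.
def send_each(items, sym, always_yield_block = false)
  items.each do |item|
    v = item.send(sym)
    if always_yield_block
      yield item        # yield the (possibly modified) receiver
    else
      yield v if v      # yield the method's return value, skipping nil
    end
  end
end

seqs = ["acgt", "ACGT"]
dropped = []
send_each(seqs.map(&:dup), :upcase!) { |x| dropped << x }
kept = []
send_each(seqs.map(&:dup), :upcase!, true) { |x| kept << x }
dropped  # => ["ACGT"]          (the already-upper-case string was lost)
kept     # => ["ACGT", "ACGT"]
```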
#each_block {|block| ... } ⇒ Enumerator<Block> Also known as: parse_blocks
Parse all alignment blocks until EOF.
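Under JRuby, delegates to #parse_blocks_parallel when the :seq_parse_thread option is set (it is true in DEFAULT_OPTS); otherwise parses sequentially via #each_block_seq. Without a block, returns an Enumerator.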
    # File 'lib/bio/maf/parser.rb', line 859
    def each_block(&blk)
      if block_given?
        if JRUBY_P && opts[:seq_parse_thread]
          fun = method(:parse_blocks_parallel)
        else
          fun = method(:each_block_seq)
        end
        wrap_block_seq(fun, &blk)
      else
        enum_for(:each_block)
      end
    end
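each_block and fetch_blocks follow the standard Ruby idiom of returning enum_for(...) when called without a block, so callers can chain Enumerable methods. A minimal sketch with a hypothetical ToyParser class:

```ruby
class ToyParser
  def initialize(records)
    @records = records
  end

  # With a block: iterate. Without one: return an Enumerator over this method.
  def each_block(&blk)
    if block_given?
      @records.each(&blk)
    else
      enum_for(:each_block)
    end
  end
end

parser = ToyParser.new(%w[b1 b2 b3])
parser.each_block.first(2)       # => ["b1", "b2"]
parser.each_block.map(&:upcase)  # => ["B1", "B2", "B3"]
```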
#each_block_seq ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 873
    def each_block_seq
      until at_end
        block = _parse_block()
        yield block if block
      end
    end
#fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>
Fetch and parse blocks given by fetch_list. fetch_list should be an array of [offset, length] tuples.
    # File 'lib/bio/maf/parser.rb', line 683
    def fetch_blocks(fetch_list, &blk)
      if blk
        merged = merge_fetch_list(fetch_list)
        if JRUBY_P && @opts.fetch(:threads, 1) > 1
          fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
        else
          fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
        end
        wrap_block_seq(fun, &blk)
      else
        enum_for(:fetch_blocks, fetch_list)
      end
    end
#fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>
Fetch and parse the blocks given by the merged fetch list.
    # File 'lib/bio/maf/parser.rb', line 701
    def fetch_blocks_merged(fetch_list, &blk)
      start = Time.now
      total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
      count = 0
      with_context(@random_access_chunk_size) do |ctx|
        fetch_list.each do |e|
          ctx.fetch_blocks(*e, &blk)
          count += 1
        end
      end
      elapsed = Time.now - start
      rate = (total_size / 1048576.0) / elapsed
      LOG.debug { sprintf("Fetched %d blocks in %.3fs, %.1f MB/s.",
                          count, elapsed, rate) }
    end
#fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>
Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the :threads parser option.
    # File 'lib/bio/maf/parser.rb', line 723
    def fetch_blocks_merged_parallel(fetch_list)
      total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
      start = Time.now
      n_threads = @opts.fetch(:threads, 1)
      # TODO: break entries up into longer runs for more
      # sequential I/O
      jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
      ct = CompletionTracker.new(fetch_list)
      completed = ct.queue
      threads = []
      n_threads.times { threads << make_worker(jobs, ct) }

      n_res = 0
      while n_res < fetch_list.size
        c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
        unless c
          raise "Worker failed!" if threads.find { |t| t.status.nil? }
          next
        end
        c.each do |block|
          yield block
        end
        n_res += 1
      end
      threads.each { |t| t.join }
      elapsed = Time.now - start
      LOG.debug { sprintf("Fetched blocks from %d threads in %.1fs.",
                          n_threads,
                          elapsed) }
      mb = total_size / 1048576.0
      LOG.debug { sprintf("%.3f MB processed (%.1f MB/s).",
                          mb,
                          mb / elapsed) }
    end
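The parallel fetch is a fixed pool of workers draining a shared job queue and handing back one batch of blocks per request. This plain-Ruby sketch swaps the java.util.concurrent queues and CompletionTracker for Thread::Queue and fakes the parsing step; parallel_fetch is a hypothetical name. It omits the original's one-second poll that raises if a worker has died, and in this sketch batches arrive in completion order, not request order.

```ruby
# Worker-pool sketch modeled on #fetch_blocks_merged_parallel and #make_worker.
def parallel_fetch(fetch_list, n_threads)
  jobs = Queue.new
  fetch_list.each { |req| jobs << req }
  jobs.close                       # pop returns nil once closed and drained
  completed = Queue.new

  workers = Array.new(n_threads) do
    Thread.new do
      while (req = jobs.pop)       # take requests until none remain
        offsets = req[2]           # [offset, size, [offset1, offset2, ...]]
        # Hypothetical "parse": one result per block offset in the request.
        completed << offsets.map { |off| "block@#{off}" }
      end
    end
  end

  results = []
  fetch_list.size.times { results.concat(completed.pop) }  # one batch per request
  workers.each(&:join)
  results
end

merged = [[0, 30, [0, 10, 20]], [100, 5, [100]], [200, 7, [200]]]
parallel_fetch(merged, 2).sort
# => ["block@0", "block@10", "block@100", "block@20", "block@200"]
```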
#filter_seq_count(fun) ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 931
    def filter_seq_count(fun)
      fun.call() do |block|
        yield block if block.filtered? && block.sequences.size > 1
      end
    end
#make_worker(jobs, ct) ⇒ Object
Create a worker thread for parallel parsing.
    # File 'lib/bio/maf/parser.rb', line 761
    def make_worker(jobs, ct)
      Thread.new do
        begin
          with_context(@random_access_chunk_size) do |ctx|
            while true
              req = jobs.poll
              break unless req
              n_blocks = req[2].size
              blocks = ctx.fetch_blocks(*req).to_a
              if blocks.size != n_blocks
                # report the request that came up short (`e` is undefined here)
                raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
              end
              ct << blocks
            end
          end
        rescue Exception => e
          LOG.error "Worker failing: #{e.class}: #{e}"
          LOG.error e
          raise e
        end
      end
    end
#merge_fetch_list(orig_fl) ⇒ Object
Merge contiguous blocks in the given fetch list, up to :merge_max bytes. Returns [offset, size, [offset1, offset2, ...]] tuples.
    # File 'lib/bio/maf/parser.rb', line 788
    def merge_fetch_list(orig_fl)
      case compression
      when nil
        _merge_fetch_list(orig_fl)
      when :bgzf
        _merge_bgzf_fetch_list(orig_fl)
      end
    end
#parse_block ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 880
    def parse_block
      b = nil
      wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
        b = block
      end
      b
    end
#parse_blocks_parallel ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Parse alignment blocks with a worker thread.
    # File 'lib/bio/maf/parser.rb', line 973
    def parse_blocks_parallel
      queue = java.util.concurrent.LinkedBlockingQueue.new(128)
      worker = Thread.new do
        begin
          LOG.debug "Starting parse worker."
          until at_end
            block = _parse_block()
            queue.put(block) if block
          end
          queue.put(:eof)
          LOG.debug { "Parse worker reached EOF." }
        rescue Exception
          LOG.error $!
          Thread.current[:exception] = $!
          raise
        end
      end
      saw_eof = false
      n_final_poll = 0
      while true
        block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
        if block == :eof
          saw_eof = true
          break
        elsif block
          yield block
        else
          # timed out
          unless worker.alive?
            LOG.debug "Worker has exited."
            n_final_poll += 1
          end
        end
        break if n_final_poll > 1
      end
      unless saw_eof
        raise "worker exited unexpectedly from #{worker[:exception]}!"
      end
    end
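parse_blocks_parallel is a bounded producer/consumer pair: a worker thread parses blocks into a queue of capacity 128 and finishes with an :eof sentinel. A plain-Ruby sketch using SizedQueue in place of LinkedBlockingQueue; each_parsed is a hypothetical name. Unlike the original, which stores the worker's exception and detects a dead worker by polling with a timeout, the sketch pushes :eof from an ensure clause and lets Thread#join re-raise any worker error.

```ruby
# Producer/consumer sketch modeled on #parse_blocks_parallel.
def each_parsed(records, capacity = 128)
  queue = SizedQueue.new(capacity)       # bounded: producer blocks when full
  worker = Thread.new do
    begin
      records.each { |r| queue.push(r) }
    ensure
      queue.push(:eof)                   # always wake the consumer, even on error
    end
  end
  while (item = queue.pop) != :eof
    yield item
  end
  worker.join                            # re-raises any exception from the worker
end

out = []
each_parsed((1..300).to_a, 4) { |x| out << x }
out == (1..300).to_a  # => true
```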
#sequence_filter ⇒ Hash
Sequence filter to apply.
    # File 'lib/bio/maf/parser.rb', line 665
    def sequence_filter
      @sequence_filter ||= {}
    end
#sequence_filter=(filter) ⇒ Object
Set the sequence filter.
    # File 'lib/bio/maf/parser.rb', line 672
    def sequence_filter=(filter)
      @sequence_filter = filter
    end
#with_context(chunk_size) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a Bio::MAF::ParseContext with the given chunk_size, pass it to the given block, and close the context's file handle when the block finishes, even if it raises.
    # File 'lib/bio/maf/parser.rb', line 653
    def with_context(chunk_size)
      ctx = context(chunk_size)
      begin
        yield ctx
      ensure
        ctx.f.close
      end
    end
#wrap_block_seq(fun, &blk) ⇒ Object
    # File 'lib/bio/maf/parser.rb', line 890
    def wrap_block_seq(fun, &blk)
      opts = WRAP_OPTS.find_all { |o| @opts[o] }
      opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
      LOG.debug { "wrapping #{fun} with #{opts.inspect}" }
      _wrap(opts, fun, &blk)
    end