#!/usr/bin/python ########################################### # Author: Brian Chan (birdchan@ucdavis.edu) # Supervisor: Alexander Kozik (akozik@atgc.org) # Date: February 27 2004 # Description: # # This script finds mismatches in the sequences # # Notation: [something]* means "something" can appear # many times. {something}* means the same. # # ========================================= # # Input file format: # # { # . : . : . : # seq1 XAAAAAAAAT-TTTTGTTTT-TTTTT # seq2 AAAAA-AAATTTTTTTTTTT-TTTTT # -------------------------- # Contig_1 NAAAAAAAAT-TTTTGTTTT-TTTTT # }* # # ========================================= # # Mismatches are defined as follows: # # This is an exception, not a mismatch # if seq[i] == "X" and contig[i] == "N" # For example, at position 1 # # This is a deletion # if contig[i] != "-" and seq[i] == "-" # For example, at position 6 # # This is an insertion # if contig[i] == "-" and seq[i] != "-" # For example, at position 11 # # This is a substitution # if contig[i] != "-" and seq[i] != "-" and contig[i] != seq[i] # For example, at position 16 # # ========================================== # # Output file format: # # {Contig_i seqName MM_info}* # # MM_info is defined as follows: # "no polymorphism found" | [index:[D|I|S]]* # # For the sample input file, the output would be # Contig1 seq1 no polymorphism found # Contig1 seq2 6:D|11:I|16:S # # ############################################ import sys import string from string import * from string import rstrip import re ############################################ # global seqNameMaxLength = 22 ############################################ class seqClass: def __init__(self, name): self.name = name self.seq = "" def setSeq(self, subSeq): self.seq = subSeq def getName(self): return self.name def getSeq(self): return self.seq def charAt(self, pos): if len(self.seq) > pos: return self.seq[pos] else: return " " # the genes on the right should be " " def print_info(self): str = "" str = self.name num_sp = len(self.name) for i in range( seqNameMaxLength-num_sp ): str = str + " " str = str + self.seq + "\n" print str def output_info(self): output_string(self.name) num_sp = len(self.name) for i in range( seqNameMaxLength-num_sp ): output_string(" ") output_string(self.seq + "\n") #------------------------------ class seqInfo: def __init__(self): self.seqName = "" self.infoStr = "" def setSeqName(self, seqName): self.seqName = seqName def appendInfoStr(self, info): if len(self.infoStr) != 0: # have sth self.infoStr = self.infoStr + "|" + info else: # have nothing self.infoStr = info def getInfoStr(self): if len( self.infoStr ) != 0: return self.infoStr else: return "no polymorphism found" #------------------------------ class seqHeapClass: def __init__(self): self.seqList = [] self.seqNum = 0 self.contig_ID = "" def set_contig_ID(self, line): contig_str, white_spaces = split(line) self.contig_ID = contig_str def get_contig_ID(self): return self.contig_ID def find_seq_index(self, name): for i in range(len(self.seqList)): if self.seqList[i].getName() == name: return i return -1 def readinSeq(self, name, subSeq): # insert a new seq newSeq = seqClass(name) # make a new empty seq newSeq.setSeq( subSeq.upper() ) # put stuff into it self.seqList.append(newSeq) self.seqNum = self.seqNum + 1 def clear(self): self.seqList = [] self.seqNum = 0 def print_info(self): print self.get_contig_ID() for l in self.seqList: l.print_info() def output_info(self): subseq_info_list = [] # list of the seq_info objs for i in range(self.seqNum): # initialization subSeq_info = seqInfo() # subSeq_info.setSeqName( self.seqList[i].getName() ) subseq_info_list.append( subSeq_info ) total_len = len( self.seqList[ self.seqNum-1 ].getSeq() ) contig_index = self.seqNum - 1 for col in range(total_len): # from index [0] to the end col_num = col+1 # bio != CS for row in range( self.seqNum - 1 ): # for each seq con_ch = self.seqList[contig_index].charAt(col) seq_ch = self.seqList[row].charAt(col) # seq_ch == " " ? skip, coz not in range if seq_ch == " ": continue # if seq_ch == "X" and contig_ch == "N", not MM if seq_ch == "X" and con_ch == "N": continue # deletion if con_ch != "-" and seq_ch == "-": num = "%d" % col_num subseq_info_list[row].appendInfoStr(num+":D") # insertion if con_ch == "-" and seq_ch != "-": num = "%d" % col_num subseq_info_list[row].appendInfoStr(num+":I") # substitution if con_ch != "-" and seq_ch != "-" and con_ch != seq_ch: num = "%d" % col_num subseq_info_list[row].appendInfoStr(num+":S") # output info to output file for i in range( self.seqNum-1 ): """ # this debugging part prints out len( consensus_line ) n = len( self.seqList[contig_index].getSeq() ) s = "%d" % n output_string( s + "\t" ) """ output_string( self.get_contig_ID() + "\t" ) output_string( self.seqList[i].getName() + "\t" ) output_string( subseq_info_list[i].getInfoStr() + "\n" ) ############################################ # global debug_01 = 1 # print out line info fp_in = 0 # file pointer for input fp_out = 0 # file pointer for output seqHeap = seqHeapClass() # the obj that stores everything within a contig ############################################ def check_contigs_info(): global fp_in lines = fp_in.readlines() if not lines: print "No input in the input file" sys.exit(0) # if no input, exit i = 0 # the line index, starting from the top # jump to the next contig section i = line_num_string(lines, i, ". :") next_contig_index = line_num_string(lines, i+1, ". :") while i < len(lines)-1: global seqRow global seqHeap seqRow = 0 seqHeap.clear() while i <= next_contig_index: line = get_line(lines, i) if is_seq_data_line(line) == 1: if debug_01 == 1: print "line ", i, " is a seq_data_line" do_seq_data(line) elif is_consensus_line(line) == 1: if debug_01 == 1: print "line ", i, " is a consensus_line" do_consensus(line) elif is_blank_line(line) == 1: if debug_01 == 1: print "line ", i, " is a blank line" i = i + 1 # go to the next line # show what we got #seqHeap.print_info() # output data into output file seqHeap.output_info() # jump to the next contig section i = line_num_string(lines, i-1, ". :") next_contig_index = line_num_string(lines, i+1, ". :") # ----------------------------------------- def get_line(lines, index): line = lines[index] line = rstrip(line) # get rid of \n return line def line_num_string(lines, index, pattern): # stop at the matched line i = index while 1: if i >= len(lines): return len(lines)-1 line = get_line(lines, i) if find(line, pattern) != -1: break i = i + 1 return i def output_string(str): global fp_out fp_out.write(str) def is_x_line(line, x): if find(line, x) != -1: return 1 return 0 def is_seq_data_line(line): if not is_consensus_line(line) and len(line) > 0 and line[0] != " ": return 1 else: return 0 def is_consensus_line(line): return is_x_line(line, "Contig") def is_blank_line(line): if len(line) == 0: return 1 return 0 def do_seq_data(line): # the seq name and the whole seq global seqNameMaxLength seqName = line[0:seqNameMaxLength] seqName = split(seqName) seqName = seqName[0] seq = line[seqNameMaxLength:len(line)] # store the seq global seqHeap seqHeap.readinSeq(seqName, seq) def do_consensus(line): # read consensus line global seqHeap global seqNameMaxLength seqName = line[0:seqNameMaxLength] # the consensus name (contig) seqName = split(seqName) seqName = seqName[0] seq = line[seqNameMaxLength:len(line)] seqHeap.readinSeq(seqName, seq) seqHeap.set_contig_ID(line) ############################################ # main # ask for input # if sys.argv[1] != "": # input_filename = sys.argv[1] # else: s = raw_input("Enter the SOURCE file name: ") input_filename = s s = raw_input("Enter the DESTINATION file name: ") output_filename = s # checking input if input_filename == output_filename: print "input file is the same as output file" sys.exit(0) # open the input and output file print "read data from the input file", input_filename, "..." fp_in = open(input_filename, "rb") fp_out = open(output_filename, "wb") check_contigs_info() fp_in.close() fp_out.close()