Module imputerApi

Expand source code
import sys
import copy
import csv
import warnings
import os

class ImputerApi(object):
    def __init__(self, path_to_file=None, matrix_2D=None, delimiter=",", strategy="mean",headers=True) -> None:
        """
        Constructor
        Return : None
        """
        self.path_to_file = path_to_file
        self.matrix_2D = matrix_2D
        self.delimiter = delimiter
        self.strategy = strategy
        self.data = []
        self.headers = headers
        self.headers_value = []
        self.supported_strategies = ["mean","median","most-frequent","constant"]
        if self.strategy not in self.supported_strategies:
            print(f":ERROR: `{self.strategy}` is not a supported strategy.\nSupported strategies are: `{('`,`'.join(self.supported_strategies))}` .")
            sys.exit(1)
        if self.path_to_file == None and matrix_2D == None:
            print(f":ERROR: Please provide either a csv file or a two dimensional matrix.")
            sys.exit(1)
        if self.path_to_file != None and matrix_2D != None:
            print(f":ERROR: Please provide either a csv file or a two dimensional matrix.")
            sys.exit(1)
        if matrix_2D != None and isinstance(self.matrix_2D,list)==False:
            print(f":ERROR: `matrix_2D` attribute must be a two dimensional matrix.")
            sys.exit(1)
        if self.path_to_file != None:
            self.prepare_data()
        if self.matrix_2D !=None:
            if self.headers == True:
                self.headers_value = self.matrix_2D[0]
                self.data = copy.deepcopy(self.matrix_2D[1:])
            else:
                self.data = copy.deepcopy(self.matrix_2D)

    @staticmethod
    def not_implemented(fn_name):
        """Helper Function

            Parameters:
            fn_name (string): Function Name

            Returns:
            None

        """
        print(f"\n`{fn_name}` is not implemented yet.\n\n")
        raise NotImplementedError
    
    @staticmethod
    def give_me_first(arr):
        """Function to get first element of a list and the rest
        
            Parameters:
            arr (List): Input List

            Returns:
            Tuple: (First Element, Rest of the List)

        """
        # Not exactly pop but loose
        if isinstance(arr,list)==False:
            raise Exception("InvalidType")
        if len(arr) == 0:
            raise Exception("EmptyList")
        new_arr = arr[1:]
        return arr[0], new_arr


    def prepare_data(self):
        data_arr = []
        try:
            with open(self.path_to_file) as csvreader:
                data=csv.reader(csvreader,delimiter=self.delimiter)
                for row in data:
                    data_arr.append([x for x in row])
            csvreader.close()
            if self.headers==True:
                self.headers_value = data_arr[0]
                if '' in self.headers_value:
                    warnings.warn(":WARNING: Header contains blank value.")
                self.data = copy.deepcopy(data_arr[1:])
            else:
                self.data = copy.deepcopy(data_arr)

        except Exception as e:
            print(e)
            print(e.args)
            sys.exit(1)
    
    def transform(self,columns_by_header_name=[],column_indexes=[],row_start=0,row_end=-1,missing_value='',constant=None):
        if row_end==-1:
            row_end = len(self.data)-1
        if isinstance(row_start,int)==False or row_start<0 or row_start>row_end or (float(row_start)-row_start)!=0.0:
            print(f":ERROR: `row_start` must be an integer between 0 and {len(self.data)-1}.")
            sys.exit(1)
        if isinstance(row_end,int)==False or row_end<0 or row_end>len(self.data)-1 or (float(row_end)-row_end)!=0.0:
            print(f":ERROR: `row_end` must be an integer between 0 and {len(self.data)-1}.")
            sys.exit(1)
        if len(columns_by_header_name) == 0 and len(column_indexes) == 0:
            columns_by_header_name = self.headers_value if len(self.headers_value)>0 else []
        
        col_header_indexes = self.transform_sub_1(columns_by_header_name,column_indexes)
        # print(col_header_indexes)
        
        fn_mapping={
            "mean": self.arr_replace_by_mean,
            "median": self.arr_replace_by_median,
            "most-frequent":self.arr_replace_by_most_frequent,
            "constant":self.arr_replace_by_constant
        }
        fn_to_be_called = fn_mapping[self.strategy]

        result=[]
        for index in col_header_indexes:
            temp_array=[]
            for i in range(row_start,row_end+1):
                temp_array.append(self.data[i][index])
            if isinstance(missing_value,list)==True:
                index_arr=[i for i in range(0,len(temp_array)) if temp_array[i] in missing_value]
            else:
                index_arr=[i for i in range(0,len(temp_array)) if temp_array[i] == missing_value]
            if index_arr == []:
                warning_text= f":WARNING: There are no missing value = ` {missing_value} ` in the given range from {row_start} to {row_end} and selected in columns: {col_header_indexes} .\n"
                warnings.warn(warning_text)
            if self.strategy == "constant":
                if constant==None:
                    print(f"\n:ERROR: Parameter `constant` needs to be passed to `transform`.\n")
                    sys.exit(1)
                else:
                    result.append(fn_to_be_called(temp_array,index_arr,missing_value,constant))
            else:
                result.append(fn_to_be_called(temp_array,index_arr,missing_value))
                
        return self.transform_sub_2_put_back(row_start,row_end,col_header_indexes,result)

    def transform_sub_1(self,columns_by_header_name,column_indexes):
        col_header_indexes=[]
        not_found_fr_dbgn=[]
        for i in range(0,len(columns_by_header_name)):
            if columns_by_header_name[i] not in self.headers_value:
                not_found_fr_dbgn.append(columns_by_header_name[i])
            else:
                for j in range(0,len(self.headers_value)):
                    if columns_by_header_name[i]==self.headers_value[j]:
                        col_header_indexes.append(j)
            
        if len(col_header_indexes) == 0 and len(not_found_fr_dbgn)>0:
            print(f"\n:ERROR: Invalid column names: `{'`, `'.join(not_found_fr_dbgn)}`.\n")
            raise Exception("InvalidColumnName")
        if len(col_header_indexes)>0 and len(not_found_fr_dbgn)>0:
            print(f"\n:ERROR: Invalid column names: `{'`, `'.join(not_found_fr_dbgn)}`.\n")
            raise Exception("InvalidColumnName")
        
        if len(col_header_indexes)==len(self.data[0]):
            pass
        elif len(column_indexes)>len(self.data[0]):
            print(f'\n:ERROR: (Number of columns to be selected should be less than or equal to total number of columns in the data(= {len(self.data[0])} ).\n')
            raise Exception("LengthMismatch")
        else:
            for el in column_indexes:
                if isinstance(el,int)==False or el<0 or el >= len(self.data[0]) or float(el)-el!=0.0:
                    print(f"\n:ERROR: Invalid index value: `{el}`. Index must be an integer between 0 and {len(self.data[0])-1}. Total Number of columns in the data = {len(self.data[0])}. \n")
                    raise ValueError
                col_header_indexes.append(el)
        
        col_header_indexes=list(set(col_header_indexes))
        return col_header_indexes

    def transform_sub_2_put_back(self,row_start,row_end,col_header_indexes,result):
        assert(len(col_header_indexes)==len(result))
        data_copy = copy.deepcopy(self.data)
        for j in col_header_indexes:
            arr,new_arr=ImputerApi.give_me_first(result)
            result = copy.deepcopy(new_arr)
            for i in range(row_start,row_end+1): 
                el,rest = ImputerApi.give_me_first(arr)
                arr=rest
                data_copy[i][j] = el

            if new_arr==[]:
                return data_copy
        
    def print_table(self,arr_2D,row_sep=" "):
        assert(isinstance(arr_2D,list))
        assert(len(arr_2D)>0)
        header_dashes_chars_count = len(''.join([str(x) for x in arr_2D[0]])) + len(arr_2D[0])
        if self.headers_value != []:
            if (len(''.join(self.headers_value)) + len(self.headers_value)) > header_dashes_chars_count:
                header_dashes_chars_count = len(''.join(self.headers_value)) + len(self.headers_value)
            print("-"*header_dashes_chars_count)  
            print(row_sep.join(self.headers_value))
        else:
            print('-'*header_dashes_chars_count)
        for row in arr_2D:
            print(row_sep.join([str(x) for x in row]))
        print('-'*header_dashes_chars_count)

    def dump_data_to_csv(self,dst_file_path,data:list,delimiter=',',quotechar='"',override=False,use_header_from_data=False):
        """Function to get mean of a list 
        
            Parameters:
            dst_file_path (String): CSV file name to write to,
            data (List): Matrix to be written,
            delimiter (String): Delimiter to be used in CSV,
            quotechar (Strng): Quote Character to be used while wrting to CSV,
            override (Boolean): Override existing file,
            use_header_from_data (Boolean): Flag whether to use header values from input data

            Returns:
            None

        """
        assert(dst_file_path!='' or dst_file_path!=None)
        if (dst_file_path.split("."))[-1] == dst_file_path:
            dst_file_path = dst_file_path+".csv"
        if (dst_file_path.split("."))[-1] != 'csv':
            print("\n:ERROR: Extension of file must be .csv\n")
            raise Exception("InvalidFileExtension")
        if os.path.exists(dst_file_path):
            if override == False:
                print(f"\n:ERROR: FilePath : `{dst_file_path}` already exists. Use override=True in dump_data_to_csv function. \n")
                sys.exit(1)
            else:
                pass
        try:
            with open(dst_file_path, 'w', newline='') as csvfile:
                csv_writer = csv.writer(csvfile, delimiter=delimiter,quotechar=quotechar, quoting=csv.QUOTE_MINIMAL)
                if use_header_from_data == True:
                    if self.headers_value == []:
                        warnings.warn("\n:WARNING: Original Data File have no header values. Skipping use_header_from_data=True flag\n")
                    else:
                        csv_writer.writerow(self.headers_value)

                for row in data:
                    csv_writer.writerow(row)
            csvfile.close()
        except Exception as e:
            print(e)
            print("\n:ERROR: Error while writing to file.\n")
            sys.exit(1)

        print(f"\nFile Saved: `{dst_file_path}`")

    @staticmethod
    def mean(arr,missing_value=''):
        """Function to get mean of a list 
        
            Parameters:
            arr (List): Input List,
            missing_value (Any): Value to be skipped

            Returns:
            float: Mean of the List

        """
        l = len(arr)
        missing_count=0
        try:
            assert(l > 0)
        except Exception as e:
            print(f":ERROR: Empty List.")
            sys.exit(1)
        sum = 0
        for i in range(l):
            if arr[i] == missing_value or arr[i] in missing_value:
                missing_count = missing_count + 1
                continue
            try:
                sum = sum + float(arr[i])
            except Exception as e:
                print(e)
                print(
                    f":ERROR: Conversion of `{arr[i]}` to float failed at array location `{i}`.")
                print("Strategy `mean` requires values to be float.")
                sys.exit(1)
        return (sum/(l-missing_count))

    @staticmethod
    def median(arr,missing_value=''):
        """Function to get median of a list 
        
            Parameters:
            arr (List): Input List,
            missing_value (Any): Value to be skipped

            Returns:
            float: median of the List

        """
        l = len(arr)
        try:
            assert(l > 0)
        except Exception as e:
            print(f":ERROR: Empty List.")
            sys.exit(1)
        arr_cp=[]
        arr_gen=(x for x in arr)
        for _ in range(l):
            try:
                el=next(arr_gen)
                if el == missing_value or el in missing_value:
                    pass
                else:
                    arr_cp.append(float(el))
            except Exception as e:
                print(e)
                sys.exit(1)
        arr_cp = sorted(arr_cp)
        if len(arr_cp) % 2==1:
            return arr_cp[len(arr_cp)//2]
        else:
            return (arr_cp[len(arr_cp)//2]+arr_cp[len(arr_cp)//2-1])/2
    
    @staticmethod
    def most_frequent(arr,missing_value=''):
        """Function to get most frequent value of a list 
        
            Parameters:
            arr (List): Input List,
            missing_value (Any): Value to be skipped

            Returns:
            any: most frequent value of the List

        """
        dct = {}
        for el in arr:
            if el == missing_value or el in missing_value:
                pass
            else:
                if str(el) in dct.keys():
                    dct[str(el)] = dct[str(el)] + 1
                else:
                    dct[str(el)] = 1
        max_key = ''
        max_val = 0
        for (k,v) in dct.items():
            if v > max_val:
                max_val = v
                max_key = k
        return max_key


    def arr_replace_by_mean(self, arr, index_arr,missing_value=''):
        """Wrapper Function over mean which performs replace operation given indexed array 
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        mean_ = ImputerApi.mean(arr_copy,missing_value)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(mean_)
            else:
                arr_copy[i] = mean_
        return arr_copy

    def arr_replace_by_median(self, arr, index_arr,missing_value=''):
        """Wrapper Function over median which performs replace operation given indexed array 
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        median_ = ImputerApi.median(arr_copy,missing_value)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(median_)
            else:
                arr_copy[i] = median_
        return arr_copy

    def arr_replace_by_most_frequent(self, arr, index_arr,missing_value=''):
        """Wrapper Function over most_frequent which performs replace operation given indexed array 
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        most_frequent_ = ImputerApi.most_frequent(arr_copy,missing_value)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(most_frequent_)
            else:
                arr_copy[i] = most_frequent_
        return arr_copy
    
    def arr_replace_by_constant(self, arr, index_arr,missing_value='',constant=''):
        """Wrapper Function which performs replace operation given indexed array and a constant
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped,
            constant (Any): Value to be replaced with

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(constant)
            else:
                arr_copy[i] = constant
        return arr_copy

Classes

class ImputerApi (path_to_file=None, matrix_2D=None, delimiter=',', strategy='mean', headers=True)

Constructor Return : None

Expand source code
class ImputerApi(object):
    def __init__(self, path_to_file=None, matrix_2D=None, delimiter=",", strategy="mean",headers=True) -> None:
        """
        Constructor
        Return : None
        """
        self.path_to_file = path_to_file
        self.matrix_2D = matrix_2D
        self.delimiter = delimiter
        self.strategy = strategy
        self.data = []
        self.headers = headers
        self.headers_value = []
        self.supported_strategies = ["mean","median","most-frequent","constant"]
        if self.strategy not in self.supported_strategies:
            print(f":ERROR: `{self.strategy}` is not a supported strategy.\nSupported strategies are: `{('`,`'.join(self.supported_strategies))}` .")
            sys.exit(1)
        if self.path_to_file == None and matrix_2D == None:
            print(f":ERROR: Please provide either a csv file or a two dimensional matrix.")
            sys.exit(1)
        if self.path_to_file != None and matrix_2D != None:
            print(f":ERROR: Please provide either a csv file or a two dimensional matrix.")
            sys.exit(1)
        if matrix_2D != None and isinstance(self.matrix_2D,list)==False:
            print(f":ERROR: `matrix_2D` attribute must be a two dimensional matrix.")
            sys.exit(1)
        if self.path_to_file != None:
            self.prepare_data()
        if self.matrix_2D !=None:
            if self.headers == True:
                self.headers_value = self.matrix_2D[0]
                self.data = copy.deepcopy(self.matrix_2D[1:])
            else:
                self.data = copy.deepcopy(self.matrix_2D)

    @staticmethod
    def not_implemented(fn_name):
        """Helper Function

            Parameters:
            fn_name (string): Function Name

            Returns:
            None

        """
        print(f"\n`{fn_name}` is not implemented yet.\n\n")
        raise NotImplementedError
    
    @staticmethod
    def give_me_first(arr):
        """Function to get first element of a list and the rest
        
            Parameters:
            arr (List): Input List

            Returns:
            Tuple: (First Element, Rest of the List)

        """
        # Not exactly pop but loose
        if isinstance(arr,list)==False:
            raise Exception("InvalidType")
        if len(arr) == 0:
            raise Exception("EmptyList")
        new_arr = arr[1:]
        return arr[0], new_arr


    def prepare_data(self):
        data_arr = []
        try:
            with open(self.path_to_file) as csvreader:
                data=csv.reader(csvreader,delimiter=self.delimiter)
                for row in data:
                    data_arr.append([x for x in row])
            csvreader.close()
            if self.headers==True:
                self.headers_value = data_arr[0]
                if '' in self.headers_value:
                    warnings.warn(":WARNING: Header contains blank value.")
                self.data = copy.deepcopy(data_arr[1:])
            else:
                self.data = copy.deepcopy(data_arr)

        except Exception as e:
            print(e)
            print(e.args)
            sys.exit(1)
    
    def transform(self,columns_by_header_name=[],column_indexes=[],row_start=0,row_end=-1,missing_value='',constant=None):
        if row_end==-1:
            row_end = len(self.data)-1
        if isinstance(row_start,int)==False or row_start<0 or row_start>row_end or (float(row_start)-row_start)!=0.0:
            print(f":ERROR: `row_start` must be an integer between 0 and {len(self.data)-1}.")
            sys.exit(1)
        if isinstance(row_end,int)==False or row_end<0 or row_end>len(self.data)-1 or (float(row_end)-row_end)!=0.0:
            print(f":ERROR: `row_end` must be an integer between 0 and {len(self.data)-1}.")
            sys.exit(1)
        if len(columns_by_header_name) == 0 and len(column_indexes) == 0:
            columns_by_header_name = self.headers_value if len(self.headers_value)>0 else []
        
        col_header_indexes = self.transform_sub_1(columns_by_header_name,column_indexes)
        # print(col_header_indexes)
        
        fn_mapping={
            "mean": self.arr_replace_by_mean,
            "median": self.arr_replace_by_median,
            "most-frequent":self.arr_replace_by_most_frequent,
            "constant":self.arr_replace_by_constant
        }
        fn_to_be_called = fn_mapping[self.strategy]

        result=[]
        for index in col_header_indexes:
            temp_array=[]
            for i in range(row_start,row_end+1):
                temp_array.append(self.data[i][index])
            if isinstance(missing_value,list)==True:
                index_arr=[i for i in range(0,len(temp_array)) if temp_array[i] in missing_value]
            else:
                index_arr=[i for i in range(0,len(temp_array)) if temp_array[i] == missing_value]
            if index_arr == []:
                warning_text= f":WARNING: There are no missing value = ` {missing_value} ` in the given range from {row_start} to {row_end} and selected in columns: {col_header_indexes} .\n"
                warnings.warn(warning_text)
            if self.strategy == "constant":
                if constant==None:
                    print(f"\n:ERROR: Parameter `constant` needs to be passed to `transform`.\n")
                    sys.exit(1)
                else:
                    result.append(fn_to_be_called(temp_array,index_arr,missing_value,constant))
            else:
                result.append(fn_to_be_called(temp_array,index_arr,missing_value))
                
        return self.transform_sub_2_put_back(row_start,row_end,col_header_indexes,result)

    def transform_sub_1(self,columns_by_header_name,column_indexes):
        col_header_indexes=[]
        not_found_fr_dbgn=[]
        for i in range(0,len(columns_by_header_name)):
            if columns_by_header_name[i] not in self.headers_value:
                not_found_fr_dbgn.append(columns_by_header_name[i])
            else:
                for j in range(0,len(self.headers_value)):
                    if columns_by_header_name[i]==self.headers_value[j]:
                        col_header_indexes.append(j)
            
        if len(col_header_indexes) == 0 and len(not_found_fr_dbgn)>0:
            print(f"\n:ERROR: Invalid column names: `{'`, `'.join(not_found_fr_dbgn)}`.\n")
            raise Exception("InvalidColumnName")
        if len(col_header_indexes)>0 and len(not_found_fr_dbgn)>0:
            print(f"\n:ERROR: Invalid column names: `{'`, `'.join(not_found_fr_dbgn)}`.\n")
            raise Exception("InvalidColumnName")
        
        if len(col_header_indexes)==len(self.data[0]):
            pass
        elif len(column_indexes)>len(self.data[0]):
            print(f'\n:ERROR: (Number of columns to be selected should be less than or equal to total number of columns in the data(= {len(self.data[0])} ).\n')
            raise Exception("LengthMismatch")
        else:
            for el in column_indexes:
                if isinstance(el,int)==False or el<0 or el >= len(self.data[0]) or float(el)-el!=0.0:
                    print(f"\n:ERROR: Invalid index value: `{el}`. Index must be an integer between 0 and {len(self.data[0])-1}. Total Number of columns in the data = {len(self.data[0])}. \n")
                    raise ValueError
                col_header_indexes.append(el)
        
        col_header_indexes=list(set(col_header_indexes))
        return col_header_indexes

    def transform_sub_2_put_back(self,row_start,row_end,col_header_indexes,result):
        assert(len(col_header_indexes)==len(result))
        data_copy = copy.deepcopy(self.data)
        for j in col_header_indexes:
            arr,new_arr=ImputerApi.give_me_first(result)
            result = copy.deepcopy(new_arr)
            for i in range(row_start,row_end+1): 
                el,rest = ImputerApi.give_me_first(arr)
                arr=rest
                data_copy[i][j] = el

            if new_arr==[]:
                return data_copy
        
    def print_table(self,arr_2D,row_sep=" "):
        assert(isinstance(arr_2D,list))
        assert(len(arr_2D)>0)
        header_dashes_chars_count = len(''.join([str(x) for x in arr_2D[0]])) + len(arr_2D[0])
        if self.headers_value != []:
            if (len(''.join(self.headers_value)) + len(self.headers_value)) > header_dashes_chars_count:
                header_dashes_chars_count = len(''.join(self.headers_value)) + len(self.headers_value)
            print("-"*header_dashes_chars_count)  
            print(row_sep.join(self.headers_value))
        else:
            print('-'*header_dashes_chars_count)
        for row in arr_2D:
            print(row_sep.join([str(x) for x in row]))
        print('-'*header_dashes_chars_count)

    def dump_data_to_csv(self,dst_file_path,data:list,delimiter=',',quotechar='"',override=False,use_header_from_data=False):
        """Function to get mean of a list 
        
            Parameters:
            dst_file_path (String): CSV file name to write to,
            data (List): Matrix to be written,
            delimiter (String): Delimiter to be used in CSV,
            quotechar (Strng): Quote Character to be used while wrting to CSV,
            override (Boolean): Override existing file,
            use_header_from_data (Boolean): Flag whether to use header values from input data

            Returns:
            None

        """
        assert(dst_file_path!='' or dst_file_path!=None)
        if (dst_file_path.split("."))[-1] == dst_file_path:
            dst_file_path = dst_file_path+".csv"
        if (dst_file_path.split("."))[-1] != 'csv':
            print("\n:ERROR: Extension of file must be .csv\n")
            raise Exception("InvalidFileExtension")
        if os.path.exists(dst_file_path):
            if override == False:
                print(f"\n:ERROR: FilePath : `{dst_file_path}` already exists. Use override=True in dump_data_to_csv function. \n")
                sys.exit(1)
            else:
                pass
        try:
            with open(dst_file_path, 'w', newline='') as csvfile:
                csv_writer = csv.writer(csvfile, delimiter=delimiter,quotechar=quotechar, quoting=csv.QUOTE_MINIMAL)
                if use_header_from_data == True:
                    if self.headers_value == []:
                        warnings.warn("\n:WARNING: Original Data File have no header values. Skipping use_header_from_data=True flag\n")
                    else:
                        csv_writer.writerow(self.headers_value)

                for row in data:
                    csv_writer.writerow(row)
            csvfile.close()
        except Exception as e:
            print(e)
            print("\n:ERROR: Error while writing to file.\n")
            sys.exit(1)

        print(f"\nFile Saved: `{dst_file_path}`")

    @staticmethod
    def mean(arr,missing_value=''):
        """Function to get mean of a list 
        
            Parameters:
            arr (List): Input List,
            missing_value (Any): Value to be skipped

            Returns:
            float: Mean of the List

        """
        l = len(arr)
        missing_count=0
        try:
            assert(l > 0)
        except Exception as e:
            print(f":ERROR: Empty List.")
            sys.exit(1)
        sum = 0
        for i in range(l):
            if arr[i] == missing_value or arr[i] in missing_value:
                missing_count = missing_count + 1
                continue
            try:
                sum = sum + float(arr[i])
            except Exception as e:
                print(e)
                print(
                    f":ERROR: Conversion of `{arr[i]}` to float failed at array location `{i}`.")
                print("Strategy `mean` requires values to be float.")
                sys.exit(1)
        return (sum/(l-missing_count))

    @staticmethod
    def median(arr,missing_value=''):
        """Function to get median of a list 
        
            Parameters:
            arr (List): Input List,
            missing_value (Any): Value to be skipped

            Returns:
            float: median of the List

        """
        l = len(arr)
        try:
            assert(l > 0)
        except Exception as e:
            print(f":ERROR: Empty List.")
            sys.exit(1)
        arr_cp=[]
        arr_gen=(x for x in arr)
        for _ in range(l):
            try:
                el=next(arr_gen)
                if el == missing_value or el in missing_value:
                    pass
                else:
                    arr_cp.append(float(el))
            except Exception as e:
                print(e)
                sys.exit(1)
        arr_cp = sorted(arr_cp)
        if len(arr_cp) % 2==1:
            return arr_cp[len(arr_cp)//2]
        else:
            return (arr_cp[len(arr_cp)//2]+arr_cp[len(arr_cp)//2-1])/2
    
    @staticmethod
    def most_frequent(arr,missing_value=''):
        """Function to get most frequent value of a list 
        
            Parameters:
            arr (List): Input List,
            missing_value (Any): Value to be skipped

            Returns:
            any: most frequent value of the List

        """
        dct = {}
        for el in arr:
            if el == missing_value or el in missing_value:
                pass
            else:
                if str(el) in dct.keys():
                    dct[str(el)] = dct[str(el)] + 1
                else:
                    dct[str(el)] = 1
        max_key = ''
        max_val = 0
        for (k,v) in dct.items():
            if v > max_val:
                max_val = v
                max_key = k
        return max_key


    def arr_replace_by_mean(self, arr, index_arr,missing_value=''):
        """Wrapper Function over mean which performs replace operation given indexed array 
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        mean_ = ImputerApi.mean(arr_copy,missing_value)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(mean_)
            else:
                arr_copy[i] = mean_
        return arr_copy

    def arr_replace_by_median(self, arr, index_arr,missing_value=''):
        """Wrapper Function over median which performs replace operation given indexed array 
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        median_ = ImputerApi.median(arr_copy,missing_value)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(median_)
            else:
                arr_copy[i] = median_
        return arr_copy

    def arr_replace_by_most_frequent(self, arr, index_arr,missing_value=''):
        """Wrapper Function over most_frequent which performs replace operation given indexed array 
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        most_frequent_ = ImputerApi.most_frequent(arr_copy,missing_value)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(most_frequent_)
            else:
                arr_copy[i] = most_frequent_
        return arr_copy
    
    def arr_replace_by_constant(self, arr, index_arr,missing_value='',constant=''):
        """Wrapper Function which performs replace operation given indexed array and a constant
        
            Parameters:
            arr (List): Input List,
            index_arr (List:Int):  Indexes of list whose values are to be replaced,
            missing_value (Any): Value to be skipped,
            constant (Any): Value to be replaced with

            Returns:
            list: Replaced List

        """
        arr_copy = copy.deepcopy(arr)
        for i in index_arr:
            if isinstance(arr[i],str):
                arr_copy[i] = str(constant)
            else:
                arr_copy[i] = constant
        return arr_copy

Static methods

def give_me_first(arr)

Function to get first element of a list and the rest

Parameters: arr (List): Input List

Returns: Tuple: (First Element, Rest of the List)

Expand source code
@staticmethod
def give_me_first(arr):
    """Function to get first element of a list and the rest
    
        Parameters:
        arr (List): Input List

        Returns:
        Tuple: (First Element, Rest of the List)

    """
    # Not exactly pop but loose
    if isinstance(arr,list)==False:
        raise Exception("InvalidType")
    if len(arr) == 0:
        raise Exception("EmptyList")
    new_arr = arr[1:]
    return arr[0], new_arr
def mean(arr, missing_value='')

Function to get mean of a list

Parameters: arr (List): Input List, missing_value (Any): Value to be skipped

Returns: float: Mean of the List

Expand source code
@staticmethod
def mean(arr,missing_value=''):
    """Function to get mean of a list 
    
        Parameters:
        arr (List): Input List,
        missing_value (Any): Value to be skipped

        Returns:
        float: Mean of the List

    """
    l = len(arr)
    missing_count=0
    try:
        assert(l > 0)
    except Exception as e:
        print(f":ERROR: Empty List.")
        sys.exit(1)
    sum = 0
    for i in range(l):
        if arr[i] == missing_value or arr[i] in missing_value:
            missing_count = missing_count + 1
            continue
        try:
            sum = sum + float(arr[i])
        except Exception as e:
            print(e)
            print(
                f":ERROR: Conversion of `{arr[i]}` to float failed at array location `{i}`.")
            print("Strategy `mean` requires values to be float.")
            sys.exit(1)
    return (sum/(l-missing_count))
def median(arr, missing_value='')

Function to get median of a list

Parameters: arr (List): Input List, missing_value (Any): Value to be skipped

Returns: float: median of the List

Expand source code
@staticmethod
def median(arr,missing_value=''):
    """Function to get median of a list 
    
        Parameters:
        arr (List): Input List,
        missing_value (Any): Value to be skipped

        Returns:
        float: median of the List

    """
    l = len(arr)
    try:
        assert(l > 0)
    except Exception as e:
        print(f":ERROR: Empty List.")
        sys.exit(1)
    arr_cp=[]
    arr_gen=(x for x in arr)
    for _ in range(l):
        try:
            el=next(arr_gen)
            if el == missing_value or el in missing_value:
                pass
            else:
                arr_cp.append(float(el))
        except Exception as e:
            print(e)
            sys.exit(1)
    arr_cp = sorted(arr_cp)
    if len(arr_cp) % 2==1:
        return arr_cp[len(arr_cp)//2]
    else:
        return (arr_cp[len(arr_cp)//2]+arr_cp[len(arr_cp)//2-1])/2
def most_frequent(arr, missing_value='')

Function to get most frequent value of a list

Parameters: arr (List): Input List, missing_value (Any): Value to be skipped

Returns: any: most frequent value of the List

Expand source code
@staticmethod
def most_frequent(arr,missing_value=''):
    """Function to get most frequent value of a list 
    
        Parameters:
        arr (List): Input List,
        missing_value (Any): Value to be skipped

        Returns:
        any: most frequent value of the List

    """
    dct = {}
    for el in arr:
        if el == missing_value or el in missing_value:
            pass
        else:
            if str(el) in dct.keys():
                dct[str(el)] = dct[str(el)] + 1
            else:
                dct[str(el)] = 1
    max_key = ''
    max_val = 0
    for (k,v) in dct.items():
        if v > max_val:
            max_val = v
            max_key = k
    return max_key
def not_implemented(fn_name)

Helper Function

Parameters: fn_name (string): Function Name

Returns: None

Expand source code
@staticmethod
def not_implemented(fn_name):
    """Helper Function

        Parameters:
        fn_name (string): Function Name

        Returns:
        None

    """
    print(f"\n`{fn_name}` is not implemented yet.\n\n")
    raise NotImplementedError

Methods

def arr_replace_by_constant(self, arr, index_arr, missing_value='', constant='')

Wrapper Function which performs replace operation given indexed array and a constant

Parameters: arr (List): Input List, index_arr (List:Int): Indexes of list whose values are to be replaced, missing_value (Any): Value to be skipped, constant (Any): Value to be replaced with

Returns: list: Replaced List

Expand source code
def arr_replace_by_constant(self, arr, index_arr,missing_value='',constant=''):
    """Wrapper Function which performs replace operation given indexed array and a constant
    
        Parameters:
        arr (List): Input List,
        index_arr (List:Int):  Indexes of list whose values are to be replaced,
        missing_value (Any): Value to be skipped,
        constant (Any): Value to be replaced with

        Returns:
        list: Replaced List

    """
    arr_copy = copy.deepcopy(arr)
    for i in index_arr:
        if isinstance(arr[i],str):
            arr_copy[i] = str(constant)
        else:
            arr_copy[i] = constant
    return arr_copy
def arr_replace_by_mean(self, arr, index_arr, missing_value='')

Wrapper Function over mean which performs replace operation given indexed array

Parameters: arr (List): Input List, index_arr (List:Int): Indexes of list whose values are to be replaced, missing_value (Any): Value to be skipped

Returns: list: Replaced List

Expand source code
def arr_replace_by_mean(self, arr, index_arr,missing_value=''):
    """Wrapper Function over mean which performs replace operation given indexed array 
    
        Parameters:
        arr (List): Input List,
        index_arr (List:Int):  Indexes of list whose values are to be replaced,
        missing_value (Any): Value to be skipped

        Returns:
        list: Replaced List

    """
    arr_copy = copy.deepcopy(arr)
    mean_ = ImputerApi.mean(arr_copy,missing_value)
    for i in index_arr:
        if isinstance(arr[i],str):
            arr_copy[i] = str(mean_)
        else:
            arr_copy[i] = mean_
    return arr_copy
def arr_replace_by_median(self, arr, index_arr, missing_value='')

Wrapper Function over median which performs replace operation given indexed array

Parameters: arr (List): Input List, index_arr (List:Int): Indexes of list whose values are to be replaced, missing_value (Any): Value to be skipped

Returns: list: Replaced List

Expand source code
def arr_replace_by_median(self, arr, index_arr,missing_value=''):
    """Wrapper Function over median which performs replace operation given indexed array 
    
        Parameters:
        arr (List): Input List,
        index_arr (List:Int):  Indexes of list whose values are to be replaced,
        missing_value (Any): Value to be skipped

        Returns:
        list: Replaced List

    """
    arr_copy = copy.deepcopy(arr)
    median_ = ImputerApi.median(arr_copy,missing_value)
    for i in index_arr:
        if isinstance(arr[i],str):
            arr_copy[i] = str(median_)
        else:
            arr_copy[i] = median_
    return arr_copy
def arr_replace_by_most_frequent(self, arr, index_arr, missing_value='')

Wrapper Function over most_frequent which performs replace operation given indexed array

Parameters: arr (List): Input List, index_arr (List:Int): Indexes of list whose values are to be replaced, missing_value (Any): Value to be skipped

Returns: list: Replaced List

Expand source code
def arr_replace_by_most_frequent(self, arr, index_arr,missing_value=''):
    """Wrapper Function over most_frequent which performs replace operation given indexed array 
    
        Parameters:
        arr (List): Input List,
        index_arr (List:Int):  Indexes of list whose values are to be replaced,
        missing_value (Any): Value to be skipped

        Returns:
        list: Replaced List

    """
    arr_copy = copy.deepcopy(arr)
    most_frequent_ = ImputerApi.most_frequent(arr_copy,missing_value)
    for i in index_arr:
        if isinstance(arr[i],str):
            arr_copy[i] = str(most_frequent_)
        else:
            arr_copy[i] = most_frequent_
    return arr_copy
def dump_data_to_csv(self, dst_file_path, data: list, delimiter=',', quotechar='"', override=False, use_header_from_data=False)

Function to get mean of a list

Parameters: dst_file_path (String): CSV file name to write to, data (List): Matrix to be written, delimiter (String): Delimiter to be used in CSV, quotechar (Strng): Quote Character to be used while wrting to CSV, override (Boolean): Override existing file, use_header_from_data (Boolean): Flag whether to use header values from input data

Returns: None

Expand source code
def dump_data_to_csv(self,dst_file_path,data:list,delimiter=',',quotechar='"',override=False,use_header_from_data=False):
    """Function to get mean of a list 
    
        Parameters:
        dst_file_path (String): CSV file name to write to,
        data (List): Matrix to be written,
        delimiter (String): Delimiter to be used in CSV,
        quotechar (Strng): Quote Character to be used while wrting to CSV,
        override (Boolean): Override existing file,
        use_header_from_data (Boolean): Flag whether to use header values from input data

        Returns:
        None

    """
    assert(dst_file_path!='' or dst_file_path!=None)
    if (dst_file_path.split("."))[-1] == dst_file_path:
        dst_file_path = dst_file_path+".csv"
    if (dst_file_path.split("."))[-1] != 'csv':
        print("\n:ERROR: Extension of file must be .csv\n")
        raise Exception("InvalidFileExtension")
    if os.path.exists(dst_file_path):
        if override == False:
            print(f"\n:ERROR: FilePath : `{dst_file_path}` already exists. Use override=True in dump_data_to_csv function. \n")
            sys.exit(1)
        else:
            pass
    try:
        with open(dst_file_path, 'w', newline='') as csvfile:
            csv_writer = csv.writer(csvfile, delimiter=delimiter,quotechar=quotechar, quoting=csv.QUOTE_MINIMAL)
            if use_header_from_data == True:
                if self.headers_value == []:
                    warnings.warn("\n:WARNING: Original Data File have no header values. Skipping use_header_from_data=True flag\n")
                else:
                    csv_writer.writerow(self.headers_value)

            for row in data:
                csv_writer.writerow(row)
        csvfile.close()
    except Exception as e:
        print(e)
        print("\n:ERROR: Error while writing to file.\n")
        sys.exit(1)

    print(f"\nFile Saved: `{dst_file_path}`")
def prepare_data(self)
Expand source code
def prepare_data(self):
    data_arr = []
    try:
        with open(self.path_to_file) as csvreader:
            data=csv.reader(csvreader,delimiter=self.delimiter)
            for row in data:
                data_arr.append([x for x in row])
        csvreader.close()
        if self.headers==True:
            self.headers_value = data_arr[0]
            if '' in self.headers_value:
                warnings.warn(":WARNING: Header contains blank value.")
            self.data = copy.deepcopy(data_arr[1:])
        else:
            self.data = copy.deepcopy(data_arr)

    except Exception as e:
        print(e)
        print(e.args)
        sys.exit(1)
def print_table(self, arr_2D, row_sep=' ')
Expand source code
def print_table(self,arr_2D,row_sep=" "):
    assert(isinstance(arr_2D,list))
    assert(len(arr_2D)>0)
    header_dashes_chars_count = len(''.join([str(x) for x in arr_2D[0]])) + len(arr_2D[0])
    if self.headers_value != []:
        if (len(''.join(self.headers_value)) + len(self.headers_value)) > header_dashes_chars_count:
            header_dashes_chars_count = len(''.join(self.headers_value)) + len(self.headers_value)
        print("-"*header_dashes_chars_count)  
        print(row_sep.join(self.headers_value))
    else:
        print('-'*header_dashes_chars_count)
    for row in arr_2D:
        print(row_sep.join([str(x) for x in row]))
    print('-'*header_dashes_chars_count)
def transform(self, columns_by_header_name=[], column_indexes=[], row_start=0, row_end=-1, missing_value='', constant=None)
Expand source code
def transform(self,columns_by_header_name=[],column_indexes=[],row_start=0,row_end=-1,missing_value='',constant=None):
    if row_end==-1:
        row_end = len(self.data)-1
    if isinstance(row_start,int)==False or row_start<0 or row_start>row_end or (float(row_start)-row_start)!=0.0:
        print(f":ERROR: `row_start` must be an integer between 0 and {len(self.data)-1}.")
        sys.exit(1)
    if isinstance(row_end,int)==False or row_end<0 or row_end>len(self.data)-1 or (float(row_end)-row_end)!=0.0:
        print(f":ERROR: `row_end` must be an integer between 0 and {len(self.data)-1}.")
        sys.exit(1)
    if len(columns_by_header_name) == 0 and len(column_indexes) == 0:
        columns_by_header_name = self.headers_value if len(self.headers_value)>0 else []
    
    col_header_indexes = self.transform_sub_1(columns_by_header_name,column_indexes)
    # print(col_header_indexes)
    
    fn_mapping={
        "mean": self.arr_replace_by_mean,
        "median": self.arr_replace_by_median,
        "most-frequent":self.arr_replace_by_most_frequent,
        "constant":self.arr_replace_by_constant
    }
    fn_to_be_called = fn_mapping[self.strategy]

    result=[]
    for index in col_header_indexes:
        temp_array=[]
        for i in range(row_start,row_end+1):
            temp_array.append(self.data[i][index])
        if isinstance(missing_value,list)==True:
            index_arr=[i for i in range(0,len(temp_array)) if temp_array[i] in missing_value]
        else:
            index_arr=[i for i in range(0,len(temp_array)) if temp_array[i] == missing_value]
        if index_arr == []:
            warning_text= f":WARNING: There are no missing value = ` {missing_value} ` in the given range from {row_start} to {row_end} and selected in columns: {col_header_indexes} .\n"
            warnings.warn(warning_text)
        if self.strategy == "constant":
            if constant==None:
                print(f"\n:ERROR: Parameter `constant` needs to be passed to `transform`.\n")
                sys.exit(1)
            else:
                result.append(fn_to_be_called(temp_array,index_arr,missing_value,constant))
        else:
            result.append(fn_to_be_called(temp_array,index_arr,missing_value))
            
    return self.transform_sub_2_put_back(row_start,row_end,col_header_indexes,result)
def transform_sub_1(self, columns_by_header_name, column_indexes)
Expand source code
def transform_sub_1(self,columns_by_header_name,column_indexes):
    col_header_indexes=[]
    not_found_fr_dbgn=[]
    for i in range(0,len(columns_by_header_name)):
        if columns_by_header_name[i] not in self.headers_value:
            not_found_fr_dbgn.append(columns_by_header_name[i])
        else:
            for j in range(0,len(self.headers_value)):
                if columns_by_header_name[i]==self.headers_value[j]:
                    col_header_indexes.append(j)
        
    if len(col_header_indexes) == 0 and len(not_found_fr_dbgn)>0:
        print(f"\n:ERROR: Invalid column names: `{'`, `'.join(not_found_fr_dbgn)}`.\n")
        raise Exception("InvalidColumnName")
    if len(col_header_indexes)>0 and len(not_found_fr_dbgn)>0:
        print(f"\n:ERROR: Invalid column names: `{'`, `'.join(not_found_fr_dbgn)}`.\n")
        raise Exception("InvalidColumnName")
    
    if len(col_header_indexes)==len(self.data[0]):
        pass
    elif len(column_indexes)>len(self.data[0]):
        print(f'\n:ERROR: (Number of columns to be selected should be less than or equal to total number of columns in the data(= {len(self.data[0])} ).\n')
        raise Exception("LengthMismatch")
    else:
        for el in column_indexes:
            if isinstance(el,int)==False or el<0 or el >= len(self.data[0]) or float(el)-el!=0.0:
                print(f"\n:ERROR: Invalid index value: `{el}`. Index must be an integer between 0 and {len(self.data[0])-1}. Total Number of columns in the data = {len(self.data[0])}. \n")
                raise ValueError
            col_header_indexes.append(el)
    
    col_header_indexes=list(set(col_header_indexes))
    return col_header_indexes
def transform_sub_2_put_back(self, row_start, row_end, col_header_indexes, result)
Expand source code
def transform_sub_2_put_back(self,row_start,row_end,col_header_indexes,result):
    assert(len(col_header_indexes)==len(result))
    data_copy = copy.deepcopy(self.data)
    for j in col_header_indexes:
        arr,new_arr=ImputerApi.give_me_first(result)
        result = copy.deepcopy(new_arr)
        for i in range(row_start,row_end+1): 
            el,rest = ImputerApi.give_me_first(arr)
            arr=rest
            data_copy[i][j] = el

        if new_arr==[]:
            return data_copy