Source code for kinmatch.utils

import os
import json
from copy import deepcopy
import multiprocessing
import numpy as np
import pandas as pd


from kinmatch import config


def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)

def apply_by_multiprocessing(df, func, **kwargs):
    """Apply ``func`` to ``df`` in parallel, one row-chunk per worker.

    The frame is cut into contiguous row chunks, each chunk is processed
    with ``chunk.apply(func, **kwargs)`` in a worker process, and the
    partial results are concatenated in the original order.

    Parameters
    ----------
    df : pandas.DataFrame (or Series)
        Data to process.
    func : callable
        Function forwarded to ``apply``; it must be picklable because it
        is sent to worker processes.
    workers : int, optional (keyword only, taken out of ``kwargs``)
        Number of worker processes. Defaults to
        ``multiprocessing.cpu_count()``; it is clamped to the number of rows.
    **kwargs
        Remaining keyword arguments are forwarded to ``apply``.

    Returns
    -------
    The concatenation (``pd.concat``) of the per-chunk results. For an
    empty input the result of a plain ``df.apply(func, **kwargs)``.
    """
    workers = kwargs.pop('workers', None)
    if workers is None:
        workers = multiprocessing.cpu_count()

    row_count = df.shape[0]
    if row_count == 0:
        # Nothing to distribute: splitting into zero chunks is an error,
        # so handle the empty input in-process without creating a pool.
        return df.apply(func, **kwargs)

    # Never start more workers than there are rows (and at least one).
    workers = max(1, min(workers, row_count))

    # Same chunk sizes as np.array_split: the first ``extra`` chunks get
    # one additional row. Slicing with iloc avoids passing a DataFrame
    # through np.array_split, which recent pandas versions deprecate.
    base, extra = divmod(row_count, workers)
    bounds = [0]
    for position in range(workers):
        bounds.append(bounds[-1] + base + (1 if position < extra else 0))
    chunks = [df.iloc[start:stop] for start, stop in zip(bounds, bounds[1:])]

    # The context manager tears the pool down even when a worker raises.
    with multiprocessing.Pool(processes=workers) as pool:
        results = pool.map(
            _apply_df, [(chunk, func, kwargs) for chunk in chunks])
    return pd.concat(list(results))
""" def apply_by_multiprocessing(df, func, **kwargs): result = _apply_df((df, func, kwargs)) return result """
class GenotypeCleaner:
    """Base converter between display-form and storage-form genotypes.

    A genotype is a mapping ``{marker: alleles}``. "Encoding" rewrites the
    marker names into an identifier-like form (see ``encode_marker``) and
    passes the alleles through ``encode_alleles``; "decoding" is the
    reverse. The allele hooks in this base class return their input
    unchanged; subclasses override them.
    """

    def __init__(self, genotype, is_encoded=False):
        """Store ``genotype`` and remember which form it is already in."""
        self.genotype = genotype
        self.encoded_genotype = None
        self.decoded_genotype = None
        if is_encoded:
            self.encoded_genotype = self.genotype
        else:
            self.decoded_genotype = self.genotype

    @staticmethod
    def encode_marker(marker):
        """Return the encoded name of ``marker``.

        The name is stringified, replaced by its ``config.MARKER_ALIAS``
        entry when one exists, prefixed with ``_`` when it starts with a
        digit, and has ``.`` turned into ``__`` and spaces into ``_``.
        """
        marker = str(marker)  # this is for mtDNA excel input
        try:
            marker = config.MARKER_ALIAS[marker]
        except KeyError:
            pass
        # Guard against an empty name instead of failing on marker[0].
        if marker and marker[0] in '0123456789':
            marker = '_{}'.format(marker)
        marker = marker.replace('.', '__')
        marker = marker.replace(' ', '_')
        return marker

    @staticmethod
    def decode_marker(marker):
        """Undo ``encode_marker``'s character substitutions.

        A leading ``_`` is dropped, ``__`` becomes ``.`` and any remaining
        ``_`` becomes a space. Aliases are not reversed.
        """
        if marker.startswith('_'):
            marker = marker[1:]
        marker = marker.replace('__', '.')
        marker = marker.replace('_', ' ')
        return marker

    def encode_allele(self, allele, marker=''):
        """Encode a single allele; the base class returns it unchanged."""
        return allele

    def encode_alleles(self, alleles, marker='', null=None):
        """Encode a marker's alleles; the base class returns them unchanged."""
        return alleles

    def decode_allele(self, allele, marker=''):
        """Decode a single allele; the base class returns it unchanged."""
        return allele

    def decode_alleles(self, alleles, marker='', null=''):
        """Decode a marker's alleles; the base class returns them unchanged."""
        return alleles

    def encode(self, null=None, markers=None):
        """Encode the whole genotype and cache it on ``encoded_genotype``.

        ``null`` is handed to ``encode_alleles`` as the value for missing
        alleles. When ``markers`` is given (non-empty), only entries whose
        *encoded* marker name is in ``markers`` are returned; the cached
        ``encoded_genotype`` still holds every marker.
        """
        self.encoded_genotype = {
            self.encode_marker(marker):
                self.encode_alleles(alleles, marker, null)
            for marker, alleles in self.genotype.items()
        }
        if markers:
            return {name: value
                    for name, value in self.encoded_genotype.items()
                    if name in markers}
        return self.encoded_genotype

    def decode(self, null='-', markers=None):
        """Decode the whole genotype and cache it on ``decoded_genotype``.

        ``null`` is handed to ``decode_alleles`` as the value for missing
        alleles. When ``markers`` is given (non-empty), only entries whose
        *decoded* marker name is in ``markers`` are returned; the cached
        ``decoded_genotype`` still holds every marker.
        """
        self.decoded_genotype = {
            self.decode_marker(marker):
                self.decode_alleles(alleles, marker, null)
            for marker, alleles in self.genotype.items()
        }
        if markers:
            return {name: value
                    for name, value in self.decoded_genotype.items()
                    if name in markers}
        return self.decoded_genotype
class STRGenotypeCleaner(GenotypeCleaner):
    """Cleaner for STR genotypes.

    Encoded alleles are integers equal to ten times the repeat number
    (``'9.3'`` -> ``93``); for markers listed in ``config.SEX_MARKERS``
    ``X`` becomes ``10`` and ``Y`` becomes ``20``. An encoded marker value
    is a sorted two-element list, or ``null`` when nothing was called.
    """

    # Raw values that mean "no allele called".
    _MISSING_ALLELES = ('-', '0', 0, '')

    def encode_allele(self, allele, marker=''):
        """Return the integer code of one allele.

        Raises ``AssertionError`` when a sex-marker allele is not X/Y or
        when the value cannot be read as a number.
        """
        if marker in config.SEX_MARKERS:
            if allele in ('X', 'x'):
                allele = 1
            elif allele in ('Y', 'y'):
                allele = 2
            else:
                raise AssertionError(
                    'SEX marker allele value have to be X or Y')
        try:
            return int(float(allele) * 10)
        except (TypeError, ValueError, OverflowError):
            # TypeError (e.g. None) and OverflowError (inf) are reported
            # the same way as an unparsable string.
            raise AssertionError('Allele value have to be number')

    def encode_alleles(self, alleles, marker='', null=None):
        """Return ``[code1, code2]`` (sorted) for a marker, or ``null``.

        ``alleles`` may be a list/tuple of one or two alleles, or a scalar:
        a comma separated string (``'12,13'``), a single value (``'12'``,
        ``12``, ``12.0``, numpy scalars) or, for sex markers, a string of
        single-character alleles (``'XY'``). A single allele is treated as
        homozygous. Empty, NaN and ``'nan'`` inputs, and pairs in which
        both alleles are missing, give ``null``.

        Raises ``AssertionError`` when more than two alleles are supplied
        or an allele cannot be encoded.
        """
        if isinstance(alleles, (list, tuple)):
            if not alleles:
                return null
            alleles = list(alleles)
        else:
            if not alleles or pd.isnull(alleles):
                return null
            if isinstance(alleles, str) and alleles == 'nan':
                return null
            # Stringify every scalar, not only exact str/float instances,
            # so int and numpy scalar inputs are parsed like their text form.
            text = str(alleles)
            if ',' in text:
                alleles = [part.strip() for part in text.split(',')]
            elif marker in config.SEX_MARKERS:
                # Sex markers are written without a separator, e.g. 'XY'.
                alleles = [char.strip() for char in text]
            else:
                alleles = [text.strip()]

        if len(alleles) == 1:
            a1 = a2 = alleles[0]
        elif len(alleles) == 2:
            a1, a2 = alleles
        else:
            raise AssertionError(
                'The alleles number should be 1 or 2.')

        missing = self._MISSING_ALLELES
        if a1 in missing:
            if a2 in missing:
                return null
            a1 = a2
        elif a2 in missing:
            a2 = a1

        return sorted([
            self.encode_allele(a1, marker),
            self.encode_allele(a2, marker),
        ])

    def decode_allele(self, allele, marker=''):
        """Return the display string of one encoded allele.

        For the ``'XY'`` marker ``10``/``20`` become ``'X'``/``'Y'``;
        otherwise the code is divided by ten and a ``.0`` is dropped.
        """
        if marker == 'XY':
            if allele == 10:
                return 'X'
            elif allele == 20:
                return 'Y'
        return str(allele / 10.).replace('.0', '')

    def decode_alleles(self, alleles, marker='', null='-'):
        """Return the comma-joined display form of ``alleles``.

        Anything that is not a list or tuple is treated as missing and
        ``null`` is returned.
        """
        if isinstance(alleles, (list, tuple)):
            return ','.join(
                self.decode_allele(allele, marker) for allele in alleles)
        return null
class ASTRGenotypeCleaner(STRGenotypeCleaner):
    """STR cleaner for autosomal ('A-STR') profiles."""

    def encode(self, null=None, markers=None):
        """Encode like the parent class, then drop ignored A-STR markers.

        Every marker listed in ``config.IGNORE_MARKERS['A-STR']`` is
        removed from the returned mapping when present.
        """
        encoded = super().encode(null=null, markers=markers)
        for ignored in config.IGNORE_MARKERS['A-STR']:
            encoded.pop(ignored, None)
        return encoded

    @staticmethod
    def decode_marker(marker):
        """Decode a marker name, showing ``XY`` as ``Amelogenin``."""
        decoded = STRGenotypeCleaner.decode_marker(marker)
        return 'Amelogenin' if decoded == 'XY' else decoded
class YSTRGenotypeCleaner(STRGenotypeCleaner):
    """STR cleaner for Y-chromosome ('Y-STR') profiles."""

    @staticmethod
    def encode_marker(marker):
        """Encode a marker name and strip leading ``B_``/``G_``/``R_``/``Y_``.

        After the prefixes are removed the name is looked up in
        ``config.MARKER_ALIAS`` once more, because an alias may be
        registered for the un-prefixed name.
        """
        marker = STRGenotypeCleaner.encode_marker(marker)
        while marker[:2] in ('B_', 'G_', 'R_', 'Y_'):
            marker = marker[2:]
        try:
            marker = config.MARKER_ALIAS[marker]
        except KeyError:
            pass
        return marker

    def decode_alleles(self, alleles, marker='', null='-'):
        """Decode alleles, collapsing an identical pair to a single value.

        ``'15,15'`` is returned as ``'15'``. Any other decoded string,
        including one with fewer or more than two comma-separated parts,
        is returned unchanged rather than raising on unpacking.
        """
        result = super().decode_alleles(alleles, marker, null)
        if result and str(result) != str(null):
            parts = result.split(',')
            if len(parts) == 2 and parts[0] == parts[1]:
                return parts[0]
        return result

    def encode(self, null=None, markers=None):
        """Encode like the parent class, then drop ignored Y-STR markers.

        Every marker listed in ``config.IGNORE_MARKERS['Y-STR']`` is
        removed from the returned mapping when present.
        """
        result = super().encode(null=null, markers=markers)
        for ignored in config.IGNORE_MARKERS['Y-STR']:
            result.pop(ignored, None)
        return result
class MtdnaGenotypeCleaner(GenotypeCleaner):
    """Cleaner for mitochondrial ('mtDNA') profiles.

    Allele values are passed through unchanged (the base-class hooks);
    only marker names are rewritten and ignored markers removed.
    """

    @staticmethod
    def decode_marker(marker):
        """Decode a marker name and remove every ``.0`` substring from it."""
        decoded = GenotypeCleaner.decode_marker(marker)
        return decoded.replace('.0', '')

    def encode(self, null=None, markers=None):
        """Encode like the base class, then drop ignored mtDNA markers.

        Every marker listed in ``config.IGNORE_MARKERS['mtDNA']`` is
        removed from the returned mapping when present.
        """
        # NOTE(review): this class used to define ``encode`` twice. The
        # first definition skipped markers whose alleles were empty/NaN,
        # but it was shadowed by this one and therefore never executed.
        # The dead definition has been removed; the effective behaviour
        # (no null filtering) is unchanged. Confirm whether null
        # filtering was actually intended for mtDNA input.
        result = super().encode(null=null, markers=markers)
        for ignored in config.IGNORE_MARKERS['mtDNA']:
            result.pop(ignored, None)
        return result
# Registry: genotype type name -> cleaner class used by the helper
# functions of this module.
cleaner = {
    'A-STR': ASTRGenotypeCleaner,
    'Y-STR': YSTRGenotypeCleaner,
    'mtDNA': MtdnaGenotypeCleaner,
}
def encode_genotype(genotype, type_, null=None):
    """Encode ``genotype`` with the cleaner registered for ``type_``.

    ``type_`` is a key of ``cleaner``; ``null`` is forwarded to the
    cleaner's ``encode``. Raises ``KeyError`` for an unknown ``type_``.
    """
    cleaner_cls = cleaner[type_]
    return cleaner_cls(genotype).encode(null=null)
def decode_genotype(genotype, type_, null='-'):
    """Decode ``genotype`` with the cleaner registered for ``type_``.

    ``type_`` is a key of ``cleaner``; ``null`` is forwarded to the
    cleaner's ``decode``. Raises ``KeyError`` for an unknown ``type_``.
    """
    cleaner_cls = cleaner[type_]
    return cleaner_cls(genotype).decode(null=null)
def encode_marker(marker, type_):
    """Encode a marker name using the cleaner class registered for ``type_``."""
    cleaner_cls = cleaner[type_]
    return cleaner_cls.encode_marker(marker)
def decode_marker(marker, type_):
    """Decode a marker name using the cleaner class registered for ``type_``."""
    cleaner_cls = cleaner[type_]
    return cleaner_cls.decode_marker(marker)
def encode_alleles(alleles, type_, marker=''):
    """Encode one marker's alleles with the cleaner registered for ``type_``.

    A cleaner is created without a genotype (``None``) because only its
    ``encode_alleles`` method is used.
    """
    helper = cleaner[type_](None)
    return helper.encode_alleles(alleles, marker=marker)
def decode_alleles(alleles, type_, marker='', null=''):
    """Decode one marker's alleles with the cleaner registered for ``type_``.

    A cleaner is created without a genotype (``None``) because only its
    ``decode_alleles`` method is used; ``null`` is forwarded to it.
    """
    helper = cleaner[type_](None)
    return helper.decode_alleles(alleles, marker=marker, null=null)
def decode_allele(allele, type_, marker=''):
    """Decode a single allele with the cleaner registered for ``type_``.

    A cleaner is created without a genotype (``None``) because only its
    ``decode_allele`` method is used.
    """
    helper = cleaner[type_](None)
    return helper.decode_allele(allele, marker=marker)
""" ## temporarily holding the code def clean_allele(allele): if allele == 'X' or allele == 'x': allele = 1 elif allele == 'Y' or allele == 'y': allele = 2 result = int(float(allele) * 10) return result def clean_alleles(alleles, null=None): if type(alleles) == str: if ',' in alleles: a1, a2 = [a.strip() for a in alleles.split(',')] elif 'X' in alleles or 'Y' in alleles: a1, a2 = alleles else: a1, a2 = alleles, None else: try: a1, a2 = alleles except: if pd.isnull(alleles): return null a1, a2 = alleles, alleles nas = ('-', '0', 0) if not a1 or a1 in nas: return null if not a2: a2 = a1 return [clean_allele(a1), clean_allele(a2)] def clean_marker(marker_name, type_='All'): marker_name = str(marker_name) ## this is for mtDNA excel input if marker_name[0] in [str(i) for i in range(10)]: marker_name = '_{}'.format(marker_name) if '.' in marker_name: marker_name = marker_name.replace('.', '__') marker_name = marker_name.replace(' ', '_') if type_ == 'A-STR' or type_ == 'All': if marker_name in ['AMEL']: marker_name = 'XY' if type_ == 'Y-STR' or type_ == 'All': while marker_name[:2] in ('B_', 'G_', 'R_', 'Y_'): marker_name = marker_name[2:] if marker_name == 'GATA_H4': marker_name = 'GATAH4__1' return marker_name def clean_str_dict(str_dict, type_, null=None): result = dict() for marker, alleles in str_dict.items(): if type_ == 'A-STR': if marker == 'XY' and alleles == 'X,': alleles = 'XX' result[clean_marker(marker, type_)] =\ clean_alleles(alleles, null=null) elif type_ == 'Y-STR': if type(alleles) == str and ',' in alleles: result[clean_marker(marker,type_)] =\ clean_alleles(alleles, null=null) elif pd.isnull(alleles): result[clean_marker(marker,type_)] = null else: result[clean_marker(marker,type_)] =\ [clean_allele(alleles), clean_allele(alleles)] elif type_ == 'mtDNA': if alleles != '' and pd.notnull(alleles): result[clean_marker(marker, type_)] = alleles return result def display_allele(marker, allele): if marker == 'XY': if allele == 10: return 'X' elif allele == 
20: return 'Y' return str(allele / 10.).replace('.0', '') def display_marker(marker): if marker[0] == '_': marker = marker[1:] if '__' in marker: marker = marker.replace('__', '.') marker = marker.replace('_',' ') return marker def display_marker_alleles(marker, alleles, type_='A-STR', null='-'): marker = display_marker(marker) if type_ == 'mtDNA': return marker, alleles if type(alleles) == list: if type_ == 'Y-STR' and alleles[0] == alleles[1]: return marker, display_allele(marker, alleles[0]) else: return marker, ', '.join([display_allele(marker, a) for a in alleles]) else: return marker, null """ KINMATCH_VESION = getattr(config, 'VERSION', '0.1')
class DataContainer:
    """Result envelope: status fields plus an optional ``items`` payload.

    ``self.data`` is a dict with the keys ``status``, ``message``,
    ``apiVersion``, ``itemCount``, ``totalCount``, ``totalPageCount`` and,
    once items were set, ``items``. The payload can be a DataFrame, a dict
    of DataFrames or a list, and can be rendered through ``convert``.
    """

    def __init__(self, status='', message='', items=None, itemCount=0,
                 totalCount=0, totalPageCount=0, format_='json',
                 trans=False, zeroization=False):
        """Build the envelope.

        ``trans`` transposes DataFrames before conversion; ``zeroization``
        renders missing values as ``'0'`` instead of ``''``. When ``items``
        is given, ``itemCount`` is replaced by ``len(items)``.
        """
        self.format_ = format_
        self.trans = trans
        self.zeroization = zeroization
        self.data = {
            'status': status,
            'message': message,
            'apiVersion': KINMATCH_VESION,
            'itemCount': itemCount,
            'totalCount': totalCount,
            'totalPageCount': totalPageCount,
        }
        if items is not None:
            self.set_items(items)

    @property
    def items(self):
        """The payload, or ``None`` when no items were set."""
        return self.data.get('items')

    @items.setter
    def items(self, items):
        self.set_items(items)

    def set_items(self, items):
        """Store ``items`` and update ``itemCount``; ``None`` is ignored."""
        if items is not None:
            self.data['itemCount'] = len(items)
            self.data['items'] = items

    def get_items(self):
        """Return the payload, or ``None`` when no items were set."""
        return self.data.get('items')

    def convert(self, format_=''):
        """Render the container in ``format_``.

        ``'json'`` (also the default for an empty ``format_``) returns
        ``to_json()``, ``'csv'``/``'tsv'`` return ``to_dsv()``, ``'plain'``
        returns ``to_plain()``; any other value returns ``self.data``.
        """
        # NOTE(review): the previous expression
        # ``format_.lower() or self.data.get('format_', 'json') and 'json'``
        # always fell back to 'json' and never consulted ``self.format_``.
        # That fallback is kept as is; confirm whether the instance
        # format was meant to be honoured here.
        fmt = format_.lower() or 'json'
        if fmt == 'json':
            return self.to_json()
        if fmt in ('csv', 'tsv'):
            return self.to_dsv(fmt)
        if fmt == 'plain':
            return self.to_plain()
        return self.data

    def to_json(self):
        """Return the JSON-ready dict (list-style items)."""
        return self.to_json_by_list()

    def to_json_by_dict(self):
        """Return a deep copy of ``data`` with DataFrames made JSON-ready.

        A dict payload keeps its keys; a missing or ``None`` payload
        becomes ``{}``. ``self.data`` itself is not modified.
        """
        data = deepcopy(self.data)
        if 'items' not in data:
            data['items'] = {}
            return data
        items = data['items']
        if items is None:
            data['items'] = {}
        elif isinstance(items, dict):
            for key, value in items.items():
                items[key] = self.convert_df2json(value)
        elif isinstance(items, pd.DataFrame):
            data['items'] = self.convert_df2json(items)
        else:
            print('CHANGE ME TO EXCEPTION IN TO_JSON')
        return data

    def to_json_by_list(self):
        """Convert the payload in place to JSON-ready lists; return ``data``.

        Unlike ``to_json_by_dict`` this rewrites ``self.data['items']``:
        a dict payload becomes a list of one-key dicts, a DataFrame
        becomes a list of row dicts, a missing/``None`` payload becomes
        ``[]`` and a list is left untouched.
        """
        if 'items' not in self.data:
            self.data['items'] = []
            return self.data
        items = self.data['items']
        if items is None:
            self.data['items'] = []
        elif isinstance(items, dict):
            self.data['items'] = [
                {key: self.convert_df2json(value)}
                for key, value in items.items()
            ]
        elif isinstance(items, pd.DataFrame):
            self.data['items'] = self.convert_df2json(items)
        elif isinstance(items, list):
            pass  # already in list form
        else:
            print('Exception - Converting data')
            print(type(self.data['items']), self.data['items'])
        return self.data

    def to_dsv(self, format_):
        """Return the payload as a list of rows, or ``None``.

        Supported payloads are a DataFrame or a dict holding exactly one
        entry (its value is converted). Everything else gives ``None``.
        """
        items = self.data.get('items')
        if items is None:
            return None
        if isinstance(items, dict):
            if len(items) == 1:
                # Read the single value without copying or mutating data.
                only_value = next(iter(items.values()))
                return self.convert_df2dsv(only_value, format_)
            return None
        if isinstance(items, pd.DataFrame):
            return self.convert_df2dsv(items, format_)
        print('CHANGE ME TO EXCEPTION IN TO_DSV')
        return None

    def to_plain(self):
        """Return the ``key - value`` text rendering of ``data``."""
        return self._print()

    def _clean_attr(self, attr):
        """Return ``attr`` as display text.

        ``None`` and NaN become the empty marker, strings are returned
        unchanged, lists are comma-joined, anything else is ``str()``-ed.
        """
        if attr is None:
            return self._get_emptychar()
        if isinstance(attr, str):
            return attr
        if isinstance(attr, list):
            return ','.join(map(str, attr))
        try:
            is_nan = bool(np.isnan(attr))
        except (TypeError, ValueError):
            # np.isnan rejects non-numeric objects; those are not NaN.
            is_nan = False
        if is_nan:
            return self._get_emptychar()
        return str(attr)

    def _get_sep(self, format_=''):
        """Return the field separator for ``format_``.

        An explicit ``'csv'``/``'tsv'`` argument wins; otherwise the
        instance's ``format_`` decides; the default is ``','``.
        """
        for candidate in (format_.lower(), self.format_):
            if candidate == 'csv':
                return ','
            if candidate == 'tsv':
                return '\t'
        return ','

    def _get_emptychar(self):
        """Return the text used for missing values."""
        return '0' if self.zeroization else ''

    def _convert_df2json(self, df):
        """Return ``df`` as the dict produced by ``DataFrame.to_json``.

        Non-DataFrame input is returned unchanged.
        """
        if not isinstance(df, pd.DataFrame):
            return df
        df = df if self.trans is False else df.T
        df = df.astype(object).replace(np.nan, '')
        return json.loads(df.to_json())

    def convert_df2json(self, df):
        """Return ``df`` as a list of row dicts.

        Each row dict holds ``identifier`` (the index label) plus one
        string value per column. A dict is wrapped in a list; any other
        non-DataFrame input is returned unchanged.
        """
        if isinstance(df, dict):
            return [df]
        if not isinstance(df, pd.DataFrame):
            return df
        df = df if self.trans is False else df.T
        empty_char = self._get_emptychar()
        df = df.astype(object).replace(np.nan, empty_char)
        items = []
        for identifier, row in zip(df.index, df.values):
            record = {'identifier': identifier}
            for colname, value in zip(df.columns, row):
                record[str(colname)] = str(self._clean_attr(value))
            items.append(record)
        return items

    def convert_df2dsv(self, df, format_=''):
        """Return ``df`` as a list of rows of display strings.

        For a DataFrame the first row is the header (``identifier``
        followed by the column names) and every following row starts
        with the index label. For a list of row sequences each row is
        cleaned cell by cell and no header is added. Any other input
        gives an empty list. ``format_`` is accepted for interface
        compatibility; the rows are not joined into text here.
        """
        rows = []
        if isinstance(df, pd.DataFrame):
            rows.append(['identifier'] + [str(col) for col in df.columns])
            for identifier, row in zip(df.index, df.values):
                rows.append(
                    [str(identifier)]
                    + [self._clean_attr(value) for value in row])
        elif isinstance(df, list):
            # assumes each element is itself a sequence of cell values
            for row in df:
                rows.append([self._clean_attr(value) for value in row])
        return rows

    def raw_print(self, sep=','):
        """Print status, message and (when present) the items to stdout."""
        print('Status:{}'.format(self.data['status']), end=',')
        print('Message:{}'.format(self.data['message']), end=',')
        if 'items' in self.data:
            print('Total Count:{}'.format(self.data['totalCount']), end=',')
            print('Data:', end='')
            print(self.convert_df2json(self.data['items']))

    def _print(self):
        """Return one ``key - value`` line per entry of ``data``."""
        return '\n'.join(
            '%s - %s' % (key, value) for key, value in self.data.items())

    def __unicode__(self):
        return self._print()

    def __str__(self):
        return self._print()