Source code for xb.calculating

import numpy as np
import pandas as pd
import scanpy as sc
import seaborn as sns
import matplotlib.pyplot as plt
import gzip
import shutil
import os.path
from scipy.io import mmread
import tifffile as tf
from scipy.spatial import ConvexHull, convex_hull_plot_2d
from anndata import AnnData
import json
from sklearn.metrics import mutual_info_score
from sklearn.metrics import fowlkes_mallows_score
import math
from sklearn.metrics import normalized_mutual_info_score
from shapely.geometry import Point, Polygon

[docs]def dist_nuc(reads_ctdsub): """ Compute the median distance to the nuclei the edges of each cell, for all cells profiled Args: reads_ctdsub (DataFrame): Dataframe containing the information of the transcripts profiled, incuding their location in 'x_location' and 'y_location', as well as the cell they are assigned to, in 'cell_id'. results: median_dist(float): Median distance of cell edges for all cells profiled. """ from scipy.spatial import ConvexHull, convex_hull_plot_2d allds=[] for g,n in reads_ctdsub.groupby('cell_id'): try: hull = ConvexHull(np.array(n.loc[:,['x_location','y_location']])) allds.append(np.mean(n.iloc[hull.vertices]['distance'])) except: print() median_dist=np.median(allds) return median_dist
[docs]def hex_to_rgb(value): """ Transform hex to rgb Args: value (str): hex code to be transform (i.e '#h4a4a2'). results: rgb_value(tuple): Rgb value. """ value = value.lstrip('#') lv = len(value) rgb_value=tuple(int(value[i:i + lv // 3], 16) for i in range(0, lv, lv // 3)) return rgb_value
[docs]def distance_calc (x1,y1,x2,y2): """ Calculate distance between two points Args: x1(float): x coordinate of the first point. y1(float): y coordinate of the first point. x2(float): x coordinate of the second point. y2(float): y coordinate of the second point. results: distance (float): distance between the two points. """ distance = math.sqrt( ((x1-x2)**2)+((y1-y2)**2) ) return(distance)
[docs]def dispersion(reads_original,adata1): """ Calculate the distance between each read and its assigned cell Args: reads_original(DataFrame): information of all profiled reads. adata1(AnnData): object with the expression and metadata of cells profiled, including spatial position. results: reads_assigned (DataFrame): information of all profiled reads, includinf distance to its closest cell. """ reads_assigned=reads_original cells_metadata=adata1.obs cells_metadata_filt=cells_metadata.loc[cells_metadata['cell_id'].isin(reads_assigned['cell_id']),:] reads_assigned=reads_assigned[reads_assigned['cell_id'].isin(cells_metadata_filt['cell_id'])] dictx=dict(zip(cells_metadata_filt['cell_id'],cells_metadata_filt['x_centroid'])) dicty=dict(zip(cells_metadata_filt['cell_id'],cells_metadata_filt['y_centroid'])) reads_assigned['x_cell']=list(reads_assigned['cell_id'].map(dictx)) reads_assigned['y_cell']=list(reads_assigned['cell_id'].map(dicty)) reads_assigned['distance']=list(np.sqrt(((reads_assigned['x_location']-reads_assigned['x_cell'])**2)+((reads_assigned['y_location']-reads_assigned['y_cell'])**2))) return reads_assigned
[docs]def entropy(clustering): """ Compute entropy Args: clustering (list): list of clusters assigned to cells. results: entropy_value(float): entropy value computed. """ _, counts = np.unique(clustering, return_counts=True) proportions = counts / len(clustering) entropy_value=-np.sum(proportions * np.log(proportions)) return entropy_value
[docs]def compute_vi(ground_truth, predicted): """ Compute variation of information for comparing two different clusterings Args: ground_truth (list): list of reference clusters given to cells profiled. predicted (list): list of predicted/computed clusters for cells profiled. results: vi_score(float): variation of information. """ mi = mutual_info_score(ground_truth, predicted) h_gt = entropy(ground_truth) h_pred = entropy(predicted) vi_score = h_gt + h_pred - 2 * mi return vi_score
[docs]def compute_fmi(ground_truth, predicted): """ Compute fowlkes mallows index for two different clusterings Args: ground_truth (list): list of reference clusters given to cells profiled. predicted (list): list of predicted/computed clusters for cells profiled. results: fmi_score(float): fowlkes mallows index. """ fmi_score = fowlkes_mallows_score(ground_truth, predicted) return fmi_score
[docs]def compute_nmi(ground_truth, predicted): """ Compute normalized mutual information score for two different clusterings Args: ground_truth (list): list of reference clusters given to cells profiled. predicted (list): list of predicted/computed clusters for cells profiled. results: nmi_score(float): normalized mutual information. """ nmi_score = normalized_mutual_info_score(ground_truth, predicted) return nmi_score
[docs]def domainassign(plsin,adatadom): """ Assign cells to domains based on predefined polygons Args: plsin (DataFrame): Information of polygons defining domains. adatadom (AnnData): cells profiled spatially in an AnnData object and with information of their spatial location in ['x_centroid'] and ['y_centroid']. results: None. """ adatadom.obs['region_annotation']='None' plt.figure() for sel in plsin['region_annotation'].unique(): plsub=plsin[plsin['region_annotation']==sel] if plsub.shape[0]>2: coord = np.array(plsub[['y','x']]).tolist() coord.append(coord[0]) poli=Polygon(coord) xs, ys = zip(*coord) #create lists of x and y values plt.plot(xs,ys,color='black') plt.title(sel) for n in adatadom.obs.index: pnt=Point(adatadom.obs.loc[n,'y_centroid'],adatadom.obs.loc[n,'x_centroid']) if pnt.within(poli)==True: adatadom.obs.loc[n,'region_annotation']=sel plt.scatter(adatadom.obs['y_centroid'],adatadom.obs['x_centroid'],s=0.5) plt.show()
[docs]def negative_marker_purity_coexpression(adata_sp: AnnData, adata_sc: AnnData, key: str='celltype', pipeline_output: bool=True,minexp:float =0.0): """ Negative marker purity aims to measure read leakeage between cells in spatial datasets. For this, we calculate the increase in reads assigned in spatial datasets to pairs of genes-celltyes with no/very low expression in scRNAseq Args: adata_sp : AnnData; Annotated ``AnnData`` object with counts from spatial data. adata_sc : AnnData; Annotated ``AnnData`` object with counts scRNAseq data. key : str; Celltype key in adata_sp.obs and adata_sc.obs. pipeline_output : float, optional; Boolean for whether to use the function in the pipeline or not. returns: negative marker purity : float; Increase in proportion of reads assigned in spatial data to pairs of genes-celltyes with no/very low expression in scRNAseq. """ # Set threshold parameters min_number_cells=10 #minimum number of cells belonging to a cluster to consider it in the analysis minimum_exp=0.05 #maximum relative expression allowed in a gene in a cluster to consider the gene-celltype pair the analysis # Subset adata_sc to genes of spatial data adata_sp = adata_sp[:,adata_sp.var_names.isin(adata_sc.var_names)] adata_sc = adata_sc[:,adata_sp.var_names] # TMP fix for sparse matrices, ideally we don't convert, and instead have calculations for sparse/non-sparse #for a in [adata_sc, adata_sp]: # if issparse(a.X): # a.X = a.X.toarray() print(adata_sp.shape) print(adata_sc.shape) print(adata_sc.var.index) # Get mean expression per cell type try: exp_sc=pd.DataFrame(adata_sc.X.todense(),columns=adata_sc.var_names) except: exp_sc=pd.DataFrame(adata_sc.X,columns=adata_sc.var_names) try: exp_sp = pd.DataFrame(adata_sp.X.todense(),columns=adata_sp.var_names) except: exp_sp = pd.DataFrame(adata_sp.X,columns=adata_sp.var_names) print(exp_sc.shape) #exp_sc['celltype'] = list(adata_sc.obs[key]) #exp_sp['celltype'] = list(adata_sp.obs[key]) # instead of measuring the purity by cell type, we check it based on the proportion of posive cells #mean_celltype_sc = exp_sc.groupby('celltype').mean() #mean_celltype_sp = exp_sp.groupby('celltype').mean() mean_celltype_sc=coexpression_calculation(exp_sc,min_exp=minexp) mean_celltype_sp=coexpression_calculation(exp_sp,min_exp=minexp) # Get mean expressions relative to mean over cell types (this will be used for filtering based on minimum_exp) mean_ct_sc_rel = mean_celltype_sc#.div(mean_celltype_sc.mean(axis=0),axis=1) mean_ct_sp_rel = mean_celltype_sp#.div(mean_celltype_sp.mean(axis=0),axis=1) # Get normalized mean expressions over cell types (this will be summed up over negative cell types) mean_ct_sc_norm = mean_celltype_sc#.div(mean_celltype_sc.sum(axis=0),axis=1) mean_ct_sp_norm = mean_celltype_sp#.div(mean_celltype_sp.sum(axis=0),axis=1) # Explanation: The idea is to measure which ratio of mean expressions is shifted towards negative clusters. # With this approach the metric stays between 0 and 1 commongenes=mean_ct_sc_rel.index # Get gene-cell type pairs with negative marker expression neg_marker_mask = np.array(mean_ct_sc_rel < minimum_exp) sns.clustermap(mean_ct_sc_rel.astype(float),vmax=0.1) # Return nan if no negative markers were found if np.sum(neg_marker_mask) < 1: print("No negative markers were found in the sc data reference.") negative_marker_purity = 'nan' if pipeline_output==True: return negative_marker_purity else: return negative_marker_purity, None, None # Get marker expressions in negative marker-cell type pairs lowvals_sc = np.full_like(neg_marker_mask, np.nan, dtype=np.float32) lowvals_sc = mean_ct_sc_norm.values[neg_marker_mask] lowvals_sp = mean_ct_sp_norm.values[neg_marker_mask] # Take the mean over the normalized expressions of the genes' negative cell types mean_sc_lowexp = np.nanmean(lowvals_sc) mean_sp_lowexp = np.nanmean(lowvals_sp) # Calculate summary metric #negative_marker_purity = 1 #if mean_sp_lowexp > mean_sc_lowexp: # negative_marker_purity -= (mean_sp_lowexp - mean_sc_lowexp) lowvals_diff=(lowvals_sp-lowvals_sc) lowvals_diff[lowvals_diff<0]=0 negative_marker_purity=1-np.mean(lowvals_diff) if pipeline_output: return negative_marker_purity else: # Calculate per gene and per cell type purities purities = (mean_ct_sp_norm - mean_ct_sc_norm)#.cip(0,None) purities[~neg_marker_mask] = np.nan purities = purities.loc[~(purities.isnull().all(axis=1)), ~(purities.isnull().all(axis=0))] purity_per_gene = purities.mean(axis=0, skipna=True) purity_per_celltype = purities.mean(axis=1, skipna=True) return negative_marker_purity, purity_per_gene, purity_per_celltype,lowvals_sc,lowvals_sp,commongenes
[docs]def coexpression_calculation(exp,min_exp=0): """ Caculate coexpression between genes in a given dataset Args: exp (DataFrame): expression of cells profiled in a cell x gene format, where cells are rows and genes are columns. min_exp (float): Maximum expression of the cells to be considered as not expressing a gene (typically is 0). results: coexpression(DataFrame): coexpression DataFrame represented as a gene-by-gene matrix. """ coexpression=pd.DataFrame(index=exp.columns,columns=exp.columns) for col in tqdm(exp.columns): sel=exp.loc[:,col]>min_exp positive_cells=exp.loc[sel,:] coexpression.loc[:,col]=np.sum(positive_cells>min_exp)/positive_cells.shape[0] coexpression=coexpression.fillna(1) return coexpression
[docs]def alphashape_fun(points,alpha=0.1): """ Caculate area of a a cell Args: points (list of tuple): list of xy points found in a cell (i.e. [(1,2),(2,4)]). alpha (int): alpha parameter to be tuned to define cell border. results: area(flaot): Area of the cell. """ alpha_shape = alphashape.alphashape(points, alpha) area = alpha_shape.area try: ax.add_patch(plt.PolygonPatch(alpha_shape, alpha=0.2, color='orange', label='Alpha Shape')) except: ss='ss' print("Area of the Alpha Shape (Concave Hull):", area) return area
[docs]def svf_moranI(adata1,sample_key='sample',radius=50.0): """ Compute spatially variable features using Moran's I (squidpy implementation) Args: adata1 (AnnData): AnnData object of profiled cells. sample_key (str): Column of adata1.obs where sample of origin of each cell is stored. radius (float): Radius usd to compute the spatial neighbors in sq.gr.spatial_neighbors. Given in the scale the spatial coordinates are in (typically in um). results: adata1 (AnnData): AnnData object of profiled cells with computed svf's. hs_results(DataFrame): DataFrame with the results of computing moran's I for each gene in the given input dataset, including pval, FDR and ranking of the gene. """ anndata_list=[] for sample in adata.obs[sample_key].unique(): adata_copy_int = adata[adata.obs[sample_key]==sample] sq.gr.spatial_neighbors(adata_copy_int,radius=radius,coord_type ='generic') anndata_list.append(adata_copy_int) adata1=sc.concat(anndata_list,join='outer',pairwise=True) sq.gr.spatial_autocorr(adata1, mode="moran") hs_results=adata1.uns["moranI"] hs_results['rank']=list(hs_results['I'].rank()) hs_results=hs_results.loc[:,['pval_norm','pval_norm_fdr_bh','rank']] hs_results.columns=['Pval','FDR','rank'] return adata1,hs_results