import numpy as np
import pandas as pd
import scanpy as sc
import seaborn as sns
import matplotlib.pyplot as plt
import gzip
import shutil
import os.path
from scipy.io import mmread
import tifffile as tf
from scipy.spatial import ConvexHull, convex_hull_plot_2d
from anndata import AnnData
import json
from sklearn.metrics import mutual_info_score
from sklearn.metrics import fowlkes_mallows_score
import warnings
warnings.filterwarnings("ignore")
import random
import scipy.sparse as sparse
from scipy.sparse import csr_matrix, issparse
#from Banksy_py.banksy.initialize_banksy import initialize_banksy
#from Banksy_py.banksy.run_banksy import run_banksy_multiparam
import squidpy as sq
from random import randint
from tqdm import tqdm
import sklearn.metrics as sk
## modify the X coords of each sample so that they are processed independently
[docs]def adapt_banksy_for_multisample(adata,samplekey='sample'):
""" Modify the spatial coordinates of each sample in adata so that they can be later be processed together by banksy
Args:
adata (AnnData): AnnData object with the cells of the experiment.
samplekey(str): name of the column in adata.obs where the sample of origin of each cell is stored.
results:
adata (AnnData): AnnData object with the cells of the experiment with modified adata.obs['spatial'], ready to perform banksy.
"""
adata.obs['Xres']=adata.obs['x_centroid']
adata.obs['Yres']=adata.obs['y_centroid']
gap=adata.obs['x_centroid'].max()+int(adata.obs['x_centroid'].max()/10)
samplekey='sample'
counter=0
sampsel=[]
for s in adata.obs[samplekey].unique():
sampsel.append(s)
adata.obs.loc[~adata.obs[samplekey].isin(sampsel),'Xres']=adata.obs.loc[~adata.obs[samplekey].isin(sampsel),'Xres']+gap
counter=counter+1
adata.obs['Yres']=adata.obs['Yres']+1
adata.obsm['spatial_sample_specific']=adata.obsm['spatial']
adata.obsm['spatial']=np.array(adata.obs.loc[:,['Xres','Yres']])
return adata
[docs]def define_palette(n_colors=50):
""" Create a random palette of colors in hex format
Args:
n_colors(str): number of colors to be inclued in the palette.
results:
colorlist(list): list of generated colors in hex format.
"""
colorlist = []
n = n_colors
for i in range(n):
colorlist.append('#%06X' % randint(0, 0xFFFFFF))
return colorlist
[docs]def domains_by_banksy(adata,plot_path:str,banksy_params:dict,save=True):
""" Modify the spatial coordinates of each sample in adata so that they can be later be processed together by banksy
Args:
adata (AnnData): AnnData object with the cells of the experiment where Banksy will be computed.
save(boolean): whether to save the resulting object or not.
plot_path(str): path where to save the plots generated, if desired.
banksy_params(dict): parameters required to perform banksy.
results:
adata (AnnData): Original AnnData object with the cells of the experiment with domains identified assigned to cells.
adata_res (AnnData): AnnData object resulting of the identification of domains. It contains all intermediate information generated by Banksy.
"""
adata=adapt_banksy_for_multisample(adata)
coord_keys = ('Xres', 'Yres','spatial')
prev_clusters=[e for e in adata.obs.columns if 'leiden' in e]
if len(prev_clusters)>0:
annotation_key=prev_clusters[0]
else:
adata.obs['default_clustering']='0'
adata.obs['default_clustering']=adata.obs['default_clustering'].astype('category')
annotation_key='default_clustering'
banksy_dict = initialize_banksy(
adata,
coord_keys,
banksy_params['k_geom'],
nbr_weight_decay=banksy_params['nbr_weight_decay'],
max_m=banksy_params['max_m'],
plt_edge_hist=True,
plt_nbr_weights=True,
plt_agf_angles=False,
plt_theta=False)
results_df = run_banksy_multiparam(
adata,
banksy_dict,
banksy_params['lambda_list'], banksy_params['resolutions'],
color_list = define_palette(n_colors=100), max_m = banksy_params['max_m'], filepath = plot_path,
key = coord_keys, pca_dims = banksy_params['pca_dims'],
annotation_key = annotation_key, max_labels = None,
cluster_algorithm = banksy_params['cluster_algorithm'], match_labels = False, savefig = save, add_nonspatial = False,
variance_balance = False,
)
adata_res=results_df.loc[results_df.index[0],'adata']
res=adata_res.obs.loc[:,['unique_cell_id',results_df.index[0]]]
res.columns=['unique_cell_id','spatial_domain']
adata_res=results_df.loc[results_df.index[0],'adata']
res=adata_res.obs.loc[:,['unique_cell_id',results_df.index[0]]]
res.columns=['unique_cell_id','spatial_domain']
id2domain=dict(zip(res['unique_cell_id'],res['spatial_domain']))
adata.obs['banksy_domain']=adata.obs['unique_cell_id'].map(id2domain).astype(str)
adata.obsm['spatial']=adata.obsm['spatial_sample_specific']
return adata,adata_res
[docs]def domains_by_rbd(adata,hyperparameters_rbd:dict): #read-based domains
""" Define cellular domains by collapsing the expression of cells arround each cell (a.k.a pseudobining) and clustering
Args:
adata (AnnData): AnnData object with the cells of the experiment.
hyperparameters_rbd(dict): dictionary with all the parameters required to identify domains based on reads (read based domains).
results:
adata (AnnData): Original AnnData object with expression of cells and the domain identified incorporated in a column in adata.obs.
adataneigh (AnnData): AnnData object where domains have been identified. Cells here include the expression of neighboring cells collapsed into them.
"""
sample_key='sample' # sample is considered the "sample_key"
anndata_list=[]
for sample in adata.obs[sample_key].unique():
adata_copy_int = adata[adata.obs[sample_key]==sample]
adataneigh2=format_data_neighs_colapse(adata,sample_key,neighs=hyperparameters_rbd['neighbors'])
anndata_list.append(adataneigh2)
adataneigh=sc.concat(anndata_list)
adataneigh.X=np.nan_to_num(adataneigh.X)
adataneigh=adataneigh[adataneigh.obs['total_counts']>3]
adataneigh.raw=adataneigh
sc.pp.neighbors(adataneigh, n_neighbors=20,n_pcs=0)
sc.tl.umap(adataneigh,min_dist=0.1)
if hyperparameters_rbd['clustering_algorithm']=='leiden':
sc.tl.leiden(adataneigh,resolution=hyperparameters_rbd['resolution'],key_added='rbd_domain')
if hyperparameters_rbd['clustering_algorithm']=='louvain':
sc.tl.louvain(adataneigh,resolution=hyperparameters_rbd['resolution'],key_added='rbd_domain')
if hyperparameters_rbd['clustering_algorithm']=='leiden':
sc.tl.leiden(adataneigh,resolution=hyperparameters_rbd['resolution'],key_added='rbd_domain')
if hyperparameters_rbd['clustering_algorithm']=='louvain':
sc.tl.louvain(adataneigh,resolution=hyperparameters_rbd['resolution'],key_added='rbd_domain')
id2domain=dict(zip(adataneigh.obs['unique_cell_id'],adataneigh.obs['rbd_domain']))
adata.obs['rbd_domain']=adata.obs['unique_cell_id'].map(id2domain).astype(str)
return adata,adataneigh
[docs]def compare_domains(adata,domain_keys:list,save=True,plot_path='./'):
""" Compare domains assigned by different methods using ARI. Generate heatmap comparing them
Args:
adata (AnnData): AnnData object with the cells of the experiment.
domain_keys(list): list of the column names in adata.obs where domains are stored.
save(boolean): whether to save plots on not.
plot_path(str): path to the folder where to save the resulting plots.
results:
ARI (DataFrame): DataFrame consisting of ARI computed between domain idenification methods.
"""
ARI=pd.DataFrame(index=domain_keys,columns=domain_keys)
for ind in ARI.index:
for col in ARI.columns:
ARI.loc[ind,col]=sk.adjusted_rand_score(adata.obs[ind],adata.obs[col])
ARI.index=[i.replace('_domain','') for i in ARI.index]
ARI.columns=[i.replace('_domain','') for i in ARI.columns]
plt.figure()
sns.clustermap(ARI.astype(float),cmap='Greens',figsize=(len(ARI.columns)*1.5,len(ARI.columns)*1.5),dendrogram_ratio=0.15)
if save==True:
plt.savefig(plot_path+'clustermap_ARI_domains.pdf')
for s in adata.obs['sample'].unique():
fig,ax=plt.subplots(nrows=len(domain_keys))
adatasub=adata[adata.obs['sample']==s]
s=0
for groupby in domain_keys:
sc.pl.spatial(adatasub,color=groupby,spot_size=40,ax=ax[s],show=False)
s=s+1
if save==True:
plt.savefig(plot_path+'map_all_domains_'+str(s)+'.pdf')
return ARI
[docs]def domains_by_nbd(adata,hyperparameters_nbd:dict):
""" Define cellular domains by collapsing using the cellular identity of neighboring cell types and clustering
Args:
adata (AnnData): AnnData object with the cells of the experiment.
hyperparameters_nbd(dict): dictionary with all the parameters required to identify domains based on neighbors (neighbors based domains).
results:
adata (AnnData): Original AnnData object with expression of cells and the domain identified incorporated in a column in adata.obs.
adataneigh (AnnData): AnnData object where domains have been identified. Cells here include the identity of neighboring cells.
"""
sample_key='sample' # sample is considered the "sample_key"
anndata_list=[]
for sample in adata.obs[sample_key].unique():
adata_copy_int = adata[adata.obs[sample_key]==sample]
adataneigh2=format_data_neighs(adata,hyperparameters_nbd['key'],neighs=hyperparameters_nbd['neighbors'])
anndata_list.append(adataneigh2)
adataneigh=sc.concat(anndata_list)
adataneigh.X=np.nan_to_num(adataneigh.X)
adataneigh=adataneigh[adataneigh.obs['total_counts']>3]
adataneigh.raw=adataneigh
sc.pp.neighbors(adataneigh, n_neighbors=20,n_pcs=0)
sc.tl.umap(adataneigh,min_dist=0.1)
if hyperparameters_nbd['clustering_algorithm']=='leiden':
sc.tl.leiden(adataneigh,resolution=hyperparameters_nbd['resolution'],key_added='nbd_domain')
if hyperparameters_nbd['clustering_algorithm']=='louvain':
sc.tl.louvain(adataneigh,resolution=hyperparameters_nbd['resolution'],key_added='nbd_domain')
if hyperparameters_nbd['clustering_algorithm']=='leiden':
sc.tl.leiden(adataneigh,resolution=hyperparameters_nbd['resolution'],key_added='nbd_domain')
if hyperparameters_nbd['clustering_algorithm']=='louvain':
sc.tl.louvain(adataneigh,resolution=hyperparameters_nbd['resolution'],key_added='nbd_domain')
id2domain=dict(zip(adataneigh.obs['unique_cell_id'],adataneigh.obs['nbd_domain']))
adata.obs['nbd_domain']=adata.obs['unique_cell_id'].map(id2domain).astype(str)
return adata,adataneigh
[docs]def spatial_plot(adata,groupby='nbd_domain',save=False,plot_path='./'):
""" Generate spatial plot of each sample in an AnnData object, with cells color as required
Args:
adata (AnnData): AnnData object with the cells of the experiment.
groupby(str): name of the column in adata.obs to use to color cells.
save(boolean):whether to save the resulting plots or not.
plot_path(str): if required, path where to save the resulting plots.
results:
None.
"""
for s in adata.obs['sample'].unique():
adatasub=adata[adata.obs['sample']==s]
sc.pl.spatial(adatasub,color=groupby,spot_size=40,vmax='p99',show=False)
if save==True:
plt.savefig(plot_path+'spatial_map_'+str(s)+'_by_'+str(groupby)+'.pdf')