Source code for mdvtools.conversions

from scanpy import AnnData
from mudata import MuData
import scipy.sparse
import pandas as pd
from os.path import join, split
import os
from .mdvproject import MDVProject
import numpy as np
import json
import gzip
import copy

def convert_scanpy_to_mdv(
    folder: str,
    scanpy_object: AnnData,
    max_dims: int = 3,
    delete_existing: bool = False,
    label: str = "",
) -> MDVProject:
    """
    Convert a Scanpy AnnData object to MDV (Multi-Dimensional Viewer) format.

    This function transforms single-cell RNA sequencing data from AnnData format
    into the MDV project structure, handling both cells and genes as separate
    datasources with their associated dimensionality reductions and metadata.

    Args:
        folder (str): Path to the target MDV project folder
        scanpy_object (AnnData): The AnnData object containing the single-cell data
        max_dims (int, optional): Maximum number of dimensions to include from
            dimensionality reductions. Defaults to 3.
        delete_existing (bool, optional): Whether to delete existing project data.
            If False, merges with existing data. Defaults to False.
        label (str, optional): Prefix to add to datasource names and metadata
            columns when merging with existing data. Defaults to "".

    Returns:
        MDVProject: The configured MDV project object with the converted data

    Notes:
        Data Structure Creation:
            - Creates two main datasources: '{label}cells' and '{label}genes'
            - Preserves all cell metadata from scanpy_object.obs
            - Preserves all gene metadata from scanpy_object.var
            - Transfers dimension reductions from obsm/varm matrices
            - Links cells and genes through expression data
            - Adds gene expression scores as a subgroup

        View Handling:
            - If delete_existing=True:
                * Creates new default view with empty initial charts
                * Sets project as editable
            - If delete_existing=False:
                * Preserves existing views
                * Updates views with new datasources
                * Maintains panel widths and other view settings
                * Adds new datasources to each view's initialCharts

        Dimension Reduction:
            - Processes dimensionality reductions up to max_dims
            - Supports standard formats (e.g., PCA, UMAP, t-SNE)
            - Column names in the format: {reduction_name}_{dimension_number}

    Raises:
        ValueError: If the provided AnnData object is invalid or missing
            required components
        IOError: If there are issues with file operations in the target folder
        Exception: For other unexpected errors during conversion
    """
    mdv = MDVProject(folder, delete_existing=delete_existing)

    # If not deleting existing data, preserve the current views
    if not delete_existing:
        current_views = mdv.views
    else:
        current_views = {}

    # create datasource 'cells'
    cell_table = scanpy_object.obs
    cell_table["cell_id"] = cell_table.index
    # add any dimension reduction
    _add_dims(cell_table, scanpy_object.obsm, max_dims)
    mdv.add_datasource(f"{label}cells", cell_table)

    # create datasource 'genes'
    gene_table = scanpy_object.var
    gene_table[f"{label}name"] = gene_table.index
    _add_dims(gene_table, scanpy_object.varm, max_dims)
    mdv.add_datasource(f"{label}genes", gene_table)

    # link the two datasets
    mdv.add_rows_as_columns_link(f"{label}cells", f"{label}genes", f"{label}name", "Gene Expr")
    # add the gene expression
    mdv.add_rows_as_columns_subgroup(
        f"{label}cells",
        f"{label}genes",
        "gs",
        scanpy_object.X,
        name="gene_scores",
        label="Gene Scores",
    )

    if delete_existing:
        # If we're deleting existing data, create a new default view
        mdv.set_view("default", {"initialCharts": {"cells": [], "genes": []}}, True)
        mdv.set_editable(True)
    else:
        # Otherwise, update each preserved view with the new datasources
        for view_name, view_data in current_views.items():
            new_view_data = copy.deepcopy(view_data)
            # Initialize initialCharts if it doesn't exist
            if "initialCharts" not in new_view_data:
                new_view_data["initialCharts"] = {}
            # Add the new datasources to initialCharts
            new_view_data["initialCharts"][f"{label}cells"] = []
            new_view_data["initialCharts"][f"{label}genes"] = []
            # Initialize dataSources if it doesn't exist
            if "dataSources" not in new_view_data:
                new_view_data["dataSources"] = {}
            # Add the new datasources with panel widths
            new_view_data["dataSources"][f"{label}cells"] = {"panelWidth": 50}
            new_view_data["dataSources"][f"{label}genes"] = {"panelWidth": 50}
            # persist the updated view back to the project
            mdv.set_view(view_name, new_view_data)

    return mdv
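A minimal usage sketch (the .h5ad file name and project folder below are placeholders, not part of the module):

    import scanpy as sc

    adata = sc.read_h5ad("example.h5ad")  # placeholder input file
    project = convert_scanpy_to_mdv("my_mdv_project", adata, delete_existing=True)
    # a second object can be merged into the same project with a label prefix:
    # convert_scanpy_to_mdv("my_mdv_project", other_adata, label="run2_")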
def convert_mudata_to_mdv(folder: str, mudata_object: MuData, max_dims=3) -> MDVProject:
    """
    Convert a MuData object to an MDV project: the cell metadata becomes a
    'cells' datasource, and each non-empty modality becomes a linked datasource
    with its expression matrix added as a sparse subgroup.
    """
    md = mudata_object
    p = MDVProject(folder)
    # add the cells
    _add_dims(md.obs, md.obsm, max_dims)
    p.add_datasource("cells", md.obs)
    for mod in md.mod.keys():
        mdata = md.mod[mod]
        matrix = mdata.X.value if hasattr(mdata.X, "value") else mdata.X
        if matrix.shape[1] != 0:
            p.add_datasource(mod, mdata.var)
            p.set_column(mod, {"name": "name", "datatype": "unique"}, mdata.var.index)
            p.add_rows_as_columns_link("cells", mod, "name", mod)
            matrix = scipy.sparse.csr_matrix(matrix).transpose().tocsr()
            p.add_rows_as_columns_subgroup(
                "cells", mod, mod + "_expr", matrix, sparse=True
            )
    return p
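A usage sketch along the same lines, assuming a .h5mu file on disk (the file name and folder are placeholders):

    import mudata as mu

    mdata = mu.read("example.h5mu")  # placeholder input file
    project = convert_mudata_to_mdv("my_mdv_project", mdata, max_dims=2)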
def convert_vcf_to_df(vcf_filename: str) -> pd.DataFrame:
    """
    Read a VCF file into a DataFrame. The '##' metadata header lines are
    parsed but not yet used (see todo below).
    """
    f = open(vcf_filename, "r")
    metadata = {}
    while True:
        line = f.readline()
        if line.startswith("##"):
            key, value = line[2:].strip().split("=", 1)
            if key in metadata:
                metadata[key].append(value)
            else:
                metadata[key] = [value]
        if line.startswith("#CHROM"):
            break
    # todo - do something with metadata
    # print(metadata)
    temp_file = "temp.csv"  # todo: use tempfile
    with open(temp_file, "w") as tmp:
        # the '#CHROM' line is still in `line`, so the column header is written first
        while line:
            tmp.write(line)
            line = f.readline()
    f.close()
    df = pd.read_csv(temp_file, sep="\t")
    os.remove(temp_file)
    return df
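For example (with a placeholder file name), the returned DataFrame has one row per variant and its column names taken from the '#CHROM' header line:

    df = convert_vcf_to_df("variants.vcf")  # placeholder input file
    # a standard VCF yields columns like:
    # ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', ...]
    print(df.columns.tolist())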
def compute_vcf_end(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute the end position of the variant determined from 'POS', 'REF' and 'ALT'.
    This is added as a column 'END' in the given DataFrame.
    """

    def varlen(s) -> int:
        return max([len(v) for v in str(s).split(",")])

    df["END"] = df["POS"] + df[["REF", "ALT"]].map(varlen).max(axis=1)
    return df
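A worked toy example of the END derivation (values made up; note that the elementwise DataFrame.map used above requires pandas >= 2.1):

    import pandas as pd

    toy = pd.DataFrame({"POS": [100], "REF": ["A"], "ALT": ["ACG,AT"]})
    toy = compute_vcf_end(toy)
    # varlen("A") == 1 and varlen("ACG,AT") == 3 (longest comma-separated allele),
    # so END == 100 + max(1, 3) == 103
    print(toy["END"].tolist())  # [103]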
def convert_vcf_to_mdv(folder: str, vcf_filename: str) -> MDVProject:
    """
    Converts a VCF file to an MDV project.

    The VCF file must be tab-delimited, with the header lines starting with "##"
    and column names in the line starting with "#CHROM". An 'END' column is
    derived, which is the end position of the variant determined from 'POS',
    'REF' and 'ALT'.
    """
    p = MDVProject(folder, delete_existing=True)
    df = convert_vcf_to_df(vcf_filename)
    # for single nucleotide variants, we still need an end position for the genome browser
    # maybe add_genome_browser should be able to understand only one position?
    # other forms of VCF variants have a length, so we could use that...
    df = compute_vcf_end(df)
    # ^^ I should verify that this makes sense from a biological perspective
    columns = [
        {"name": "#CHROM", "datatype": "text"},  # chromosome
        {"name": "POS", "datatype": "integer"},  # start of the variant
        # not standard VCF, but useful for the genome browser (? maybe temporary)
        {"name": "END", "datatype": "integer"},
        # should be unique, but also semicolon-delimited - could be useful to preserve this
        {"name": "ID", "datatype": "unique", "delimiter": ";"},
        # reference base(s)
        {"name": "REF", "datatype": "multitext", "delimiter": ","},
        # comma-delimited list of alternate non-reference alleles
        {"name": "ALT", "datatype": "multitext", "delimiter": ","},
        # phred-scaled quality score for the assertion made in ALT
        {"name": "QUAL", "datatype": "double"},
        # PASS or semicolon-delimited list of filters that the variant has failed
        {"name": "FILTER", "datatype": "multitext", "delimiter": ";"},
        # comma-delimited list of additional information; no white space, semicolons
        # or equals signs permitted
        # ^^^ note: the first random vcf file I found has all manner of = and ; in the
        # INFO field, so I'm not inclined to parse this too rigidly
        {"name": "INFO", "datatype": "multitext", "delimiter": ","},
        # {'name': 'FORMAT', 'datatype': 'text'},  # not sure why copilot thought this
        # should be here - not in the VCF spec
    ]
    name = os.path.basename(vcf_filename)
    p.add_datasource(name, df, columns=columns)
    p.add_genome_browser(name, ["#CHROM", "POS", "END"])
    p.set_editable(True)
    return p
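Putting the pieces together (placeholder paths); the datasource is named after the VCF file's basename:

    project = convert_vcf_to_mdv("my_vcf_project", "variants.vcf")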
def create_regulamentary_project(
    folder,
    output: str,
    name: str,
    bigwig_folder="",
    bed_folder="",
    openchrome="DNase",
    openchrome_color="#eb9234",
    marks=["H3K4me1", "H3K4me3", "H3K27ac", "CTCF"],
    mark_colors=["#349beb", "#3aeb34", "#c4c41f", "#ab321a"],
    genome="hg38",
):
    # get the template dir
    tdir = join(split(os.path.abspath(__file__))[0], "templates")
    p = MDVProject(output)
    mdvfile = join(folder, "08_REgulamentary", "mlv_REgulamentary.csv")
    mdv = pd.read_csv(mdvfile, sep="\t")
    p.add_datasource("elements", mdv)
    # add the tracks
    all_names = [openchrome] + marks
    all_colors = [openchrome_color] + mark_colors
    if bigwig_folder.startswith("http"):
        bigwigs = [f"{bigwig_folder}/{name}_{x}_{genome}.bw" for x in all_names]
        beds = [f"{bed_folder}/{name}_{x}_{genome}.bb" for x in all_names]
    else:
        # trying to avoid unbound variables - but not convinced this is correct
        # what is supposed to happen if bigwig_folder is not a URL?
        # >> need tests for this function... <<
        beds = bigwigs = ["" for _ in all_names]
        print(
            "bigwig_folder is not a URL - using empty URLs for tracks, may well be wrong."
        )
    # get the reference
    default_tracks = []
    for n in range(len(all_names)):
        default_tracks.append(
            {
                "url": bigwigs[n],
                "short_label": all_names[n] + " cov",
                "color": all_colors[n],
                "height": 60,
                "track_id": "coverage_" + all_names[n],
            }
        )
        default_tracks.append(
            {
                "url": beds[n],
                "short_label": all_names[n] + " peaks",
                "color": all_colors[n],
                "height": 15,
                "featureHeight": 5,
                "track_id": "peaks_" + all_names[n],
            }
        )
    extra_params = {
        "default_tracks": default_tracks,
        "default_parameters": {
            "view_margins": {"type": "fixed_length", "value": 4000},
            "color_by": "RE",
            "color_legend": {"display": False, "pos": [5, 5]},
            "feature_label": "RE",
        },
    }
    p.add_genome_browser(
        "elements", ["chromosome", "start", "end"], extra_params=extra_params
    )
    p.add_refseq_track("elements", genome)
    _create_dt_heatmap(folder, p, mdv, marks)
    with open(join(tdir, "views", "regulamentary.json")) as vf:
        view = json.load(vf)
    gb = p.get_genome_browser("elements")
    gb.update(
        {
            "id": "browser",
            "size": [389, 550],
            "position": [1046, 8],
            "title": "elements",
        }
    )
    view.append(gb)
    p.set_view(
        "default",
        {
            "initialCharts": {
                "elements": view,
            }
        },
    )
    return p
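A hedged call sketch: the input folder must contain the REgulamentary pipeline layout used above ('08_REgulamentary', '04_sort_regions', '09_metaplot'), and all paths and the sample name below are placeholders:

    project = create_regulamentary_project(
        "regulamentary_run",    # pipeline output folder (placeholder)
        "my_re_project",        # target MDV project folder (placeholder)
        "sampleA",              # sample name used in the track file names
        bigwig_folder="https://example.org/tracks",
        bed_folder="https://example.org/tracks",
    )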
def _create_dt_heatmap(folder, project, mdv, marks):
    # get the order of the regions in the matrix
    mdv_i = mdv.set_index(["chromosome", "start", "end"])
    order = pd.read_csv(
        join(folder, "04_sort_regions", "sort_union.bed"), sep="\t", header=None
    )
    order = order.set_index([0, 1, 2])
    order = order[order.index.isin(mdv_i.index)]
    mdv_i = mdv.reset_index()
    mdv_i["row_position"] = mdv_i.index
    arr = order.index.map(
        mdv_i.set_index(["chromosome", "start", "end"])["row_position"]
    )
    arr = np.array(arr, dtype=np.int32)
    # add the binary data
    output = join(project.dir, "binarydata", "elements")
    os.makedirs(output, exist_ok=True)
    hm = gzip.open(join(output, "heat_map.gz"), "wb")
    hm.write(arr.tobytes())
    # get the matrix and drop the trailing columns
    mt = pd.read_csv(join(folder, "09_metaplot", "matrix.csv"), sep="\t")
    mt = mt.iloc[:, :1600]
    # flatten and halve in size (mean of adjacent values)
    arr = mt.values.flatten()
    arr = arr.reshape(-1, 2)
    arr = np.mean(arr, axis=1)
    # normalize and clip to 255 (1 byte per value)
    max_scale = np.percentile(arr, 99.99)
    arr = (arr / max_scale) * 255
    arr = np.clip(arr, 0, 255)
    arr = arr.astype(np.uint8)
    hm.write(arr.tobytes())
    hm.close()
    # add the metadata
    md = project.get_datasource_metadata("elements")
    md["deeptools"] = {
        "maps": {
            "default": {
                "data": "heat_map",
                "rows": md["size"],
                "cols": 800,
                "groups": marks,
                "max_scale": max_scale,
            }
        }
    }
    md["binary_data_loader"] = True
    project.set_datasource_metadata(md)
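For reference, a sketch (not part of the module) of reading back the binary layout this helper writes: n_rows int32 row positions followed by uint8 heat-map values, assuming the metaplot matrix has one row per region:

    import gzip
    import numpy as np

    def read_dt_heatmap(path, n_rows):
        # layout: n_rows * 4 bytes of int32 row order, then uint8 values
        with gzip.open(path, "rb") as f:
            raw = f.read()
        order = np.frombuffer(raw[: n_rows * 4], dtype=np.int32)
        values = np.frombuffer(raw[n_rows * 4:], dtype=np.uint8)
        return order, values.reshape(n_rows, -1)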
def _add_dims(table, dims, max_dims):
    if len(dims.keys()) == 0:
        return
    for dname in dims:
        # skip 1-D entries - only matrices of shape (n_rows, n_dims) are handled
        if len(dims[dname].shape) == 1:
            continue
        dm = dims[dname].transpose()
        md = min(max_dims, dm.shape[0])
        for n in range(md):
            table[f"{dname}_{n + 1}"] = dm[n]
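A toy run of _add_dims with made-up values, showing how a 2-D reduction becomes numbered columns:

    import numpy as np
    import pandas as pd

    table = pd.DataFrame(index=["c1", "c2", "c3"])
    dims = {"X_umap": np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])}
    _add_dims(table, dims, max_dims=3)
    print(table.columns.tolist())  # ['X_umap_1', 'X_umap_2']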