Source code for mdvtools.csv_to_static

import re
import json
import pandas as pd
import argparse
import os
import gzip
import shutil
# import numpy as np

parser = argparse.ArgumentParser(
    description="Process a csv table into MDV static site format"
)
parser.add_argument("-i", "--input", help="csv file to process", default="data.csv")
parser.add_argument("-o", "--outdir", help="output folder")
parser.add_argument(
    "--discard_redundant", help="discard redundant columns", action="store_true"
)
# grouping options? rename columns so numbers have leading zeros?
parser.add_argument("-g", "--group", help="group columns by regex", action="store_true")
parser.add_argument(
    "--group_by", help="regex used to group related columns", default=r"(.*?)(\d+)(.*)"
)
parser.add_argument("-s", "--separator", help="multitext separator", default=";")
parse_multitext = True
args = parser.parse_args()
separator = args.separator
filename = args.input
while not os.path.exists(filename):
    input(f'file "{filename}" does not exist. Press enter to try again.')
    filename = input("CSV input file: ")
basename = os.path.basename(filename)

outdir = args.outdir
if not outdir:
    outdir = input("output folder: ")

os.umask(0)
if not os.path.exists(outdir):
    os.makedirs(outdir)

### todo: non-hacky image handling
indir = os.path.dirname(filename)
has_images = os.path.exists(os.path.join(indir, "images"))
if has_images and not os.path.exists(os.path.join(outdir, "images")):
    try:
        shutil.copytree(os.path.join(indir, "images"), os.path.join(outdir, "images"))
    except Exception:
        pass

print("reading csv...")
df = pd.read_csv(filename)
types = {
    "float64": "double",
    "int64": "integer",
    "O": "text",
    "object": "text",
    "bool": "boolean",  # 0 or 1? currently breaks this script if made integer
}
# if we were processing multiple sources, we should review global df / col_types...
col_types = {}
def rename_columns():
    # rename columns that are numbers to have leading zeros,
    # so that they sort correctly in the UI
    # (disabled for now by the early return)
    return
    for name in df.columns:
        m = re.search(args.group_by, name)
        if not m:
            continue
        new_name = f"{m.group(1)}_{m.group(2).zfill(3)}{m.group(3)}"
        df.rename(columns={name: new_name}, inplace=True)
def get_column_type(name):
    # get_column_type is called from get_datasource() and then again from
    # convert_data_to_binary(); the second call was getting the wrong type
    # (the dataframe is mutated in between), so values are cached in col_types.
    if name in col_types:
        return col_types[name]
    v = df[name]
    unique_values = set(v)
    dtype = str(v.dtype)
    ttype = types.get(dtype)
    # if ttype == 'text' and len(unique_values) == v.size:
    #     print(f'unique text column "{name}"')
    #     ttype = 'unique'
    if ttype == "text" and parse_multitext:
        # does it look like separator-delimited tags?
        # str(s) guards against non-string values such as bools or nan
        # (e.g. unique_values like {False, True, nan}), which would otherwise
        # raise "argument of type 'bool' is not iterable".
        n = len(unique_values)
        if n > 65536:
            print(f'detected unique column "{name}" (not well tested with this script)')
            ttype = "unique"
        elif n > 256 or any(separator in str(s) for s in unique_values):
            print(f'detected multitext column "{name}"')
            ttype = "multitext"
    if ttype is None:
        raise ValueError(f"unknown type {v.dtype} for {name}")
    col_types[name] = ttype
    return ttype
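# Illustrative detection examples (not in the original comments): a text column
# containing the separator (e.g. "red;green") or with more than 256 distinct
# values is flagged as multitext; one with more than 65536 distinct values
# (e.g. per-row IDs) is flagged as unique.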
def get_quantiles(col):
    qs = {}
    for q in ["0.001", "0.01", "0.05"]:
        q1 = col.quantile(float(q))
        q2 = col.quantile(1 - float(q))
        qs[q] = [q1, q2]
    return qs
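# Illustrative output shape (not in the original comments): for a numeric
# column this returns {"0.001": [lo, hi], "0.01": [lo, hi], "0.05": [lo, hi]},
# where each pair holds the lower and upper quantile at that tail probability.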
def get_text_indices(col):
    values = list(set(col))
    val_dict = {value: i for i, value in enumerate(values)}
    return [val_dict[v] for v in col], [str(s) for s in values]
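# Illustrative example (not in the original comments): for col = ["b", "a", "b"]
# this returns (indices, values) such as ([0, 1, 0], ["b", "a"]); since values
# comes from a set, the exact ordering is not deterministic across runs.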
def get_column_groups():
    col_groups = {}
    for name in df.columns:
        m = re.search(args.group_by, name)
        if not m:
            continue
        group_name = f"{m.group(1)}_{m.group(3)}"
        if group_name not in col_groups:
            col_groups[group_name] = {"name": group_name, "columns": []}
        # num = int(m.group(2))
        col_groups[group_name]["columns"].append(name)
    return [col_groups[k] for k in col_groups]
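# Illustrative example (not in the original comments): with the default
# --group_by regex r"(.*?)(\d+)(.*)", columns "gene1", "gene2", "gene3" match
# with groups ("gene", "<n>", ""), so they all land in one column group
# named "gene_".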
def get_datasource():
    """
    Has some side effects on the dataframe:
    - if args.discard_redundant, removes columns that are redundant (all the same value)
    - text columns are converted to indices.

    Outputs a descriptor like this:
    {
        "name": "metric_table",
        "size": <number of rows>,
        "images": {
            "images": {
                "base_url": "./images/",
                "type": "png",
                "key_column": "image_id"
            }
        },
        "columns": [
            {
                "datatype": "double" | "integer" | "text" | "multitext" | "unique",
                "name": "column_name",
                "field": "column_name",
                "minMax"?: [min, max],
                "quantiles"?: ...,
                "values"?: ['a', 'b', 'c'],
            }
        ]
    }
    """
    descriptor = {"name": basename, "size": df.shape[0], "columns": []}
    if has_images:
        # todo: make this able to take some config, set proper type / key_column etc.
        # ideally, find a column whose values correspond to the names of images in the folder...
        descriptor["images"] = {
            "images": {"base_url": "./images/", "type": "png", "key_column": "Index"}
        }
    for name in df.columns:
        col = df[name]
        if args.discard_redundant and len(set(col)) == 1:
            df.drop(name, axis=1, inplace=True)
            continue
        datatype = get_column_type(name)
        col_desc = {"datatype": datatype, "name": name, "field": name}
        if datatype == "boolean":
            # would be better to have a separate boolean type
            print(f"converting boolean {name} to number")
            col_desc["datatype"] = "integer"
            col_desc["minMax"] = [0, 1]
        elif datatype == "double" or datatype == "integer":
            col_desc["minMax"] = [min(col), max(col)]
            col_desc["quantiles"] = get_quantiles(col)
        elif datatype == "text" or datatype == "multitext":
            indices, values = get_text_indices(col)
            col_desc["values"] = values
            if datatype == "multitext":
                col_desc["separator"] = separator
            # mutating df here...
            df[name] = indices
        elif datatype == "unique":
            col_desc["stringLength"] = max([len(v) for v in col])
        descriptor["columns"].append(col_desc)
    descriptor["columnGroups"] = get_column_groups()
    return descriptor
def replace_text_values(col, values):
    val_dict = {value: i for i, value in enumerate(values)}
    return [val_dict[v] for v in col]
def get_views():
    return {basename: {"name": basename, "initialCharts": {basename: []}}}
def get_state():
    return {"all_views": [basename], "initial_view": basename}
def convert_data_to_binary(df):
    """
    Converts the dataframe to binary format: each column is gzip-compressed
    and appended to a single file, with a json index of byte offsets.
    """
    dfile = f"{outdir}/{basename}.gz"
    index = {}
    current_pos = 0
    with open(dfile, "wb") as o:
        for name in df.columns:
            # 'integer' and 'double' should be converted to float32 according to the spec
            coltype = get_column_type(name)
            if coltype == "integer" or coltype == "double" or coltype == "boolean":
                print(f"converting {name} {coltype} to float32")
                df[name] = df[name].astype("float32")
            if coltype == "text":
                print(f"converting {name} {coltype} to uint8")
                df[name] = df[name].astype("uint8")
            if coltype == "multitext":
                print(f"converting {name} {coltype} to uint16")
                df[name] = df[name].astype("uint16")
            comp = gzip.compress(df[name].to_numpy().tobytes())
            new_pos = current_pos + len(comp)
            # the index records [start, end] byte offsets (end inclusive) per column
            index[name] = [current_pos, new_pos - 1]
            o.write(comp)
            current_pos = new_pos
    ifile = dfile[: dfile.rindex(".")] + ".json"
    with open(ifile, "w") as f:
        f.write(json.dumps(index))
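# Illustrative sketch (not part of the original script): reading one column
# back from the binary file written above. It assumes the json index maps
# column name -> [start_byte, end_byte] (end inclusive), and that the caller
# knows the dtype the column was stored as (float32 for numeric and boolean
# columns, uint8 for text, uint16 for multitext).
def read_binary_column(dfile, index, name, dtype):
    import numpy as np  # local import; only needed for this sketch

    start, end = index[name]
    with open(dfile, "rb") as f:
        f.seek(start)
        comp = f.read(end - start + 1)
    return np.frombuffer(gzip.decompress(comp), dtype=dtype)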
def main():
    rename_columns()
    if not os.path.exists(outdir):
        print("creating output directory")
        os.makedirs(outdir)
    ds = get_datasource()
    with open(f"{outdir}/datasources.json", "w") as f:
        print("writing datasources.json")
        f.write(json.dumps([ds]))
    print("writing data binary")
    convert_data_to_binary(df)
    with open(f"{outdir}/views.json", "w") as f:
        f.write(json.dumps(get_views()))
    with open(f"{outdir}/state.json", "w") as f:
        f.write(json.dumps(get_state()))


if __name__ == "__main__":
    main()
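# Example invocation (assuming the module is importable as mdvtools.csv_to_static):
#   python -m mdvtools.csv_to_static -i data.csv -o ./static_site --discard_redundant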