For Databricks

Export table, column, and view metadata as Parquet

Run the following script in a notebook:

import concurrent.futures
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit, array_sort

# ===============================================================
# 1. Load ALL tables and ALL columns from information_schema
# ===============================================================

# Every non-view relation known to information_schema, with ownership and
# timestamps. Cached on the DataFrame returned by spark.sql.
tables_query = """
    SELECT 
        table_catalog,
        table_schema,
        table_name,
        table_owner,
        comment,
        created,
        last_altered
    FROM system.information_schema.tables
    WHERE table_type <> 'VIEW'
"""
tables_df = spark.sql(tables_query).cache()

# View definitions (the SQL text behind each view), also cached.
views_query = """
    SELECT
        table_catalog,
        table_schema,
        table_name,
        view_definition
    FROM system.information_schema.views
"""
views_df = spark.sql(views_query).cache()

# Column-level metadata for every relation; not cached.
columns_query = """
    SELECT 
        table_catalog,
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        is_nullable,
        data_type
    FROM system.information_schema.columns
"""
columns_df = spark.sql(columns_query)

# ===============================================================
# 2. Helper to extract partition & sort columns via DESCRIBE DETAIL
# ===============================================================

def _quote_identifier(part):
    """Return *part* as a backtick-delimited SQL identifier.

    Embedded backticks are doubled so the name cannot terminate the quoting.
    """
    return "`" + str(part).replace("`", "``") + "`"


def _epoch_millis(ts):
    """Return datetime *ts* as integer epoch milliseconds, or None when unset."""
    return int(ts.timestamp() * 1000) if ts else None


def fetch_detail_table_item(t):
    """Build one table-metadata Row by running DESCRIBE DETAIL on a table.

    Args:
        t: A row from ``tables_df`` exposing ``table_catalog``,
            ``table_schema``, ``table_name``, ``table_owner``, ``comment``,
            ``created`` and ``last_altered``.

    Returns:
        A ``Row`` with the identifying fields of *t*, ``row_count``, ``bytes``,
        ``created`` / ``last_altered`` as epoch milliseconds,
        ``partition_cols``, ``sort_cols`` and ``error``. When DESCRIBE DETAIL
        fails for any reason the detail-derived fields are None / empty lists
        and ``error`` holds the exception text, so one bad table does not
        abort the whole export.
    """
    # Quote each name part: unquoted names containing hyphens, spaces or
    # reserved words would make the DESCRIBE DETAIL statement fail to parse.
    fq = ".".join(
        _quote_identifier(part)
        for part in (t.table_catalog, t.table_schema, t.table_name)
    )

    row_count = None
    size_in_bytes = None
    partition_cols = []
    sort_cols = []
    error = None

    try:
        detail = spark.sql(f"DESCRIBE DETAIL {fq}").first().asDict()

        # `or []` also covers keys that are present but null, keeping the
        # array columns consistent with the error path below.
        partition_cols = detail.get("partitionColumns") or []
        sort_cols = (
            detail.get("sortColumns")
            or detail.get("clusteringColumns")
            or []
        )

        # NOTE(review): the documented DESCRIBE DETAIL output does not list a
        # numRows column, so this is presumably always None — confirm.
        row_count = detail.get("numRows")
        size_in_bytes = detail.get("sizeInBytes")
    except Exception as e:
        # Deliberate best effort: record the failure on the row instead of
        # raising, so the remaining tables are still exported.
        row_count = None
        size_in_bytes = None
        partition_cols = []
        sort_cols = []
        error = str(e)

    return Row(
        table_catalog=t.table_catalog,
        table_schema=t.table_schema,
        table_name=t.table_name,
        table_owner=t.table_owner,
        row_count=row_count,
        bytes=size_in_bytes,
        comment=t.comment,
        created=_epoch_millis(t.created),
        last_altered=_epoch_millis(t.last_altered),
        partition_cols=partition_cols,
        sort_cols=sort_cols,
        error=error,
    )


# ===============================================================
# 3. Run DESCRIBE DETAIL on driver threads
# ===============================================================

# Bring the table listing to the driver; each entry becomes one
# DESCRIBE DETAIL call issued from a driver-side worker thread.
tables = tables_df.collect()
MAX_THREADS = 16

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    detail_results = executor.map(fetch_detail_table_item, tables)
    # Drain the iterator inside the `with` so all work finishes before the
    # pool shuts down.
    table_rows = list(detail_results)

# Explicit schema for the collected rows: every field is nullable and the
# field order matches the keyword order used when the Rows are built.
table_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("table_catalog", StringType()),
        ("table_schema", StringType()),
        ("table_name", StringType()),
        ("table_owner", StringType()),
        ("row_count", LongType()),
        ("bytes", LongType()),
        ("comment", StringType()),
        ("created", LongType()),
        ("last_altered", LongType()),
        ("partition_cols", ArrayType(StringType())),
        ("sort_cols", ArrayType(StringType())),
        ("error", StringType()),
    )
])

table_items_df = spark.createDataFrame(table_rows, schema=table_schema)

# ===============================================================
# 4. Write TableItem parquet output
# ===============================================================

# Collapse to a single partition so the export directory holds one part file,
# replacing any previous export at the same path.
tables_writer = table_items_df.coalesce(1).write.mode("overwrite")
tables_writer.parquet("dbfs:/tmp/tables_export/")
print("Wrote table metadata to dbfs:/tmp/tables_export/")


# ===============================================================
# 5. Write schema (columns) parquet output
# ===============================================================

# ordinal_position is exported as a 32-bit int; all other columns pass
# through unchanged and in their original order.
ordinal_as_int = col("ordinal_position").cast("int")

schema_df = columns_df.select(
    "table_catalog", "table_schema", "table_name", "column_name",
    ordinal_as_int,
    "is_nullable", "data_type",
)

# Single partition -> one part file; overwrite any earlier export.
columns_writer = schema_df.coalesce(1).write.mode("overwrite")
columns_writer.parquet("dbfs:/tmp/columns_export/")
print("Wrote schema metadata to dbfs:/tmp/columns_export/")


# ===============================================================
# 6. Write ViewItem parquet output (NEW)
# ===============================================================

# Match your protobuf: (catalog, schema, name, definition)
view_fields = ["table_catalog", "table_schema", "table_name", "view_definition"]
views_export_df = views_df.select(*view_fields)

# Single partition -> one part file; overwrite any earlier export.
views_writer = views_export_df.coalesce(1).write.mode("overwrite")
views_writer.parquet("dbfs:/tmp/views_export/")
print("Wrote view metadata to dbfs:/tmp/views_export/")

Copy the exported Parquet files to single origin S3 (run the following script in a notebook after the export script above):

from datetime import datetime, timezone

# ===============================================================
# Configuration
# ===============================================================

bucket = "so-companyname-data"        # <-- change me
prefix = "databricks-trial"           # key prefix under the bucket

# Source directories in DBFS, keyed by export name. Each holds the Parquet
# output of the export script.
sources = {
    "views":   "dbfs:/tmp/views_export/",
    "tables":  "dbfs:/tmp/tables_export/",
    "columns": "dbfs:/tmp/columns_export/",
}

# ===============================================================
# Compute today's date partition
# ===============================================================

# Timezone-aware UTC timestamp. datetime.utcnow() is deprecated since
# Python 3.12 and returns a naive datetime; the formatted date is the same.
today = datetime.now(timezone.utc)
YYYY = today.strftime("%Y")
MM   = today.strftime("%m")
DD   = today.strftime("%d")

# Zero-padded "YYYY/MM/DD" path segment used under the S3 prefix.
date_prefix = f"{YYYY}/{MM}/{DD}"

# ===============================================================
# Helper: locate the single exported Parquet file and copy it to S3
# ===============================================================

def export_single_parquet(df, name):
    """Copy the Parquet part file of one export directory to S3.

    The destination is
    ``s3://{bucket}/{prefix}/{date_prefix}/{name}.parquet``, built from the
    module-level ``bucket``, ``prefix`` and ``date_prefix`` values.

    Args:
        df: Not used. The function copies the file that already exists in
            DBFS rather than writing this DataFrame; the parameter is kept so
            existing callers keep working.
        name: Export name ("views", "tables" or "columns"); selects the source
            directory and names the destination object.

    Raises:
        FileNotFoundError: If the export directory contains no ``.parquet``
            file (previously surfaced as a bare IndexError).
    """
    # Prefer the configured source directory so this stays in sync with
    # `sources`; fall back to the conventional path for unknown names.
    data_dir = sources.get(name, f"dbfs:/tmp/{name}_export/")

    # Locate the part file, skipping Spark's marker files such as _SUCCESS.
    parquet_files = [
        entry.path
        for entry in dbutils.fs.ls(data_dir)
        if entry.path.endswith(".parquet")
    ]
    if not parquet_files:
        raise FileNotFoundError(f"No .parquet file found in {data_dir}")
    if len(parquet_files) > 1:
        # Only one file is copied; make the dropped part files visible.
        print(
            f"WARNING: {len(parquet_files)} parquet files in {data_dir}; "
            f"copying only {parquet_files[0]}"
        )
    parquet_file = parquet_files[0]

    # Destination S3 path
    dest = f"s3://{bucket}/{prefix}/{date_prefix}/{name}.parquet"

    # Copy to S3
    dbutils.fs.cp(parquet_file, dest)

    print(f"Exported {name} → {dest}")

# ===============================================================
# Load each DF and export
# ===============================================================

# Each export is read back from DBFS and then handed to the S3 copy helper,
# in the order views -> tables -> columns.

# Views
views_path = sources["views"]
views_df = spark.read.parquet(views_path)
export_single_parquet(df=views_df, name="views")

# Tables
tables_path = sources["tables"]
tables_df = spark.read.parquet(tables_path)
export_single_parquet(df=tables_df, name="tables")

# Columns
columns_path = sources["columns"]
columns_df = spark.read.parquet(columns_path)
export_single_parquet(df=columns_df, name="columns")