For Databricks
Export table, column, and view metadata as Parquet
Run the following script in a Databricks notebook:
import concurrent.futures
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import col
# ===============================================================
# 1. Load ALL tables and ALL columns from information_schema
# ===============================================================
tables_df = spark.sql("""
    SELECT
        table_catalog,
        table_schema,
        table_name,
        table_owner,
        comment,
        created,
        last_altered
    FROM system.information_schema.tables
    WHERE table_type <> 'VIEW'
""").cache()

views_df = spark.sql("""
    SELECT
        table_catalog,
        table_schema,
        table_name,
        view_definition
    FROM system.information_schema.views
""").cache()

columns_df = spark.sql("""
    SELECT
        table_catalog,
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        is_nullable,
        data_type
    FROM system.information_schema.columns
""")
# ===============================================================
# 2. Helper to extract partition & sort columns via DESCRIBE DETAIL
# ===============================================================
def fetch_detail_table_item(t):
    # Backtick-quote the identifier so names with special characters still resolve
    fq = f"`{t.table_catalog}`.`{t.table_schema}`.`{t.table_name}`"
    try:
        detail = spark.sql(f"DESCRIBE DETAIL {fq}").first().asDict()
        # Extract Delta metadata when present:
        partition_cols = detail.get("partitionColumns", [])
        sort_cols = (
            detail.get("sortColumns", []) or
            detail.get("clusteringColumns", [])
        )
        return Row(
            table_catalog=t.table_catalog,
            table_schema=t.table_schema,
            table_name=t.table_name,
            table_owner=t.table_owner,
            row_count=detail.get("numRows"),  # May be null for non-Delta tables
            bytes=detail.get("sizeInBytes"),
            comment=t.comment,
            created=int(t.created.timestamp() * 1000) if t.created else None,  # epoch millis
            last_altered=int(t.last_altered.timestamp() * 1000) if t.last_altered else None,
            partition_cols=partition_cols,
            sort_cols=sort_cols,
            error=None
        )
    except Exception as e:
        # Keep the row and record the failure so it still appears in the export
        return Row(
            table_catalog=t.table_catalog,
            table_schema=t.table_schema,
            table_name=t.table_name,
            table_owner=t.table_owner,
            row_count=None,
            bytes=None,
            comment=t.comment,
            created=int(t.created.timestamp() * 1000) if t.created else None,
            last_altered=int(t.last_altered.timestamp() * 1000) if t.last_altered else None,
            partition_cols=[],
            sort_cols=[],
            error=str(e)
        )
# ===============================================================
# 3. Run DESCRIBE DETAIL on driver threads
# ===============================================================
tables = tables_df.collect()
MAX_THREADS = 16
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    table_rows = list(executor.map(fetch_detail_table_item, tables))
# Build DataFrame
table_schema = StructType([
    StructField("table_catalog", StringType(), True),
    StructField("table_schema", StringType(), True),
    StructField("table_name", StringType(), True),
    StructField("table_owner", StringType(), True),
    StructField("row_count", LongType(), True),
    StructField("bytes", LongType(), True),
    StructField("comment", StringType(), True),
    StructField("created", LongType(), True),
    StructField("last_altered", LongType(), True),
    StructField("partition_cols", ArrayType(StringType()), True),
    StructField("sort_cols", ArrayType(StringType()), True),
    StructField("error", StringType(), True)
])
table_items_df = spark.createDataFrame(table_rows, schema=table_schema)
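# Optional sanity check (not required for the export): DESCRIBE DETAIL failures are
# recorded in the `error` column rather than aborting the run, so you can inspect
# them before writing the output, e.g.:
#   display(table_items_df.filter("error IS NOT NULL"))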
# ===============================================================
# 4. Write TableItem parquet output
# ===============================================================
table_items_df.coalesce(1).write.mode("overwrite").parquet("dbfs:/tmp/tables_export/")
print("Wrote table metadata to dbfs:/tmp/tables_export/")
# ===============================================================
# 5. Write schema (columns) parquet output
# ===============================================================
schema_df = columns_df.select(
    "table_catalog",
    "table_schema",
    "table_name",
    "column_name",
    col("ordinal_position").cast("int"),
    "is_nullable",
    "data_type"
)
schema_df.coalesce(1).write.mode("overwrite").parquet("dbfs:/tmp/columns_export/")
print("Wrote schema metadata to dbfs:/tmp/columns_export/")
# ===============================================================
# 6. Write ViewItem parquet output
# ===============================================================
# Matches the ViewItem protobuf fields: (catalog, schema, name, definition)
views_export_df = views_df.select(
    "table_catalog",
    "table_schema",
    "table_name",
    "view_definition"
)
views_export_df.coalesce(1).write.mode("overwrite").parquet("dbfs:/tmp/views_export/")
print("Wrote view metadata to dbfs:/tmp/views_export/")
Copy to single origin S3
from datetime import datetime, timezone
# ===============================================================
# Configuration
# ===============================================================
bucket = "so-companyname-data" # <-- change me
prefix = "databricks-trial"
# Source directories in DBFS
sources = {
    "views": "dbfs:/tmp/views_export/",
    "tables": "dbfs:/tmp/tables_export/",
    "columns": "dbfs:/tmp/columns_export/"
}
# ===============================================================
# Compute today's date partition
# ===============================================================
today = datetime.now(timezone.utc)
YYYY = today.strftime("%Y")
MM = today.strftime("%m")
DD = today.strftime("%d")
date_prefix = f"{YYYY}/{MM}/{DD}"
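# With the example bucket and prefix above, each run writes to paths like:
#   s3://so-companyname-data/databricks-trial/<YYYY>/<MM>/<DD>/tables.parquet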
# ===============================================================
# Helper: locate the single-file Parquet output and upload to S3
# ===============================================================
def export_single_parquet(name):
    data_dir = sources[name]
    # 1. Locate the one parquet part file written by the export script
    files = dbutils.fs.ls(data_dir)
    parquet_file = [f.path for f in files if f.path.endswith(".parquet")][0]
    # 2. Destination S3 path
    dest = f"s3://{bucket}/{prefix}/{date_prefix}/{name}.parquet"
    # 3. Copy to S3
    dbutils.fs.cp(parquet_file, dest)
    print(f"Exported {name} → {dest}")
# ===============================================================
# Export each dataset
# ===============================================================
# 1. Views
export_single_parquet("views")
# 2. Tables
export_single_parquet("tables")
# 3. Columns
export_single_parquet("columns")
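To confirm the upload, you can list today's destination prefix from the same notebook (this assumes the cluster's instance profile or credentials can read the bucket):
# List what landed under today's date partition
for f in dbutils.fs.ls(f"s3://{bucket}/{prefix}/{date_prefix}/"):
    print(f.path, f.size)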