For GCP BigQuery
This guide details the process for creating and exporting three Parquet files (columns.parquet, views.parquet, and query-historys.parquet) from BigQuery, and then converting query-historys.parquet to resolve a timestamp format mismatch.
Create and Upload Three Parquet Files
Run Query
- Go to the BigQuery console.
- Click on Compose new query.
- Enter the following SQL query to create the columns table:
- Click Run to execute the query.
- After execution, click Save Results and select Save as Table. Name the table columns and specify your dataset.
SELECT
  table_catalog AS table_catalog,
  table_schema AS table_schema,
  table_name AS table_name,
  column_name AS column_name,
  ordinal_position AS ordinal_position,
  is_nullable AS is_nullable,
  data_type AS data_type
FROM
  `region-us.INFORMATION_SCHEMA.COLUMNS` -- replace region-us with your region
Corresponding parquet schema
message schema {
string table_catalog = 1;
string table_schema = 2;
string table_name = 3;
string column_name = 4;
int32 ordinal_position = 5;
string is_nullable = 6;
string data_type = 7;
}
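The console steps above can also be scripted. The following is a minimal sketch, assuming the `google-cloud-bigquery` Python client is installed and authenticated. The helper names `columns_query` and `save_columns_table` and the table ID `my-project.my_dataset.columns` are illustrative and not part of this guide.

```python
def columns_query(region: str = "us") -> str:
    """Return the columns query for a BigQuery region such as "us" or "eu"."""
    return f"""
SELECT
  table_catalog,
  table_schema,
  table_name,
  column_name,
  ordinal_position,
  is_nullable,
  data_type
FROM
  `region-{region}.INFORMATION_SCHEMA.COLUMNS`
""".strip()


def save_columns_table(destination: str, region: str = "us") -> None:
    """Run the query and store the result in `destination` ("project.dataset.table")."""
    # Imported lazily so the query builder works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        destination=destination,
        # Assumption: overwriting an existing table is acceptable here.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    client.query(columns_query(region), job_config=job_config).result()


if __name__ == "__main__":
    print(columns_query("eu"))
    # Hypothetical table ID; replace with your own project and dataset:
    # save_columns_table("my-project.my_dataset.columns", region="eu")
```

Passing the region as a parameter keeps the "replace with your region" step in one place instead of editing the SQL by hand.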
Export Columns Table To Parquet
- In the BigQuery console, navigate to your dataset. Find the columns table, click SAVE RESULTS to select Save to BigQuery Table.
- Choose Export to Google Cloud Storage.
- Set the file format to PARQUET and the compression to SNAPPY.
- Specify the destination path in Google Cloud Storage (e.g., gs://your_bucket/columns.parquet).
- Click Export to start the export process.
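The export steps above could also be run from Python. This is a sketch under the assumption that the `google-cloud-bigquery` client is installed and authenticated; `parquet_uri` and `export_table` are hypothetical helper names, and the table ID and bucket in the usage comment are placeholders.

```python
def parquet_uri(bucket: str, name: str) -> str:
    """Build the Cloud Storage destination, e.g. gs://your_bucket/columns.parquet."""
    return f"gs://{bucket}/{name}.parquet"


def export_table(table_id: str, bucket: str, name: str) -> str:
    """Export `table_id` ("project.dataset.table") as Snappy-compressed Parquet."""
    # Imported lazily so the URI helper works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.ExtractJobConfig(
        destination_format=bigquery.DestinationFormat.PARQUET,
        compression=bigquery.Compression.SNAPPY,
    )
    uri = parquet_uri(bucket, name)
    # result() blocks until the extract job finishes and raises on failure.
    client.extract_table(table_id, uri, job_config=job_config).result()
    return uri


# Hypothetical usage; replace the table ID and bucket with your own:
# export_table("my-project.my_dataset.columns", "your_bucket", "columns")
```

A single destination file like this only works for exports up to 1 GB; larger tables need a wildcard in the URI so BigQuery can split the output.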
Generate And Export views.parquet
Run Query
- Go to the BigQuery console.
- Click on Compose new query.
- Enter the following SQL query to create the views table:
SELECT
  table_catalog AS table_catalog,
  table_schema AS table_schema,
  table_name AS table_name,
  view_definition AS view_definition
FROM
  `region-us.INFORMATION_SCHEMA.VIEWS` -- replace region-us with your region
Corresponding parquet schema
message schema {
string table_catalog = 1;
string table_schema = 2;
string table_name = 3;
string view_definition = 4;
}
Click Run to execute the query.
After execution, click Save Results and select Save as Table. Name the table views and specify your dataset.
Export Views Table To Parquet
- In the BigQuery console, navigate to your dataset. Find the views table, click SAVE RESULTS to select Save to BigQuery Table.
- Choose Export to Google Cloud Storage.
- Set the file format to PARQUET and the compression to SNAPPY.
- Specify the destination path in Google Cloud Storage (e.g., gs://your_bucket/views.parquet).
- Click Export to start the export process.
Generate And Export query-historys.parquet
Run Query
- Go to the BigQuery console.
- Click on Compose new query.
- Enter the following SQL query to create the query-historys table:
- Click Run to execute the query.
- After execution, click Save Results and select Save as Table. Name the table query-historys and specify your dataset.
SELECT
  job_id AS query_id,
  query AS query,
  user_email AS query_user,
  user_email AS email,
  TIMESTAMP_DIFF(end_time, creation_time, MICROSECOND) / 1e6 AS total_elapsed_time,
  STRUCT(
    UNIX_SECONDS(end_time) AS seconds,
    CAST(EXTRACT(NANOSECOND FROM end_time) AS INT) AS nanos
  ) AS updated_at,
  STRUCT(
    UNIX_SECONDS(start_time) AS seconds,
    CAST(EXTRACT(NANOSECOND FROM start_time) AS INT) AS nanos
  ) AS start_time,
  STRUCT(
    UNIX_SECONDS(end_time) AS seconds,
    CAST(EXTRACT(NANOSECOND FROM end_time) AS INT) AS nanos
  ) AS end_time,
  total_bytes_processed AS processed_bytes,
  ARRAY(SELECT table_id FROM UNNEST(referenced_tables)) AS source_tables,
  (SELECT table_id FROM UNNEST(referenced_tables) LIMIT 1) AS destination_table,
  '' AS default_database,
  '' AS default_schema,
  statement_type AS statement_type,
  ARRAY<FLOAT64>[] AS allocated_cost,
  NULL AS produced_rows,
  TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) AS execution_time,
  '' AS warehouse_size,
  '' AS warehouse_id,
  '' AS warehouse_name,
  state AS execution_status,
  '' AS query_tag
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT -- replace region-us with your region
WHERE
  state = "DONE"
ORDER BY
  creation_time DESC
Corresponding parquet schema
message schema {
string query_id = 1;
string query = 2;
string query_user = 3;
string email = 4;
double total_elapsed_time = 5;
google.protobuf.Timestamp updated_at = 6;
google.protobuf.Timestamp start_time = 7;
google.protobuf.Timestamp end_time = 8;
int64 processed_bytes = 9;
repeated string source_tables = 10;
string destination_table = 11;
string default_database = 12;
string default_schema = 13;
string statement_type = 14;
repeated double allocated_cost = 15;
int64 produced_rows = 16;
int64 execution_time = 17;
string warehouse_size = 18;
string warehouse_id = 19;
string warehouse_name = 20;
string execution_status = 21;
string query_tag = 22;
}
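To make the seconds/nanos layout of updated_at, start_time, and end_time concrete, here is a small standard-library sketch that splits a timestamp the same way the STRUCT expressions in the query do. The helper name `to_seconds_nanos` and the sample timestamp are illustrative only.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_seconds_nanos(ts: datetime) -> dict:
    """Split a timezone-aware datetime into whole seconds since the epoch and nanos."""
    delta = ts - EPOCH
    # Integer arithmetic avoids floating-point rounding of the fractional part.
    seconds = delta.days * 86400 + delta.seconds
    nanos = delta.microseconds * 1000
    return {"seconds": seconds, "nanos": nanos}


if __name__ == "__main__":
    end_time = datetime(2024, 1, 2, 3, 4, 5, 678901, tzinfo=timezone.utc)
    print(to_seconds_nanos(end_time))  # {'seconds': 1704164645, 'nanos': 678901000}
```

BigQuery TIMESTAMP values have microsecond precision, so nanos is always a multiple of 1,000 and always fits in a 32-bit integer.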
Export query-historys Table To Parquet
- In the BigQuery console, navigate to your dataset. Find the query-historys table, click SAVE RESULTS to select Save to BigQuery Table.
- Choose Export to Google Cloud Storage.
- Set the file format to PARQUET and the compression to SNAPPY.
- Specify the destination path in Google Cloud Storage (e.g., gs://your_bucket/query-historys.parquet).
- Click Export to start the export process.
Additional Conversion for query-historys
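Due to timestamp format mismatches between BigQuery's RECORD export and Protobuf's time representation, you need to convert query-historys.parquet after exporting it. The following Python script performs this transformation by narrowing the nanos subfield of updated_at, start_time, and end_time to int32, the type that google.protobuf.Timestamp uses for nanos.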
python
import pyarrow as pa
import pyarrow.parquet as pq

# Read the Parquet file exported from BigQuery
parquet_file = 'origin-query-historys.parquet'
table = pq.read_table(parquet_file)

# Inspect the original schema
old_schema = table.schema
print("Old Schema:", old_schema)

# Build the new schema: nanos becomes int32 inside each timestamp struct
new_fields = []
for field in old_schema:
    if field.name in ['updated_at', 'start_time', 'end_time']:
        subfields = field.type
        new_subfields = []
        for subfield in subfields:
            if subfield.name == 'nanos':
                new_subfield = pa.field('nanos', pa.int32(), nullable=subfield.nullable)
            else:
                new_subfield = subfield
            new_subfields.append(new_subfield)
        new_field = pa.field(field.name, pa.struct(new_subfields))
    elif field.name == 'output_rows':
        # Only applies if the file contains an output_rows column
        new_field = pa.field('output_rows', pa.int32())
    else:
        new_field = field
    new_fields.append(new_field)
new_schema = pa.schema(new_fields)
print("New Schema:", new_schema)

# Convert the table to the new schema by round-tripping through pandas
def convert_table_schema(table, new_schema):
    df = table.to_pandas()
    new_table = pa.Table.from_pandas(df, schema=new_schema)
    return new_table

# Write the converted table
new_table = convert_table_schema(table, new_schema)
pq.write_table(new_table, 'output-query-historys.parquet')
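One way to confirm the conversion took effect is to inspect the schema of the output file. The sketch below assumes `pyarrow` is installed; `struct_types`, `fields_with_wrong_nanos`, and `check_file` are hypothetical helper names, not part of the script above.

```python
TIMESTAMP_FIELDS = ("updated_at", "start_time", "end_time")


def struct_types(schema) -> dict:
    """Map each struct column in a pyarrow schema to {subfield name: type string}."""
    import pyarrow as pa  # imported lazily; only needed for real schemas

    result = {}
    for field in schema:
        if pa.types.is_struct(field.type):
            subfields = [field.type.field(i) for i in range(field.type.num_fields)]
            result[field.name] = {sub.name: str(sub.type) for sub in subfields}
    return result


def fields_with_wrong_nanos(structs: dict) -> list:
    """Return the timestamp columns whose nanos subfield is missing or not int32."""
    return [
        name
        for name in TIMESTAMP_FIELDS
        if structs.get(name, {}).get("nanos") != "int32"
    ]


def check_file(path: str) -> list:
    """Read only the schema of a Parquet file and list the columns that still need fixing."""
    import pyarrow.parquet as pq

    return fields_with_wrong_nanos(struct_types(pq.read_schema(path)))


# Usage, assuming the conversion script has already written its output:
# check_file("output-query-historys.parquet")  # an empty list means every nanos is int32
```

Reading only the schema with `pq.read_schema` avoids loading the full query history into memory just to check types.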
Processing File
After generating the file, you can directly provide it to Single Origin. Accessing files from GCP is currently under development and will be supported soon.