import csv
import logging
import random
import string

from google.cloud import bigquery
from google.oauth2 import service_account

SVC_ACCT_JSON_FILENAME = "creds.json"
GCP_PROJECT_ID = "UPDATE_THIS"
DATASET_ID = "UPDATE_THIS"
BQ_TABLE_ID = "UPDATE_THIS"
CSV_FILENAME = "test_table_data.csv"
CSV_NUM_ROWS = 3000000

"""
-- Create the table in BigQuery before running this script:
CREATE TABLE UPDATE_THIS.UPDATE_THIS (
    col1_str STRING,
    col2_str STRING,
    col3_str STRING,
    col4_str STRING,
    col5_str STRING,
    col6_str STRING,
    col7_int INT64,
    col8_int INT64
)
"""

# Command-line comparison: bq load uses full bandwidth (~70 MB/s) on my Mac
# vs. about 4 MB/s on the same machine using this script / the Python client.
"""
bq load \
    --source_format=CSV \
    --replace=true \
    --skip_leading_rows=1 \
    UPDATE_THIS:UPDATE_THIS.UPDATE_THIS \
    ./test_table_data.csv \
    col1_str:STRING,col2_str:STRING,col3_str:STRING,col4_str:STRING,col5_str:STRING,col6_str:STRING,col7_int:INTEGER,col8_int:INTEGER
"""


def main():
    generate_csv()  # Run the first time, then comment out to reuse the file.

    # Build an authenticated BigQuery client from the service account file.
    credentials = service_account.Credentials.from_service_account_file(
        SVC_ACCT_JSON_FILENAME,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    bq_client = bigquery.Client(credentials=credentials, project=GCP_PROJECT_ID)

    # Client.dataset() is deprecated; build the references directly instead.
    dataset_ref = bigquery.DatasetReference(GCP_PROJECT_ID, DATASET_ID)
    table_ref = dataset_ref.table(BQ_TABLE_ID)

    # The CSV has one header row to skip, and the table is fully overwritten.
    config = bigquery.LoadJobConfig()
    config.autodetect = False
    config.source_format = "CSV"
    config.skip_leading_rows = 1
    config.write_disposition = "WRITE_TRUNCATE"

    logging.info("Beginning load job...")
    with open(CSV_FILENAME, "rb") as source_file:
        job = bq_client.load_table_from_file(
            source_file, table_ref, job_config=config
        )
    job.result()  # Starts the job and waits for the table load to complete.
    logging.info("Job ID: %s", job.job_id)

    if job.errors is None and job.error_result is None:
        logging.info("BQ load job completed without error!")
        logging.info("Loaded %d rows", job.output_rows)
    else:
        msg = ("bderr: BQ load job failed with error_result: "
               f"{job.error_result} and errors: {job.errors}")
        logging.error(msg)


def generate_csv():
    """Generates a CSV of string/int test data.

    The resulting file is roughly 1 GB, including the header row.
    """
    logging.info("Generating CSV...")
    header = [
        "col1_str", "col2_str", "col3_str", "col4_str",
        "col5_str", "col6_str", "col7_int", "col8_int",
    ]
    char_bank = string.ascii_letters + string.digits
    # newline="" prevents the csv module writing extra blank lines on Windows.
    with open(CSV_FILENAME, "w", newline="") as fout:
        w_csv = csv.writer(fout)
        w_csv.writerow(header)
        for x in range(CSV_NUM_ROWS):
            if x % 100000 == 0:
                logging.info("Written %d out of %d rows...", x, CSV_NUM_ROWS)
            w_csv.writerow([
                "".join(random.choices(char_bank, k=48)),
                "".join(random.choices(char_bank, k=48)),
                "".join(random.choices(char_bank, k=48)),
                "".join(random.choices(char_bank, k=48)),
                "".join(random.choices(char_bank, k=48)),
                "".join(random.choices(char_bank, k=48)),
                random.randrange(100000000),
                random.randrange(100000000),
            ])


if __name__ == "__main__":
    fmt = "%(asctime)s %(name)-25s %(module)-24s %(levelname)9s: %(message)s"
    logging.basicConfig(format=fmt)
    logging.getLogger().setLevel(logging.INFO)
    logging.info("SCRIPT START")
    main()
    logging.info("SCRIPT END")
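
# Untested sketch of a possible workaround for the slow local-file upload:
# stage the CSV in GCS first, then load it with load_table_from_uri(). The
# bucket name "UPDATE_THIS" is a placeholder like the other config values
# above, and this path additionally requires the google-cloud-storage
# package. Variables refer to the ones created inside main().
"""
from google.cloud import storage

gcs_client = storage.Client(credentials=credentials, project=GCP_PROJECT_ID)
blob = gcs_client.bucket("UPDATE_THIS").blob(CSV_FILENAME)
blob.upload_from_filename(CSV_FILENAME)  # Upload the generated CSV to GCS.

job = bq_client.load_table_from_uri(
    "gs://UPDATE_THIS/" + CSV_FILENAME, table_ref, job_config=config
)
job.result()  # Waits for the GCS-to-BigQuery load to complete.
"""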