目前,我正在将数据(从Bigquery中的目标表中)导出到GCS中的存储桶中。使用Bigquery API以编程方式执行此操作。
将数据从Bigquery导出到GCS时存在一个约束-数据不应大于1GB。
这是发生这种情况的函数exportDataToGCS()的代码片段:
http = authorize();
bigquery_service = build('bigquery', 'v2', http=http)
query_request = bigquery_service.jobs()
DESTINATION_PATH = constants.GCS_BUCKET_PATH + canonicalDate + '/'
query_data = {
'projectId': 'ga-cnqr',
'configuration': {
'extract': {
'sourceTable': {
'projectId': constants.PROJECT_ID,
'datasetId': constants.DEST_TABLES_DATASET_ID,
'tableId': canonicalDate,
},
'destinationUris': [DESTINATION_PATH + canonicalDate + '-*.gz'],
'destinationFormat': 'CSV',
'printHeader': 'false',
'compression': 'GZIP'
}
}
}
query_response = query_request.insert(projectId=constants.PROJECT_NUMBER,
body=query_data).execute()
执行此功能后,在我的GCS存储桶中,我的文件以以下方式显示:
但是,我很想知道是否可以在任何情况下将文件拆分为10个部分,但由于上述功能失败,因此只有3个部分进入了存储桶。
也就是说,是否会有部分出口?
诸如网络中断或运行该功能的进程被杀死等原因是否会导致此?此过程是否是阻止呼叫?异步?
提前致谢。
更新1:查询响应中的状态参数
这就是我检查“已完成”状态的方式。
while True:
status = query_request.get(projectId=constants.PROJECT_NUMBER, jobId=query_response['jobReference']['jobId']).execute()
if 'DONE' == status['status']['state']:
logging.info("Finished exporting for the date : " + stringCanonicalDate);
return
如果作业在执行过程中由于某种原因失败,则可能会部分导出。
如果作业处于“已完成”状态,并且作业中没有错误,则说明已导出所有数据。
我建议在轮询完成的工作之前要稍等-如果轮询速度太快,可能会遇到速率限制错误,并且鉴于这些异步作业不是很快,因此不需要毫秒级的准确性。
使用示例代码,可以通过以下方法测试是否存在错误:
while True:
status = query_request.get(projectId=constants.PROJECT_NUMBER, jobId=query_response['jobReference']['jobId']).execute()
if 'DONE' == status['status']['state']:
if 'errorResult' in status['status']:
logging.error("Error exporting for the date : " + stringCanonicalDate);
logging.error(status['status']['errorResult'])
return False
logging.info("Finished exporting for the date : " + stringCanonicalDate);
return True
time.sleep(1)
为了超级健壮,您还可以捕获在轮询等待循环中偶尔发生的HTTP错误。看来您正在使用python apiclient,它会引发apiclient.errors.HttpError
此类故障。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句