[Posted]: 2022-08-18 17:53:10
[Question]:
I am trying to use sessions in the BigQuery Python API to run multi-statement transactions, as shown in this blogpost.
Here is the code of my context manager:
from google.cloud import bigquery


class BigquerySession:
    """ContextManager wrapping a bigquerySession."""

    def __init__(self, bqclient: bigquery.Client, bqlocation: str = "EU") -> None:
        """Construct instance."""
        self._bigquery_client = bqclient
        self._location = bqlocation
        self._session_id = None

    def __enter__(self) -> str:
        """Initiate a Bigquery session and return the session_id."""
        job = self._bigquery_client.query(
            "SELECT 1;",  # a query can't fail
            job_config=bigquery.QueryJobConfig(create_session=True),
            location=self._location,
        )
        self._session_id = job.session_info.session_id
        job.result()  # wait job completion
        return self._session_id

    def __exit__(self, exc_type, exc_value, traceback):
        """Abort the opened session."""
        if exc_type:
            print("Transaction failed, performing rollback")
            job = self._bigquery_client.query(
                "ROLLBACK TRANSACTION;",
                job_config=bigquery.QueryJobConfig(
                    create_session=False,
                    connection_properties=[
                        bigquery.query.ConnectionProperty(key="session_id", value=self._session_id)
                    ],
                ),
                location=self._location,
            )
            job.result()
        if self._session_id:
            # abort the session in any case to have a clean state at the end
            # (sometimes in case of script failure, the table is locked in
            # the session)
            job = self._bigquery_client.query(
                "CALL BQ.ABORT_SESSION();",
                job_config=bigquery.QueryJobConfig(
                    create_session=False,
                    connection_properties=[
                        bigquery.query.ConnectionProperty(
                            key="session_id", value=self._session_id
                        )
                    ],
                ),
                location=self._location,
            )
            job.result()
        return False
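It seems to work fine, but if I interrupt the transaction on purpose before committing it, the results are written anyway and are not rolled back, even when the rollback is executed explicitly.
Here is an example of a transaction: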
# Open transaction
job = self.client.query(
    "BEGIN TRANSACTION;",
    job_config=bigquery.QueryJobConfig(
        create_session=False,
        connection_properties=[
            bigquery.query.ConnectionProperty(key="session_id", value=session_id)
        ]
    ),
    location=self.dataset.location,
)
job.result()

# DML queries
job = self.client.query(
    aggregation_query,
    job_config=bigquery.QueryJobConfig(
        create_session=False,
        connection_properties=[
            bigquery.query.ConnectionProperty(key="session_id", value=session_id)
        ],
        destination=f"{self.dataset.project}.{self.dataset.dataset_id}.{table_name}",
        create_disposition="CREATE_NEVER",
        write_disposition="WRITE_APPEND"
    ),
    location=self.dataset.location,
)
print(job.result())

# This will avoid the commit statement
raise KeyboardInterrupt

# Commit transaction
job = self.client.query(
    "COMMIT TRANSACTION;",
    job_config=bigquery.QueryJobConfig(
        create_session=False,
        connection_properties=[
            bigquery.query.ConnectionProperty(key="session_id", value=session_id)
        ],
    ),
    location=self.dataset.location,
)
job.result()
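As I said, after running this code the rollback statement is executed (I can also see it among the jobs in the console), yet afterwards I still find the written rows in the destination table.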
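The control flow itself can be checked without BigQuery. The following is a minimal sketch, not the code from the question: `RecordingSession` and `run` are hypothetical names, and the stub only records the statements that the real context manager would send. It illustrates that a `KeyboardInterrupt` raised inside the `with` block still reaches `__exit__` with a truthy `exc_type`, so the rollback branch runs before the session is aborted.

```python
class RecordingSession:
    """Hypothetical stand-in for BigquerySession: records statements only."""

    def __init__(self) -> None:
        self.statements = []

    def __enter__(self) -> "RecordingSession":
        self.statements.append("SELECT 1;")  # stands for session creation
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        if exc_type:
            # Same branch as in the question: roll back on any exception.
            self.statements.append("ROLLBACK TRANSACTION;")
        self.statements.append("CALL BQ.ABORT_SESSION();")
        return False  # do not swallow the exception


def run(interrupt: bool) -> list:
    """Run a fake transaction, optionally interrupting it before COMMIT."""
    session = RecordingSession()
    try:
        with session:
            session.statements.append("BEGIN TRANSACTION;")
            if interrupt:
                raise KeyboardInterrupt
            session.statements.append("COMMIT TRANSACTION;")
    except KeyboardInterrupt:
        pass  # re-raised by the context manager because __exit__ returns False
    return session.statements
```

If this matches what the real class does, the missing rollback is unlikely to be caused by the context manager and more likely by how the write itself is issued.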
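I have two hypotheses: either a SELECT with a destination table is not considered DML and is therefore not affected by the rollback, or there is a bug in the session API (although this should be supported according to the relevant documentation), which is still marked as preview in the documentation.
However, I cannot prove which of the two is correct.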
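One way to test the first hypothesis would be to express the write as an explicit DML statement instead of setting `destination` and `write_disposition` on the job. The sketch below is an assumption-laden illustration rather than a confirmed fix: `as_insert_select` and `run_in_session` are hypothetical helpers, the SELECT is assumed to return columns in the same order as the target table, and whether the resulting INSERT is actually rolled back still has to be verified against BigQuery.

```python
def as_insert_select(destination_table: str, select_sql: str) -> str:
    """Wrap a SELECT query in an INSERT so the write is a DML statement.

    Assumes the SELECT returns columns in the target table's column order.
    """
    body = select_sql.strip().rstrip(";").strip()
    return f"INSERT INTO `{destination_table}`\n{body};"


def run_in_session(client, session_id: str, sql: str, location: str = "EU"):
    """Run one statement inside an existing session and wait for it.

    Hypothetical helper: `client` is expected to be a bigquery.Client.
    """
    # Imported lazily so the pure helper above has no dependencies.
    from google.cloud import bigquery

    job = client.query(
        sql,
        job_config=bigquery.QueryJobConfig(
            connection_properties=[
                bigquery.query.ConnectionProperty(key="session_id", value=session_id)
            ],
        ),
        location=location,
    )
    return job.result()
```

With these helpers, the "DML queries" step would become `run_in_session(client, session_id, as_insert_select(table, aggregation_query))`, with no destination table in the job configuration. If the rows disappear after the rollback in that variant, the first hypothesis is the more plausible one.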
-
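In SQL, SELECT statements are not considered DML statements. The statements considered DML are INSERT, UPDATE, DELETE, MERGE and INSERT INTO SELECT, so the operation is probably not rolled back because the SELECT statement is not treated as DML.
-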
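I understand, but in that case the documentation is misleading. It states that SELECT operations are supported inside transactions, but they are only partially supported. Am I right?
-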
@Tonca It is good practice to post your solution as an answer.
-
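Thanks @HectorMartinezRodriguez. I am not happy with my solution, because in the end I decided not to use this BigQuery feature. I am still waiting for contributions, but at this point I guess there will not be any.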