本页介绍了 Spanner 批量写入请求以及如何使用这些请求修改 Spanner 数据。
您可以使用 Spanner 批量写入在 Spanner 表中插入、更新或删除多行。Spanner 批量写入支持无需读取操作即可实现低延迟写入,并会在批量应用更改时返回响应。如需使用批量写入,您可以将相关更改分组在一起,并且一组中的所有更改都会以原子方式提交。各组中的更改会以未指定的顺序应用,并且彼此独立(非原子)。Spanner 无需等待所有更改都应用完毕,即可发送响应,这意味着批量写入允许部分失败。您还可以一次执行多个批量写入。如需了解详情,请参阅如何使用批量写入。
使用场景
如果您想在不执行读取操作的情况下提交大量写入,但不需要对所有更改使用原子事务,Spanner 批量写入功能会特别有用。
如果您想批量处理 DML 请求,请使用批量 DML 修改 Spanner 数据。如需详细了解 DML 和变更之间的差异,请参阅比较 DML 和变更。
对于单个更改请求,我们建议使用锁定读写事务。
限制
Spanner 批量写入存在以下限制:
无法使用 Google Cloud 控制台或 Google Cloud CLI 执行 Spanner 批量写入。只有使用 REST 和 RPC API 以及 Spanner 客户端库时,才能使用该功能。
不支持使用批量写入功能实现重放攻击防范。变更可能会多次应用,而多次应用变更可能会导致失败。例如,如果重放插入更改,可能会产生“已存在”错误;如果您在更改中使用基于生成的键或提交时间戳的键,则可能会导致向表中添加其他行。我们建议您设计您的写入代码结构,使其具有幂等性,以避免此问题。
您无法回滚已完成的批量写入请求。您可以取消正在进行的批量写入请求。如果您取消正在进行的批量写入,系统会回滚未完成的组中的更改。已完成的组中的更改会提交到数据库。
批量写入请求的大小上限与提交请求的上限相同。如需了解详情,请参阅创建、读取、更新和删除数据的限制。
如何使用批量写入
如需使用批量写入,您必须对要修改的数据库拥有 spanner.databases.write
权限。您可以使用 REST 或 RPC API 请求调用,在单个调用中以非原子化方式批量写入更改。
使用批量写入时,您应将以下更改类型归为一组:
- 在父表和子表中插入具有相同主键前缀的行。
- 将行插入表中,且表之间存在外键关系。
- 其他类型的相关更改(具体取决于您的数据库架构和应用逻辑)。
您还可以使用 Spanner 客户端库进行批量写入。以下代码示例使用新行更新 Singers
表。
客户端库
Java
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.MutationGroup;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.collect.ImmutableList;
import com.google.rpc.Code;
import com.google.spanner.v1.BatchWriteResponse;
public class BatchWriteAtLeastOnceSample {
/***
* Assume DDL for the underlying database:
* <pre>{@code
* CREATE TABLE Singers (
* SingerId INT64 NOT NULL,
* FirstName STRING(1024),
* LastName STRING(1024),
* ) PRIMARY KEY (SingerId)
*
* CREATE TABLE Albums (
* SingerId INT64 NOT NULL,
* AlbumId INT64 NOT NULL,
* AlbumTitle STRING(1024),
* ) PRIMARY KEY (SingerId, AlbumId),
* INTERLEAVE IN PARENT Singers ON DELETE CASCADE
* }</pre>
*/
private static final MutationGroup MUTATION_GROUP1 =
MutationGroup.of(
Mutation.newInsertOrUpdateBuilder("Singers")
.set("SingerId")
.to(16)
.set("FirstName")
.to("Scarlet")
.set("LastName")
.to("Terry")
.build());
private static final MutationGroup MUTATION_GROUP2 =
MutationGroup.of(
Mutation.newInsertOrUpdateBuilder("Singers")
.set("SingerId")
.to(17)
.set("FirstName")
.to("Marc")
.build(),
Mutation.newInsertOrUpdateBuilder("Singers")
.set("SingerId")
.to(18)
.set("FirstName")
.to("Catalina")
.set("LastName")
.to("Smith")
.build(),
Mutation.newInsertOrUpdateBuilder("Albums")
.set("SingerId")
.to(17)
.set("AlbumId")
.to(1)
.set("AlbumTitle")
.to("Total Junk")
.build(),
Mutation.newInsertOrUpdateBuilder("Albums")
.set("SingerId")
.to(18)
.set("AlbumId")
.to(2)
.set("AlbumTitle")
.to("Go, Go, Go")
.build());
static void batchWriteAtLeastOnce() {
// TODO(developer): Replace these variables before running the sample.
final String projectId = "my-project";
final String instanceId = "my-instance";
final String databaseId = "my-database";
batchWriteAtLeastOnce(projectId, instanceId, databaseId);
}
static void batchWriteAtLeastOnce(String projectId, String instanceId, String databaseId) {
try (Spanner spanner =
SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
final DatabaseClient dbClient = spanner.getDatabaseClient(dbId);
// Creates and issues a BatchWrite RPC request that will apply the mutation groups
// non-atomically and respond back with a stream of BatchWriteResponse.
ServerStream<BatchWriteResponse> responses =
dbClient.batchWriteAtLeastOnce(
ImmutableList.of(MUTATION_GROUP1, MUTATION_GROUP2),
Options.tag("batch-write-tag"));
// Iterates through the results in the stream response and prints the MutationGroup indexes,
// commit timestamp and status.
for (BatchWriteResponse response : responses) {
if (response.getStatus().getCode() == Code.OK_VALUE) {
System.out.printf(
"Mutation group indexes %s have been applied with commit timestamp %s",
response.getIndexesList(), response.getCommitTimestamp());
} else {
System.out.printf(
"Mutation group indexes %s could not be applied with error code %s and "
+ "error message %s", response.getIndexesList(),
Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
}
}
}
}
}
Go
import (
"context"
"fmt"
"io"
"cloud.google.com/go/spanner"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"google.golang.org/grpc/status"
)
// batchWrite demonstrates writing mutations to a Spanner database through
// BatchWrite API - https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite
func batchWrite(w io.Writer, db string) error {
// db := "projects/my-project/instances/my-instance/databases/my-database"
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
// Database is assumed to exist - https://cloud.google.com/spanner/docs/getting-started/go#create_a_database
singerColumns := []string{"SingerId", "FirstName", "LastName"}
albumColumns := []string{"SingerId", "AlbumId", "AlbumTitle"}
mutationGroups := make([]*spanner.MutationGroup, 2)
mutationGroup1 := []*spanner.Mutation{
spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{16, "Scarlet", "Terry"}),
}
mutationGroups[0] = &spanner.MutationGroup{Mutations: mutationGroup1}
mutationGroup2 := []*spanner.Mutation{
spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{17, "Marc", ""}),
spanner.InsertOrUpdate("Singers", singerColumns, []interface{}{18, "Catalina", "Smith"}),
spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{17, 1, "Total Junk"}),
spanner.InsertOrUpdate("Albums", albumColumns, []interface{}{18, 2, "Go, Go, Go"}),
}
mutationGroups[1] = &spanner.MutationGroup{Mutations: mutationGroup2}
iter := client.BatchWrite(ctx, mutationGroups)
// See https://pkg.go.dev/cloud.google.com/go/spanner#BatchWriteResponseIterator.Do
doFunc := func(response *sppb.BatchWriteResponse) error {
if err = status.ErrorProto(response.GetStatus()); err == nil {
fmt.Fprintf(w, "Mutation group indexes %v have been applied with commit timestamp %v",
response.GetIndexes(), response.GetCommitTimestamp())
} else {
fmt.Fprintf(w, "Mutation group indexes %v could not be applied with error %v",
response.GetIndexes(), err)
}
// Return an actual error as needed.
return nil
}
return iter.Do(doFunc)
}
节点
// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
// Create Mutation Groups
/**
* Related mutations should be placed in a group, such as insert mutations for both a parent and a child row.
* A group must contain related mutations.
* Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
* for more details and examples.
*/
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
SingerId: 1,
FirstName: 'Scarlet',
LastName: 'Terry',
});
const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
SingerId: 2,
FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
SingerId: 3,
FirstName: 'Catalina',
LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
AlbumId: 1,
SingerId: 2,
AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
AlbumId: 2,
SingerId: 3,
AlbumTitle: 'Go, Go, Go',
});
const options = {
transactionTag: 'batch-write-tag',
};
try {
database
.batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
.on('error', console.error)
.on('data', response => {
// Check the response code of each response to determine whether the mutation group(s) were applied successfully.
if (response.status.code === 0) {
console.log(
`Mutation group indexes ${
response.indexes
}, have been applied with commit timestamp ${Spanner.timestamp(
response.commitTimestamp
).toJSON()}`
);
}
// Mutation groups that fail to commit trigger a response with a non-zero status code.
else {
console.log(
`Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`
);
}
})
.on('end', () => {
console.log('Request completed successfully');
});
} catch (err) {
console.log(err);
}
Python
def batch_write(instance_id, database_id):
"""Inserts sample data into the given database via BatchWrite API.
The database and table must already exist and can be created using
`create_database`.
"""
from google.rpc.code_pb2 import OK
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
with database.mutation_groups() as groups:
group1 = groups.group()
group1.insert_or_update(
table="Singers",
columns=("SingerId", "FirstName", "LastName"),
values=[
(16, "Scarlet", "Terry"),
],
)
group2 = groups.group()
group2.insert_or_update(
table="Singers",
columns=("SingerId", "FirstName", "LastName"),
values=[
(17, "Marc", ""),
(18, "Catalina", "Smith"),
],
)
group2.insert_or_update(
table="Albums",
columns=("SingerId", "AlbumId", "AlbumTitle"),
values=[
(17, 1, "Total Junk"),
(18, 2, "Go, Go, Go"),
],
)
for response in groups.batch_write():
if response.status.code == OK:
print(
"Mutation group indexes {} have been applied with commit timestamp {}".format(
response.indexes, response.commit_timestamp
)
)
else:
print(
"Mutation group indexes {} could not be applied with error {}".format(
response.indexes, response.status
)
)
C++
namespace spanner = ::google::cloud::spanner;
// Use upserts as mutation groups are not replay protected.
auto commit_results = client.CommitAtLeastOnce({
// group #0
spanner::Mutations{
spanner::InsertOrUpdateMutationBuilder(
"Singers", {"SingerId", "FirstName", "LastName"})
.EmplaceRow(16, "Scarlet", "Terry")
.Build(),
},
// group #1
spanner::Mutations{
spanner::InsertOrUpdateMutationBuilder(
"Singers", {"SingerId", "FirstName", "LastName"})
.EmplaceRow(17, "Marc", "")
.EmplaceRow(18, "Catalina", "Smith")
.Build(),
spanner::InsertOrUpdateMutationBuilder(
"Albums", {"SingerId", "AlbumId", "AlbumTitle"})
.EmplaceRow(17, 1, "Total Junk")
.EmplaceRow(18, 2, "Go, Go, Go")
.Build(),
},
});
for (auto& commit_result : commit_results) {
if (!commit_result) throw std::move(commit_result).status();
std::cout << "Mutation group indexes [";
for (auto index : commit_result->indexes) std::cout << " " << index;
std::cout << " ]: ";
if (commit_result->commit_timestamp) {
auto const& ts = *commit_result->commit_timestamp;
std::cout << "Committed at " << ts.get<absl::Time>().value();
} else {
std::cout << commit_result->commit_timestamp.status();
}
std::cout << "\n";
}
后续步骤
- 详细了解 Spanner 事务。