Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,22 +684,11 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
}

RequestOptions buildRequestOptions(Options options) {
// Shortcut for the most common return value.
if (!(options.hasPriority() || options.hasTag() || getTransactionTag() != null)) {
return RequestOptions.getDefaultInstance();
}

RequestOptions.Builder builder = RequestOptions.newBuilder();
if (options.hasPriority()) {
builder.setPriority(options.priority());
}
if (options.hasTag()) {
builder.setRequestTag(options.tag());
}
RequestOptions requestOptions = options.toRequestOptionsProto(false);
if (getTransactionTag() != null) {
builder.setTransactionTag(getTransactionTag());
return requestOptions.toBuilder().setTransactionTag(getTransactionTag()).build();
}
return builder.build();
return requestOptions;
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ReadRequest.LockHint;
import com.google.spanner.v1.ReadRequest.OrderBy;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
Expand Down Expand Up @@ -265,6 +266,37 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

/**
* Specifying this will add the given client context to the request. The client context is used to
* pass side-channel or configuration information to the backend, such as a user ID for a parameterized
* secure view.
*/
public static ReadQueryUpdateTransactionOption clientContext(
RequestOptions.ClientContext clientContext) {
return new ClientContextOption(clientContext);
}

RequestOptions toRequestOptionsProto(boolean isTransactionOption) {
if (!hasPriority() && !hasTag() && !hasClientContext()) {
return RequestOptions.getDefaultInstance();
}
RequestOptions.Builder builder = RequestOptions.newBuilder();
if (hasPriority()) {
builder.setPriority(priority());
}
if (hasTag()) {
if (isTransactionOption) {
builder.setTransactionTag(tag());
} else {
builder.setRequestTag(tag());
}
}
if (hasClientContext()) {
builder.setClientContext(clientContext());
}
return builder.build();
}

public static TransactionOption maxCommitDelay(Duration maxCommitDelay) {
Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive");
return new MaxCommitDelayOption(maxCommitDelay);
Expand Down Expand Up @@ -462,6 +494,20 @@ void appendToOptions(Options options) {
}
}

static final class ClientContextOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
private final RequestOptions.ClientContext clientContext;

ClientContextOption(RequestOptions.ClientContext clientContext) {
this.clientContext = clientContext;
}

@Override
void appendToOptions(Options options) {
options.clientContext = clientContext;
}
}

static final class TagOption extends InternalOption implements ReadQueryUpdateTransactionOption {
private final String tag;

Expand Down Expand Up @@ -574,6 +620,7 @@ void appendToOptions(Options options) {
private String filter;
private RpcPriority priority;
private String tag;
private RequestOptions.ClientContext clientContext;
private String etag;
private Boolean validateOnly;
private Boolean withExcludeTxnFromChangeStreams;
Expand Down Expand Up @@ -666,6 +713,14 @@ Priority priority() {
return priority == null ? null : priority.proto;
}

boolean hasClientContext() {
return clientContext != null;
}

RequestOptions.ClientContext clientContext() {
return clientContext;
}

boolean hasTag() {
return tag != null;
}
Expand Down Expand Up @@ -777,6 +832,9 @@ public String toString() {
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
if (clientContext != null) {
b.append("clientContext: ").append(clientContext).append(' ');
}
if (tag != null) {
b.append("tag: ").append(tag).append(' ');
}
Expand Down Expand Up @@ -850,6 +908,7 @@ public boolean equals(Object o) {
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority())
&& Objects.equals(clientContext(), that.clientContext())
&& Objects.equals(tag(), that.tag())
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
Expand Down Expand Up @@ -894,6 +953,9 @@ public int hashCode() {
if (priority != null) {
result = 31 * result + priority.hashCode();
}
if (clientContext != null) {
result = 31 * result + clientContext.hashCode();
}
if (tag != null) {
result = 31 * result + tag.hashCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,12 @@ ApiFuture<Transaction> beginTransactionAsync(
if (sessionReference.getIsMultiplexed() && mutation != null) {
requestBuilder.setMutationKey(mutation);
}
if (sessionReference.getIsMultiplexed() && !Strings.isNullOrEmpty(transactionOptions.tag())) {
requestBuilder.setRequestOptions(
RequestOptions.newBuilder().setTransactionTag(transactionOptions.tag()).build());
RequestOptions requestOptions = transactionOptions.toRequestOptionsProto(true);
if (!sessionReference.getIsMultiplexed()) {
requestOptions = requestOptions.toBuilder().clearTransactionTag().build();
}
if (!requestOptions.equals(RequestOptions.getDefaultInstance())) {
requestBuilder.setRequestOptions(requestOptions);
}
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture<Transaction> requestFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,15 +464,9 @@ public void run() {
waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS)
: transactionId);
}
if (options.hasPriority() || getTransactionTag() != null) {
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
if (options.hasPriority()) {
requestOptionsBuilder.setPriority(options.priority());
}
if (getTransactionTag() != null) {
requestOptionsBuilder.setTransactionTag(getTransactionTag());
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
RequestOptions requestOptions = options.toRequestOptionsProto(true);
if (!requestOptions.equals(RequestOptions.getDefaultInstance())) {
requestBuilder.setRequestOptions(requestOptions);
}
if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) {
// Set the precommit token in the CommitRequest for multiplexed sessions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
protected final List<TransactionRetryListener> transactionRetryListeners;
protected final boolean excludeTxnFromChangeStreams;
protected final RpcPriority rpcPriority;
protected final com.google.spanner.v1.RequestOptions.ClientContext clientContext;
protected final Span span;

/** Class for keeping track of the stacktrace of the caller of an async statement. */
Expand Down Expand Up @@ -117,6 +118,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni

private boolean excludeTxnFromChangeStreams;
private RpcPriority rpcPriority;
private com.google.spanner.v1.RequestOptions.ClientContext clientContext;
private Span span;

Builder() {}
Expand Down Expand Up @@ -163,6 +165,11 @@ B setRpcPriority(@Nullable RpcPriority rpcPriority) {
return self();
}

B setClientContext(@Nullable com.google.spanner.v1.RequestOptions.ClientContext clientContext) {
this.clientContext = clientContext;
return self();
}

B setSpan(@Nullable Span span) {
this.span = span;
return self();
Expand All @@ -179,6 +186,7 @@ B setSpan(@Nullable Span span) {
this.transactionRetryListeners = builder.transactionRetryListeners;
this.excludeTxnFromChangeStreams = builder.excludeTxnFromChangeStreams;
this.rpcPriority = builder.rpcPriority;
this.clientContext = builder.clientContext;
this.span = Preconditions.checkNotNull(builder.span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,25 @@ default String getStatementTag() {
throw new UnsupportedOperationException();
}

/**
* Sets the client context to use for the statements that are executed. The client context
* persists until it is changed or cleared.
*
* @param clientContext The client context to use with the statements that will be executed on
* this connection.
*/
default void setClientContext(com.google.spanner.v1.RequestOptions.ClientContext clientContext) {
throw new UnsupportedOperationException();
}

/**
* @return The client context that will be used with the statements that are executed on this
* connection.
*/
default com.google.spanner.v1.RequestOptions.ClientContext getClientContext() {
throw new UnsupportedOperationException();
}

/**
* Sets whether the next transaction should be excluded from all change streams with the DDL
* option `allow_txn_exclusion=true`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSetStats;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
Expand Down Expand Up @@ -299,6 +300,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private IsolationLevel transactionIsolationLevel;
private String transactionTag;
private String statementTag;
private RequestOptions.ClientContext clientContext;
private boolean excludeTxnFromChangeStreams;
private byte[] protoDescriptors;
private String protoDescriptorsFilePath;
Expand Down Expand Up @@ -536,6 +538,7 @@ private void reset(Context context, boolean inTransaction) {
this.connectionState.resetValue(SAVEPOINT_SUPPORT, context, inTransaction);
this.protoDescriptors = null;
this.protoDescriptorsFilePath = null;
this.clientContext = null;

if (!isTransactionStarted()) {
setDefaultTransactionOptions(getDefaultIsolationLevel());
Expand Down Expand Up @@ -955,6 +958,18 @@ public String getTransactionTag() {
return transactionTag;
}

@Override
public void setClientContext(RequestOptions.ClientContext clientContext) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
this.clientContext = clientContext;
}

@Override
public RequestOptions.ClientContext getClientContext() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return clientContext;
}

@Override
public void setTransactionTag(String tag) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
Expand Down Expand Up @@ -2026,6 +2041,9 @@ private QueryOption[] mergeQueryRequestOptions(
options =
appendQueryOption(options, Options.priority(getConnectionPropertyValue(RPC_PRIORITY)));
}
if (clientContext != null) {
options = appendQueryOption(options, Options.clientContext(clientContext));
}
if (currentUnitOfWork != null
&& currentUnitOfWork.supportsDirectedReads(parsedStatement)
&& getConnectionPropertyValue(DIRECTED_READ) != null) {
Expand Down Expand Up @@ -2070,6 +2088,14 @@ private UpdateOption[] mergeUpdateRequestOptions(UpdateOption... options) {
options[options.length - 1] = Options.priority(getConnectionPropertyValue(RPC_PRIORITY));
}
}
if (clientContext != null) {
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.clientContext(clientContext)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.clientContext(clientContext);
}
}
return options;
}

Expand Down Expand Up @@ -2299,6 +2325,7 @@ UnitOfWork createNewUnitOfWork(
createSpanForUnitOfWork(
statementType == StatementType.DDL ? DDL_STATEMENT : SINGLE_USE_TRANSACTION))
.setProtoDescriptors(getProtoDescriptors())
.setClientContext(clientContext)
.build();
if (!isInternalMetadataQuery && !forceSingleUse) {
// Reset the transaction options after starting a single-use transaction.
Expand All @@ -2317,6 +2344,7 @@ UnitOfWork createNewUnitOfWork(
.setTransactionTag(transactionTag)
.setRpcPriority(getConnectionPropertyValue(RPC_PRIORITY))
.setSpan(createSpanForUnitOfWork(READ_ONLY_TRANSACTION))
.setClientContext(clientContext)
.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
Expand All @@ -2340,6 +2368,7 @@ UnitOfWork createNewUnitOfWork(
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
.setRpcPriority(getConnectionPropertyValue(RPC_PRIORITY))
.setSpan(createSpanForUnitOfWork(READ_WRITE_TRANSACTION))
.setClientContext(clientContext)
.build();
case DML_BATCH:
// A DML batch can run inside the current transaction. It should therefore only
Expand All @@ -2359,6 +2388,7 @@ UnitOfWork createNewUnitOfWork(
.setRpcPriority(getConnectionPropertyValue(RPC_PRIORITY))
// Use the transaction Span for the DML batch.
.setSpan(transactionStack.peek().getSpan())
.setClientContext(clientContext)
.build();
case DDL_BATCH:
return DdlBatch.newBuilder()
Expand All @@ -2369,6 +2399,7 @@ UnitOfWork createNewUnitOfWork(
.setSpan(createSpanForUnitOfWork(DDL_BATCH))
.setProtoDescriptors(getProtoDescriptors())
.setConnectionState(connectionState)
.setClientContext(clientContext)
.build();
default:
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ private TransactionOption[] extractOptions(Builder builder) {
if (this.readLockMode != ReadLockMode.READ_LOCK_MODE_UNSPECIFIED) {
numOptions++;
}
if (this.clientContext != null) {
numOptions++;
}
TransactionOption[] options = new TransactionOption[numOptions];
int index = 0;
if (builder.returnCommitStats) {
Expand All @@ -373,6 +376,9 @@ private TransactionOption[] extractOptions(Builder builder) {
if (this.readLockMode != ReadLockMode.READ_LOCK_MODE_UNSPECIFIED) {
options[index++] = Options.readLockMode(this.readLockMode);
}
if (this.clientContext != null) {
options[index++] = Options.clientContext(this.clientContext);
}
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ private TransactionRunner createWriteTransaction() {
!= ReadLockMode.READ_LOCK_MODE_UNSPECIFIED) {
numOptions++;
}
if (this.clientContext != null) {
numOptions++;
}
if (numOptions == 0) {
return dbClient.readWriteTransaction();
}
Expand Down Expand Up @@ -547,6 +550,9 @@ private TransactionRunner createWriteTransaction() {
!= ReadLockMode.READ_LOCK_MODE_UNSPECIFIED) {
options[index++] = Options.readLockMode(connectionState.getValue(READ_LOCK_MODE).getValue());
}
if (this.clientContext != null) {
options[index++] = Options.clientContext(this.clientContext);
}
return dbClient.readWriteTransaction(options);
}

Expand Down
Loading
Loading