Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions google/auth/environment_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@
"""Environment variable defining the location of Google API certificate config
file."""

CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE = (
"CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE"
)
"""Environment variable controlling whether to use client certificate or not.
This variable is the fallback of GOOGLE_API_USE_CLIENT_CERTIFICATE."""

CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH = (
"CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH"
)
"""Environment variable defining the location of Google API certificate config
file. This variable is the fallback of GOOGLE_API_CERTIFICATE_CONFIG."""

GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES = (
"GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES"
)
Expand Down
20 changes: 4 additions & 16 deletions google/auth/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import base64
import http.client as http_client
import json
import os

from google.auth import _exponential_backoff
from google.auth import _helpers
from google.auth import credentials
from google.auth import crypt
from google.auth import exceptions
from google.auth.transport import mtls
from google.auth.transport import _mtls_helper

IAM_RETRY_CODES = {
http_client.INTERNAL_SERVER_ERROR,
Expand All @@ -40,20 +39,9 @@

_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

# 1. Determine if we should use mTLS.
# Note: We only support automatic mTLS on the default googleapis.com universe.
if hasattr(mtls, "should_use_client_cert"):
use_client_cert = mtls.should_use_client_cert()
else: # pragma: NO COVER
# if unsupported, fallback to reading from env var
use_client_cert = (
os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false").lower() == "true"
)

# 2. Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant.
# This ensures that the .replace() calls in the classes will work correctly.
if use_client_cert:
# We use the .mtls. prefix only for the default universe template
# Determine if we should use mTLS.
if hasattr(_mtls_helper, "check_use_client_cert") and _mtls_helper.check_use_client_cert():
# Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant.
_IAM_DOMAIN = f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
else:
_IAM_DOMAIN = f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
Expand Down
23 changes: 20 additions & 3 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@ def _get_cert_config_path(certificate_config_path=None):
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH
env_path = environ.get(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH,
None,
)
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
Expand Down Expand Up @@ -452,13 +459,23 @@ def check_use_client_cert():
Returns:
bool: Whether the client certificate should be used for mTLS connection.
"""
use_client_cert = getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE")
use_client_cert = getenv(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE)
if use_client_cert is None:
use_client_cert = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE
)

# Check if the value of GOOGLE_API_USE_CLIENT_CERTIFICATE is set.
if use_client_cert:
return use_client_cert.lower() == "true"
else:
# Check if the value of GOOGLE_API_CERTIFICATE_CONFIG is set.
cert_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG")
cert_path = getenv(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if cert_path is None:
cert_path = getenv(
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH
)

if cert_path:
try:
with open(cert_path, "r") as f:
Expand Down
97 changes: 97 additions & 0 deletions tests/transport/test_mtls_env_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from unittest import mock
import pytest
from google.auth.transport import _mtls_helper
from google.auth import environment_vars

class TestEnvVarsPrecedence:
def test_use_client_cert_precedence(self):
# GOOGLE_API_USE_CLIENT_CERTIFICATE takes precedence
with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false"
}):
assert _mtls_helper.check_use_client_cert() is True

with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "false",
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"
}):
assert _mtls_helper.check_use_client_cert() is False

def test_use_client_cert_fallback(self):
# Fallback to CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE if GOOGLE_API_USE_CLIENT_CERTIFICATE is unset
with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "true"
}):
# Ensure GOOGLE_API_USE_CLIENT_CERTIFICATE is not set
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
assert _mtls_helper.check_use_client_cert() is True

with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE: "false"
}):
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
assert _mtls_helper.check_use_client_cert() is False

def test_cert_config_path_precedence(self):
# GOOGLE_API_CERTIFICATE_CONFIG takes precedence
google_path = "/path/to/google/config"
cloudsdk_path = "/path/to/cloudsdk/config"

with mock.patch.dict(os.environ, {
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG: google_path,
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == google_path

def test_cert_config_path_fallback(self):
# Fallback to CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH if GOOGLE_API_CERTIFICATE_CONFIG is unset
cloudsdk_path = "/path/to/cloudsdk/config"

with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]

with mock.patch("os.path.exists", return_value=True):
assert _mtls_helper._get_cert_config_path() == cloudsdk_path

@mock.patch("builtins.open", autospec=True)
def test_check_use_client_cert_config_fallback(self, mock_file):
# Test fallback for config file when determining if client cert should be used
cloudsdk_path = "/path/to/cloudsdk/config"

mock_file.side_effect = mock.mock_open(
read_data='{"cert_configs": {"workload": "exists"}}'
)

with mock.patch.dict(os.environ, {
environment_vars.CLOUDSDK_CONTEXT_AWARE_CERTIFICATE_CONFIG_FILE_PATH: cloudsdk_path
}):
if environment_vars.GOOGLE_API_CERTIFICATE_CONFIG in os.environ:
del os.environ[environment_vars.GOOGLE_API_CERTIFICATE_CONFIG]
if environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE]
if environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE in os.environ:
del os.environ[environment_vars.CLOUDSDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE]

assert _mtls_helper.check_use_client_cert() is True
Loading