Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions google/auth/_cloud_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
_CLOUD_SDK_CONFIG_GET_PROJECT_COMMAND = ("config", "get", "project")
# The command to get google user access token
_CLOUD_SDK_USER_ACCESS_TOKEN_COMMAND = ("auth", "print-access-token")
# The command to check if context aware client certificate is enabled
_CLOUD_SDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE_COMMAND = (
"config",
"get",
"context_aware/use_client_certificate",
)
# Cloud SDK's application-default client ID
CLOUD_SDK_CLIENT_ID = (
"764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com"
Expand Down Expand Up @@ -151,3 +157,27 @@ def get_auth_access_token(account=None):
"Failed to obtain access token", caught_exc
)
raise new_exc from caught_exc


def get_context_aware_use_client_certificate():
"""Checks if the context aware use_client_certificate is set to true in the Cloud SDK.

Returns:
bool: True if the context aware use_client_certificate is set to true, False otherwise.
"""
if os.name == "nt":
command = _CLOUD_SDK_WINDOWS_COMMAND
else:
command = _CLOUD_SDK_POSIX_COMMAND

try:
# Ignore the stderr coming from gcloud, so it won't be mixed into the output.
use_client_cert = _run_subprocess_ignore_stderr(
(command,) + _CLOUD_SDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE_COMMAND
)

# Turn bytes into a string and remove "\n"
use_client_cert = _helpers.from_bytes(use_client_cert).strip()
return use_client_cert.lower() == "true"
except (subprocess.CalledProcessError, OSError, IOError):
return False
6 changes: 6 additions & 0 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import subprocess

from google.auth import _agent_identity_utils
from google.auth import _cloud_sdk
from google.auth import environment_vars
from google.auth import exceptions

Expand Down Expand Up @@ -474,6 +475,11 @@ def check_use_client_cert():
json.JSONDecodeError,
) as e:
_LOGGER.debug("error decoding certificate: %s", e)

# Check if context aware use_client_certificate is set to true in gcloud config.
if _cloud_sdk.get_context_aware_use_client_certificate():
return True

return False


Expand Down
45 changes: 45 additions & 0 deletions tests/test__cloud_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,48 @@ def test_get_auth_access_token_with_exception(check_output):

with pytest.raises(exceptions.UserAccessTokenError):
_cloud_sdk.get_auth_access_token(account="account")


@pytest.mark.parametrize(
"data, expected_result",
[(b"true\n", True), (b"True\n", True), (b"false\n", False), (b"other\n", False)],
)
def test_get_context_aware_use_client_certificate(data, expected_result):
check_output_patch = mock.patch(
"subprocess.check_output", autospec=True, return_value=data
)

with check_output_patch as check_output:
result = _cloud_sdk.get_context_aware_use_client_certificate()

assert result == expected_result
assert check_output.called


@mock.patch(
"subprocess.check_output",
autospec=True,
side_effect=subprocess.CalledProcessError(-1, "testing"),
)
def test_get_context_aware_use_client_certificate_error(check_output):
result = _cloud_sdk.get_context_aware_use_client_certificate()
assert result is False
assert check_output.called


@mock.patch("os.name", new="nt")
def test_get_context_aware_use_client_certificate_windows():
check_output_patch = mock.patch(
"subprocess.check_output", autospec=True, return_value=b"true\n"
)

with check_output_patch as check_output:
result = _cloud_sdk.get_context_aware_use_client_certificate()

assert result is True
assert check_output.called
# Make sure the executable is `gcloud.cmd`.
args = check_output.call_args[0]
command = args[0]
executable = command[0]
assert executable == "gcloud.cmd"
32 changes: 28 additions & 4 deletions tests/transport/test__mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,11 @@ def test_config_file_success(self, mock_file):
)
def test_config_file_missing_keys(self, mock_file):
mock_file.side_effect = mock.mock_open(read_data='{"cert_configs": {}}')
assert _mtls_helper.check_use_client_cert() is False
with mock.patch(
"google.auth._cloud_sdk.get_context_aware_use_client_certificate"
) as mock_gcloud_check:
mock_gcloud_check.return_value = False
assert _mtls_helper.check_use_client_cert() is False

@mock.patch("builtins.open", autospec=True)
@mock.patch.dict(
Expand All @@ -793,7 +797,11 @@ def test_config_file_missing_keys(self, mock_file):
)
def test_config_file_bad_json(self, mock_file):
mock_file.side_effect = mock.mock_open(read_data="{bad_json")
assert _mtls_helper.check_use_client_cert() is False
with mock.patch(
"google.auth._cloud_sdk.get_context_aware_use_client_certificate"
) as mock_gcloud_check:
mock_gcloud_check.return_value = False
assert _mtls_helper.check_use_client_cert() is False

@mock.patch("builtins.open", autospec=True)
@mock.patch.dict(
Expand All @@ -805,11 +813,27 @@ def test_config_file_bad_json(self, mock_file):
)
def test_config_file_not_found(self, mock_file):
mock_file.side_effect = FileNotFoundError
assert _mtls_helper.check_use_client_cert() is False
with mock.patch(
"google.auth._cloud_sdk.get_context_aware_use_client_certificate"
) as mock_gcloud_check:
mock_gcloud_check.return_value = False
assert _mtls_helper.check_use_client_cert() is False

@mock.patch.dict(os.environ, {}, clear=True)
def test_no_env_vars_set(self):
assert _mtls_helper.check_use_client_cert() is False
with mock.patch(
"google.auth._cloud_sdk.get_context_aware_use_client_certificate"
) as mock_gcloud_check:
mock_gcloud_check.return_value = False
assert _mtls_helper.check_use_client_cert() is False

@mock.patch.dict(os.environ, {}, clear=True)
def test_gcloud_config_true(self):
with mock.patch(
"google.auth._cloud_sdk.get_context_aware_use_client_certificate"
) as mock_gcloud_check:
mock_gcloud_check.return_value = True
assert _mtls_helper.check_use_client_cert() is True


class TestMtlsHelper:
Expand Down