diff --git a/google/auth/_cloud_sdk.py b/google/auth/_cloud_sdk.py index 85b3c4f99..b1f0ed789 100644 --- a/google/auth/_cloud_sdk.py +++ b/google/auth/_cloud_sdk.py @@ -36,6 +36,12 @@ _CLOUD_SDK_CONFIG_GET_PROJECT_COMMAND = ("config", "get", "project") # The command to get google user access token _CLOUD_SDK_USER_ACCESS_TOKEN_COMMAND = ("auth", "print-access-token") +# The command to check if context aware client certificate is enabled +_CLOUD_SDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE_COMMAND = ( + "config", + "get", + "context_aware/use_client_certificate", +) # Cloud SDK's application-default client ID CLOUD_SDK_CLIENT_ID = ( "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com" @@ -151,3 +157,27 @@ def get_auth_access_token(account=None): "Failed to obtain access token", caught_exc ) raise new_exc from caught_exc + + +def get_context_aware_use_client_certificate(): + """Checks if the context aware use_client_certificate is set to true in the Cloud SDK. + + Returns: + bool: True if the context aware use_client_certificate is set to true, False otherwise. + """ + if os.name == "nt": + command = _CLOUD_SDK_WINDOWS_COMMAND + else: + command = _CLOUD_SDK_POSIX_COMMAND + + try: + # Ignore the stderr coming from gcloud, so it won't be mixed into the output. + use_client_cert = _run_subprocess_ignore_stderr( + (command,) + _CLOUD_SDK_CONTEXT_AWARE_USE_CLIENT_CERTIFICATE_COMMAND + ) + + # Turn bytes into a string and remove "\n" + use_client_cert = _helpers.from_bytes(use_client_cert).strip() + return use_client_cert.lower() == "true" + except (subprocess.CalledProcessError, OSError, IOError): + return False diff --git a/google/auth/transport/_mtls_helper.py b/google/auth/transport/_mtls_helper.py index 5b56b60b1..b7bfd0850 100644 --- a/google/auth/transport/_mtls_helper.py +++ b/google/auth/transport/_mtls_helper.py @@ -21,6 +21,7 @@ import subprocess from google.auth import _agent_identity_utils +from google.auth import _cloud_sdk from google.auth import environment_vars from google.auth import exceptions @@ -474,6 +475,11 @@ def check_use_client_cert(): json.JSONDecodeError, ) as e: _LOGGER.debug("error decoding certificate: %s", e) + + # Check if context aware use_client_certificate is set to true in gcloud config. + if _cloud_sdk.get_context_aware_use_client_certificate(): + return True + return False diff --git a/tests/test__cloud_sdk.py b/tests/test__cloud_sdk.py index dd14bcebe..c3be12c43 100644 --- a/tests/test__cloud_sdk.py +++ b/tests/test__cloud_sdk.py @@ -178,3 +178,48 @@ def test_get_auth_access_token_with_exception(check_output): with pytest.raises(exceptions.UserAccessTokenError): _cloud_sdk.get_auth_access_token(account="account") + + +@pytest.mark.parametrize( + "data, expected_result", + [(b"true\n", True), (b"True\n", True), (b"false\n", False), (b"other\n", False)], +) +def test_get_context_aware_use_client_certificate(data, expected_result): + check_output_patch = mock.patch( + "subprocess.check_output", autospec=True, return_value=data + ) + + with check_output_patch as check_output: + result = _cloud_sdk.get_context_aware_use_client_certificate() + + assert result == expected_result + assert check_output.called + + +@mock.patch( + "subprocess.check_output", + autospec=True, + side_effect=subprocess.CalledProcessError(-1, "testing"), +) +def test_get_context_aware_use_client_certificate_error(check_output): + result = _cloud_sdk.get_context_aware_use_client_certificate() + assert result is False + assert check_output.called + + +@mock.patch("os.name", new="nt") +def test_get_context_aware_use_client_certificate_windows(): + check_output_patch = mock.patch( + "subprocess.check_output", autospec=True, return_value=b"true\n" + ) + + with check_output_patch as check_output: + result = _cloud_sdk.get_context_aware_use_client_certificate() + + assert result is True + assert check_output.called + # Make sure the executable is `gcloud.cmd`. + args = check_output.call_args[0] + command = args[0] + executable = command[0] + assert executable == "gcloud.cmd" diff --git a/tests/transport/test__mtls_helper.py b/tests/transport/test__mtls_helper.py index 1cdd91739..ed12a9647 100644 --- a/tests/transport/test__mtls_helper.py +++ b/tests/transport/test__mtls_helper.py @@ -781,7 +781,11 @@ def test_config_file_success(self, mock_file): ) def test_config_file_missing_keys(self, mock_file): mock_file.side_effect = mock.mock_open(read_data='{"cert_configs": {}}') - assert _mtls_helper.check_use_client_cert() is False + with mock.patch( + "google.auth._cloud_sdk.get_context_aware_use_client_certificate" + ) as mock_gcloud_check: + mock_gcloud_check.return_value = False + assert _mtls_helper.check_use_client_cert() is False @mock.patch("builtins.open", autospec=True) @mock.patch.dict( @@ -793,7 +797,11 @@ def test_config_file_missing_keys(self, mock_file): ) def test_config_file_bad_json(self, mock_file): mock_file.side_effect = mock.mock_open(read_data="{bad_json") - assert _mtls_helper.check_use_client_cert() is False + with mock.patch( + "google.auth._cloud_sdk.get_context_aware_use_client_certificate" + ) as mock_gcloud_check: + mock_gcloud_check.return_value = False + assert _mtls_helper.check_use_client_cert() is False @mock.patch("builtins.open", autospec=True) @mock.patch.dict( @@ -805,11 +813,27 @@ def test_config_file_bad_json(self, mock_file): ) def test_config_file_not_found(self, mock_file): mock_file.side_effect = FileNotFoundError - assert _mtls_helper.check_use_client_cert() is False + with mock.patch( + "google.auth._cloud_sdk.get_context_aware_use_client_certificate" + ) as mock_gcloud_check: + mock_gcloud_check.return_value = False + assert _mtls_helper.check_use_client_cert() is False @mock.patch.dict(os.environ, {}, clear=True) def test_no_env_vars_set(self): - assert _mtls_helper.check_use_client_cert() is False + with mock.patch( + "google.auth._cloud_sdk.get_context_aware_use_client_certificate" + ) as mock_gcloud_check: + mock_gcloud_check.return_value = False + assert _mtls_helper.check_use_client_cert() is False + + @mock.patch.dict(os.environ, {}, clear=True) + def test_gcloud_config_true(self): + with mock.patch( + "google.auth._cloud_sdk.get_context_aware_use_client_certificate" + ) as mock_gcloud_check: + mock_gcloud_check.return_value = True + assert _mtls_helper.check_use_client_cert() is True class TestMtlsHelper: