Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions msal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SystemAssignedManagedIdentity, UserAssignedManagedIdentity,
ManagedIdentityClient,
ManagedIdentityError,
MsiV2Error,
ArcPlatformNotSupportedError,
)

Expand Down
47 changes: 46 additions & 1 deletion msal/managed_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class ManagedIdentityError(ValueError):
pass


class MsiV2Error(ManagedIdentityError):
"""Raised when the MSI v2 (mTLS PoP) flow fails."""
pass


class ManagedIdentity(UserDict):
"""Feed an instance of this class to :class:`msal.ManagedIdentityClient`
to acquire token for the specified managed identity.
Expand Down Expand Up @@ -259,6 +264,8 @@ def acquire_token_for_client(
*,
resource: str, # If/when we support scope, resource will become optional
claims_challenge: Optional[str] = None,
mtls_proof_of_possession: bool = False,
with_attestation_support: bool = False,
):
"""Acquire token for the managed identity.

Expand All @@ -278,6 +285,23 @@ def acquire_token_for_client(
even if the app developer did not opt in for the "CP1" client capability.
Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token.

:param bool mtls_proof_of_possession: (optional)
When True, use the MSI v2 (mTLS Proof-of-Possession) flow to acquire an
``mtls_pop`` token bound to a short-lived mTLS certificate issued by the
IMDS ``/issuecredential`` endpoint.
Without this flag the legacy IMDS v1 flow is used.
Defaults to False.

MSI v2 is used only when both ``mtls_proof_of_possession`` and
``with_attestation_support`` are True.

:param bool with_attestation_support: (optional)
When True (and ``mtls_proof_of_possession`` is also True), attempt
KeyGuard / platform attestation before credential issuance.
On Windows this leverages ``AttestationClientLib.dll`` when available;
on other platforms the parameter is silently ignored.
Defaults to False.

.. note::

Known issue: When an Azure VM has only one user-assigned managed identity,
Expand All @@ -292,6 +316,27 @@ def acquire_token_for_client(
client_id_in_cache = self._managed_identity.get(
ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
now = time.time()
# MSI v2 is opt-in: use it only when BOTH mtls_proof_of_possession and
# with_attestation_support are explicitly requested by the caller.
# No auto-fallback: if MSI v2 is requested and fails, the error is raised.
use_msi_v2 = bool(mtls_proof_of_possession and with_attestation_support)

if with_attestation_support and not mtls_proof_of_possession:
raise ManagedIdentityError(
"attestation_requires_pop",
"with_attestation_support=True requires mtls_proof_of_possession=True (mTLS PoP)."
)

if use_msi_v2:
from .msi_v2 import obtain_token as _obtain_token_v2
result = _obtain_token_v2(
self._http_client, self._managed_identity, resource,
attestation_enabled=True,
)
if "access_token" in result and "error" not in result:
result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
return result

if True: # Attempt cache search even if receiving claims_challenge,
# because we want to locate the existing token (if any) and refresh it
matches = self._token_cache.search(
Expand Down Expand Up @@ -685,4 +730,4 @@ def _obtain_token_on_arc(http_client, endpoint, resource):
return {
"error": "invalid_request",
"error_description": response.text,
}
}
Loading
Loading