Skip to content

Authentication

Classes for authenticating with the Anaplan API, using either Basic (email and password) credentials or a certificate-based signed handshake.

anaplan_orm.authenticator

Authenticator

Bases: ABC

Abstract interface for Anaplan Authentication strategies. Handles token caching and lifecycle management automatically.

Source code in src/anaplan_orm/authenticator.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Authenticator(ABC):
    """
    Base class for every Anaplan authentication strategy.

    Subclasses supply ``authenticate()``; this class keeps the resulting token
    in memory and decides when it has to be fetched again.
    """

    AUTH_URL = "https://auth.anaplan.com/token/authenticate"
    DEFAULT_TOKEN_REFRESH_TIME: int = 1800

    def __init__(self):
        self._cached_token: str | None = None
        self._token_timestamp: float = 0.0

    def _requires_new_token(self) -> bool:
        """Report whether the cached token is missing or past its refresh window."""
        if self._cached_token is None:
            return True
        # Age is measured in seconds against the wall clock, matching the
        # ``time.time()`` stamp recorded alongside the token.
        token_age = time.time() - self._token_timestamp
        return token_age > self.DEFAULT_TOKEN_REFRESH_TIME

    def clear_token(self) -> None:
        """Drop the cached token so the next header request re-authenticates."""
        self._cached_token = None

    def get_auth_headers(self) -> dict:
        """Build the request headers, authenticating first when the token is stale."""
        if self._requires_new_token():
            self.authenticate()

        authorization = f"AnaplanAuthToken {self._cached_token}"
        return {"Authorization": authorization, "Content-Type": "application/json"}

    @abstractmethod
    def authenticate(self) -> None:
        """
        Obtain a token from Anaplan in a strategy-specific way.

        Implementations are expected to store the result in
        ``self._cached_token`` and record the fetch time in
        ``self._token_timestamp``.
        """

authenticate() abstractmethod

Implementation-specific logic to fetch a token from Anaplan and update self._cached_token and self._token_timestamp.

Source code in src/anaplan_orm/authenticator.py
47
48
49
50
51
52
53
@abstractmethod
def authenticate(self) -> None:
    """
    Obtain a token from Anaplan in a strategy-specific way.

    Implementations are expected to store the result in
    ``self._cached_token`` and record the fetch time in
    ``self._token_timestamp``.
    """

clear_token()

Wipes the cached token, forcing a fresh handshake on the next request.

Source code in src/anaplan_orm/authenticator.py
33
34
35
def clear_token(self) -> None:
    """Drop the cached token so the next header request re-authenticates."""
    # Only the token is reset; the stored timestamp is left as it was.
    self._cached_token = None

get_auth_headers()

Returns the authorization headers, fetching a new token only if necessary.

Source code in src/anaplan_orm/authenticator.py
37
38
39
40
41
42
43
44
45
def get_auth_headers(self) -> dict:
    """Build the request headers, authenticating first when the token is stale."""
    if self._requires_new_token():
        self.authenticate()

    authorization = f"AnaplanAuthToken {self._cached_token}"
    return {"Authorization": authorization, "Content-Type": "application/json"}

BasicAuthenticator

Bases: Authenticator

Source code in src/anaplan_orm/authenticator.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class BasicAuthenticator(Authenticator):
    """Authentication strategy that exchanges an email/password pair for a token."""

    def __init__(self, email: str, pwd: str, verify_ssl: bool = True):
        # The base initializer sets up the empty token cache.
        super().__init__()
        self.email = email
        self.pwd = pwd
        self.verify_ssl = verify_ssl

    @retry_network_errors()
    def authenticate(self) -> None:
        """
        Request a token with HTTP basic credentials and cache it.

        Raises:
            AnaplanConnectionError: If the HTTP call fails or Anaplan reports a
                status other than ``SUCCESS``.
        """
        credentials = (self.email, self.pwd)
        try:
            reply = httpx.post(self.AUTH_URL, auth=credentials, verify=self.verify_ssl)
            reply.raise_for_status()
            body = reply.json()

            # An HTTP success can still carry an application-level failure.
            if body.get("status") != "SUCCESS":
                reason = body.get("statusMessage", "Unknown Error")
                raise AnaplanConnectionError(f"Anaplan Auth Failed: {reason}")

            self._cached_token = body["tokenInfo"]["tokenValue"]
            self._token_timestamp = time.time()
        except httpx.HTTPError as exc:
            raise AnaplanConnectionError(f"Basic Authentication failed: {exc!s}") from exc

authenticate()

Fetches a new token using Email and Password.

Source code in src/anaplan_orm/authenticator.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@retry_network_errors()
def authenticate(self) -> None:
    """
    Request a token with HTTP basic credentials and cache it.

    Raises:
        AnaplanConnectionError: If the HTTP call fails or Anaplan reports a
            status other than ``SUCCESS``.
    """
    credentials = (self.email, self.pwd)
    try:
        reply = httpx.post(self.AUTH_URL, auth=credentials, verify=self.verify_ssl)
        reply.raise_for_status()
        body = reply.json()

        # An HTTP success can still carry an application-level failure.
        if body.get("status") != "SUCCESS":
            reason = body.get("statusMessage", "Unknown Error")
            raise AnaplanConnectionError(f"Anaplan Auth Failed: {reason}")

        self._cached_token = body["tokenInfo"]["tokenValue"]
        self._token_timestamp = time.time()
    except httpx.HTTPError as exc:
        raise AnaplanConnectionError(f"Basic Authentication failed: {exc!s}") from exc

CertificateAuthenticator

Bases: Authenticator

Source code in src/anaplan_orm/authenticator.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class CertificateAuthenticator(Authenticator):
    """
    Authentication strategy based on a PEM certificate and its private key.

    The PEM file at ``cert_path`` is used both as the source of the public
    certificate and of the private key; ``cert_password`` is the private key's
    passphrase, or ``None``/empty when the key is not encrypted.
    """

    def __init__(self, cert_path: str, cert_password: str = None, verify_ssl: bool = True):
        super().__init__()
        self.cert_path = cert_path
        self.cert_password = cert_password
        self.verify_ssl = verify_ssl

    @retry_network_errors()
    def authenticate(self) -> None:
        """
        Fetches a new token using Anaplan's custom RSA-SHA512 handshake.

        On success the token is stored in ``self._cached_token`` and the fetch
        time in ``self._token_timestamp``.

        Raises:
            AnaplanConnectionError: If the PEM file cannot be read or parsed, the
                request fails, or Anaplan reports a status other than ``SUCCESS``.
        """
        begin_marker = "-----BEGIN CERTIFICATE-----"
        end_marker = "-----END CERTIFICATE-----"

        try:
            # 1. Read the raw PEM file
            with open(self.cert_path, "rb") as f:
                pem_data = f.read()

            # 2. Extract Public Certificate string (Anaplan requires it without headers or newlines)
            pem_text = pem_data.decode("utf-8")
            _, found_begin, after_begin = pem_text.partition(begin_marker)
            cert_body, found_end, _ = after_begin.partition(end_marker)
            # Both markers are required: without the END marker everything after
            # BEGIN (possibly including the private key) would be sent as the
            # certificate.
            if not (found_begin and found_end):
                raise ValueError("No public certificate found in the PEM file.")

            # Drop every kind of whitespace, not just CR/LF, so stray spaces or
            # tabs never reach the Authorization header.
            pub_cert_string = "".join(cert_body.split())

            # 3. Load the Private Key to sign the payload
            pwd_bytes = self.cert_password.encode("utf-8") if self.cert_password else None
            private_key = serialization.load_pem_private_key(pem_data, password=pwd_bytes)

            # 4. Generate a 100-byte random message and encode it
            random_bytes = os.urandom(100)
            encoded_data = base64.b64encode(random_bytes).decode("utf-8")

            # 5. Sign the message using RSA-SHA512
            signature = private_key.sign(random_bytes, padding.PKCS1v15(), hashes.SHA512())
            encoded_signed_data = base64.b64encode(signature).decode("utf-8")

            # 6. Build Anaplan's highly specific JSON request
            headers = {
                "Authorization": f"CACertificate {pub_cert_string}",
                "Content-Type": "application/json",
            }
            payload = {"encodedData": encoded_data, "encodedSignedData": encoded_signed_data}

            # Send a standard POST request
            response = httpx.post(
                self.AUTH_URL, headers=headers, json=payload, verify=self.verify_ssl
            )
            response.raise_for_status()
            json_payload = response.json()

            if json_payload.get("status") != "SUCCESS":
                err_msg = json_payload.get("statusMessage", "Unknown Error")
                raise AnaplanConnectionError(f"Anaplan Auth Failed: {err_msg}")

            self._cached_token = json_payload["tokenInfo"]["tokenValue"]
            self._token_timestamp = time.time()

        except AnaplanConnectionError:
            # Already the error type callers expect; re-raise untouched so the
            # message is not wrapped a second time.
            raise
        except Exception as e:
            raise AnaplanConnectionError(f"Certificate Authentication failed: {str(e)}") from e

authenticate()

Fetches a new token using Anaplan's custom RSA-SHA512 handshake.

Source code in src/anaplan_orm/authenticator.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@retry_network_errors()
def authenticate(self) -> None:
    """
    Fetches a new token using Anaplan's custom RSA-SHA512 handshake.

    On success the token is stored in ``self._cached_token`` and the fetch
    time in ``self._token_timestamp``.

    Raises:
        AnaplanConnectionError: If the PEM file cannot be read or parsed, the
            request fails, or Anaplan reports a status other than ``SUCCESS``.
    """
    begin_marker = "-----BEGIN CERTIFICATE-----"
    end_marker = "-----END CERTIFICATE-----"

    try:
        # 1. Read the raw PEM file
        with open(self.cert_path, "rb") as f:
            pem_data = f.read()

        # 2. Extract Public Certificate string (Anaplan requires it without headers or newlines)
        pem_text = pem_data.decode("utf-8")
        _, found_begin, after_begin = pem_text.partition(begin_marker)
        cert_body, found_end, _ = after_begin.partition(end_marker)
        # Both markers are required: without the END marker everything after
        # BEGIN (possibly including the private key) would be sent as the
        # certificate.
        if not (found_begin and found_end):
            raise ValueError("No public certificate found in the PEM file.")

        # Drop every kind of whitespace, not just CR/LF, so stray spaces or
        # tabs never reach the Authorization header.
        pub_cert_string = "".join(cert_body.split())

        # 3. Load the Private Key to sign the payload
        pwd_bytes = self.cert_password.encode("utf-8") if self.cert_password else None
        private_key = serialization.load_pem_private_key(pem_data, password=pwd_bytes)

        # 4. Generate a 100-byte random message and encode it
        random_bytes = os.urandom(100)
        encoded_data = base64.b64encode(random_bytes).decode("utf-8")

        # 5. Sign the message using RSA-SHA512
        signature = private_key.sign(random_bytes, padding.PKCS1v15(), hashes.SHA512())
        encoded_signed_data = base64.b64encode(signature).decode("utf-8")

        # 6. Build Anaplan's highly specific JSON request
        headers = {
            "Authorization": f"CACertificate {pub_cert_string}",
            "Content-Type": "application/json",
        }
        payload = {"encodedData": encoded_data, "encodedSignedData": encoded_signed_data}

        # Send a standard POST request
        response = httpx.post(
            self.AUTH_URL, headers=headers, json=payload, verify=self.verify_ssl
        )
        response.raise_for_status()
        json_payload = response.json()

        if json_payload.get("status") != "SUCCESS":
            err_msg = json_payload.get("statusMessage", "Unknown Error")
            raise AnaplanConnectionError(f"Anaplan Auth Failed: {err_msg}")

        self._cached_token = json_payload["tokenInfo"]["tokenValue"]
        self._token_timestamp = time.time()

    except AnaplanConnectionError:
        # Already the error type callers expect; re-raise untouched so the
        # message is not wrapped a second time.
        raise
    except Exception as e:
        raise AnaplanConnectionError(f"Certificate Authentication failed: {str(e)}") from e