Validate the Access token with FastAPI
This tutorial covers how to validate that the Access JWT is on requests made to FastAPI apps.
Time to complete: 15 minutes
Prerequisites
- A self-hosted Access application for your FastAPI app
- The AUD tag for your Access application
1. Create a validation function
- In your FastAPI project, create a new file called
cloudflare.py
that contains the following code:
cloudflare.pyfrom fastapi import Request, HTTPException
# The Application Audience (AUD) tag for your applicationPOLICY_AUD = "XXXXX"
# Your CF Access team domainTEAM_DOMAIN = "https://<your-team-name>.cloudflareaccess.com"CERTS_URL = "{}/cdn-cgi/access/certs".format(TEAM_DOMAIN)
async def validate_cloudflare(request: Request): """ Validate that the request is authenticated by Cloudflare Access. """ if verify_token(request) != True: raise HTTPException(status_code=400, detail="Not authenticated properly!")
def _get_public_keys(): """ Returns: List of RSA public keys usable by PyJWT. """ r = requests.get(CERTS_URL) public_keys = [] jwk_set = r.json() for key_dict in jwk_set["keys"]: public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key_dict)) public_keys.append(public_key) return public_keys
def verify_token(request): """ Verify the token in the request. """ token = ""
if "CF_Authorization" in request.cookies: token = request.cookies["CF_Authorization"] else: raise HTTPException(status_code=400, detail="missing required cf authorization token")
keys = _get_public_keys()
# Loop through the keys since we can't pass the key set to the decoder valid_token = False for key in keys: try: # decode returns the claims that has the email when needed jwt.decode(token, key=key, audience=POLICY_AUD, algorithms=["RS256"]) valid_token = True break except: raise HTTPException(status_code=400, detail="Error decoding token") if not valid_token: raise HTTPException(status_code=400, detail="Invalid token")
return True
2. Use the validation function in your app
You can now add the validation function as a dependency in your FastAPI app. One way to do this is by creating an
APIRouter
instance. The following example executes the validation function on each request made to paths that start with /admin
:
from fastapi import APIRouter, Depends, HTTPExceptionfrom cloudflare import validate_cloudflarerouter = APIRouter(prefix="/admin",tags=["admin"],dependencies=[Depends(validate_cloudflare)]responses={404: {"description": "Not found"}},)@router.get("/")async def root():return {"message": "Hello World"}