This lab covers several advanced features you can add to your FastAPI applications. It uses the starter repository as a base.

We can add global error handlers to a FastAPI application. An error handler is a fallback function that runs when a route raises an error the handler is registered for.

A good example of this is in the FastAPI Starter Template, lines 28-33:

@app.exception_handler(status.HTTP_401_UNAUTHORIZED)
async def unauthorized_redirect_handler(request: Request, exc: Exception):
    return templates.TemplateResponse(
        request=request, 
        name="401.html",
    )

This error handler runs whenever a request to a URL raises an HTTPException with status code 401. It catches such errors and returns a page that tells the user that their session has expired.

Task 2.1

Edit the FastAPI starter template and add the following to main.py under the existing error handler

@app.exception_handler(status.HTTP_404_NOT_FOUND)
async def not_found_handler(request: Request, exc: Exception):
    return templates.TemplateResponse(
        request=request,
        name="404.html",
        status_code=status.HTTP_404_NOT_FOUND,  # keep the 404 status instead of the default 200
    )

Task 2.2

Create a file in the templates folder called 404.html and populate it with content like below

{% extends "base.html" %}
{% block title %}Not Found{% endblock %}


{% block internal_css %}
<style>
    body {
        background-color: #f8f9fa;
    }

    .page-container {
        min-height: 100vh;
    }

    .card-custom {
        border: none;
        border-radius: 1rem;
        overflow: hidden;
    }

    .left-section {
        background: linear-gradient(135deg, #0d6efd, #0dcaf0);
        display: flex;
        align-items: center;
        justify-content: center;
        color: white;
        padding: 3rem;
    }

    .left-section img {
        max-width: 100%;
        height: auto;
    }

    .right-section {
        padding: 3rem;
    }

    .btn-primary {
        border-radius: 0.5rem;
    }
</style>
{% endblock %}


{% block content %}

<div class="container page-container d-flex align-items-center justify-content-center">
    <div class="card shadow-lg card-custom col-lg-10">
        <div class="row g-0">

            <div class="col-lg-6 d-none d-lg-flex left-section">
                <div class="text-center">
                    <h2 class="mt-4 fw-bold">Not Found</h2>
                </div>
            </div>

            <div class="col-lg-6 bg-white">
                <div class="right-section d-flex flex-column justify-content-center h-100">
                    <h1 class="display-5 fw-bold text-danger mb-3">404</h1>
                    <h3 class="fw-semibold mb-3">Needle in a haystack</h3>
                    <p class="text-muted mb-4">
                        We just can't seem to find it
                    </p>

                    <div class="d-grid">
                        <a href="{{url_for('index_view')}}" class="btn btn-primary btn-lg">
                            Return to Home
                        </a>
                    </div>
                </div>
            </div>

        </div>
    </div>
</div>

{% endblock %}

WebSocket is a web protocol that allows real-time, two-way communication between a client and a server. It is typically used in chat applications. websockets is a popular Python library for adding WebSocket connections to web applications.

To try out WebSockets, perform the following steps:

  1. Add the websockets package as a dependency to pyproject.toml inside the dependencies list
... redacted for brevity ...
dependencies = [
    "asyncpg",
    "fastapi[all]",
    "httpx",
    "itsdangerous",
    "jinja2",
    "pwdlib[argon2]",
    "psycopg2-binary",
    "pyjwt",
    "python-jose",
    "pytest-asyncio",
    "pytest",
    "ruff",
    "sqlmodel",
    "tabulate",
    "typer",
    "uvicorn[standard]",
    "websockets", <-- ADD THIS (without this comment)
]

... redacted for brevity ...

  2. Reinstall the dependencies: pip install -e .

Adding websocket code

Let's create a quick chat app that allows a user to add a chatroom and send messages in it.

Add a new file services/websocket_service.py with the following content

from fastapi import WebSocket

class WebSocketService:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

websocket_service = WebSocketService()
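The service is essentially a list of open connections plus a loop that sends to each of them. The sketch below mirrors that shape with stub sockets so the broadcast behaviour can be checked without a browser or a running server. StubSocket and ConnectionManager are hypothetical names for illustration; the membership guard in disconnect and the list copy in broadcast are small additions that are not in the service above.

```python
import asyncio


class StubSocket:
    """Hypothetical stand-in for fastapi.WebSocket that records what it is sent."""

    def __init__(self, name):
        self.name = name
        self.accepted = False
        self.received = []

    async def accept(self):
        self.accepted = True

    async def send_text(self, message):
        self.received.append(message)


class ConnectionManager:
    """Same shape as WebSocketService, without the FastAPI import."""

    def __init__(self):
        self.active_connections = []

    async def connect(self, websocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket):
        # Guard against removing a socket twice
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message):
        # Iterate over a copy so a disconnect during the loop cannot skip entries
        for connection in list(self.active_connections):
            await connection.send_text(message)


async def demo():
    manager = ConnectionManager()
    alice, bob = StubSocket("alice"), StubSocket("bob")
    await manager.connect(alice)
    await manager.connect(bob)
    await manager.broadcast("hello")
    manager.disconnect(bob)
    await manager.broadcast("bob left")
    return alice.received, bob.received


alice_log, bob_log = asyncio.run(demo())
print(alice_log)  # ['hello', 'bob left']
print(bob_log)    # ['hello']
```

Because the connection list lives in memory inside one process, this pattern only reaches clients connected to the same server process.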

Create a new file chat.html in the templates folder with the following content

{% extends "authenticated-base.html" %}

{% block external_js %}
<script>
    var client_id = "lab-10-chat-room"
    document.querySelector("#ws-id").textContent = client_id;
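    // Build an absolute ws:// or wss:// URL; older browsers reject relative WebSocket URLs
    var protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
    var ws = new WebSocket(`${protocol}//${window.location.host}/ws/${client_id}`);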
    ws.onmessage = function(event) {
        var messages = document.getElementById('messages')
        var message = document.createElement('li')
        var content = document.createTextNode(event.data)
        message.appendChild(content)
        messages.appendChild(message)
    };
    function sendMessage(event) {
        var input = document.getElementById("messageText")
        ws.send(input.value)
        input.value = ''
        event.preventDefault()
    }
</script>
{% endblock %}


{% block subpage_content %}

    <div class="container" id="content">
        <h1>WebSocket Chat</h1>
        <h2>Your ID: <span id="ws-id"></span></h2>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
    </div>

{% endblock %}

Next, create a new router websocket.py in the routers folder.

from fastapi.responses import HTMLResponse
from fastapi import Request, WebSocket, WebSocketDisconnect
from app.dependencies import SessionDep, AuthDep
from . import router, templates
from app.services.websocket_service import websocket_service


@router.get("/chats", response_class=HTMLResponse)
async def chats_view(
    request: Request,
    user: AuthDep,
    db:SessionDep
):
    return templates.TemplateResponse(
        request=request, 
        name="chat.html",
        context={
            "user": user
        }
    )


@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket_service.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket_service.send_personal_message(f"You wrote: {data}", websocket)
            await websocket_service.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        websocket_service.disconnect(websocket)
        await websocket_service.broadcast(f"Client #{client_id} left the chat")

Finally, include the new file in the imports list in routers/__init__.py by replacing the last line with the following

from . import (index, login, register, admin_home, user_home, users, logout, websocket)

Now, when you're logged in to the app, go to the /chats endpoint and type messages.

Sign up as another user (for example, in a second browser or a private window), go to the /chats endpoint and exchange messages using the real-time chat.

Uploading files is a very common feature in web applications. A file can be submitted in an HTML5 form, received by the server and stored on the server's filesystem. However, accepting arbitrary files from users can introduce security issues.

Additionally, files should be managed to avoid name clashes and metadata about the files should be stored in the database so that they can be retrieved.
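One common first line of defence is to check an upload against an allow-list before storing it. The sketch below is an assumption about what such a check could look like, not part of the starter template: ALLOWED_EXTENSIONS, MAX_UPLOAD_BYTES and is_allowed_upload are hypothetical names, and the values should be adjusted to what your app needs to accept.

```python
import os

# Hypothetical allow-list and size cap; adjust to what your app should accept
ALLOWED_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".pdf"}
MAX_UPLOAD_BYTES = 5 * 1024 * 1024  # 5 MiB


def is_allowed_upload(filename: str, size_in_bytes: int) -> bool:
    """Return True only for non-empty files with an allowed extension and size."""
    extension = os.path.splitext(filename)[1].lower()
    if extension not in ALLOWED_EXTENSIONS:
        return False
    return 0 < size_in_bytes <= MAX_UPLOAD_BYTES


print(is_allowed_upload("photo.JPG", 120_000))           # True
print(is_allowed_upload("script.exe", 120_000))          # False
print(is_allowed_upload("huge.png", 50 * 1024 * 1024))   # False
```

An extension check only looks at the name the client sent, so it filters out accidents rather than proving what the file contains.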

In order to render a table of uploaded files, we need an Upload model to keep track of them. Additionally, each file is renamed to a random filename to avoid name clashes.

First, we use a multipart form to send our file data to the server.

To set up file uploads, perform the following steps:

  1. Add the python-multipart package as a dependency to pyproject.toml inside the dependencies list
... redacted for brevity ...
dependencies = [
    "asyncpg",
    "fastapi[all]",
    "httpx",
    "itsdangerous",
    "jinja2",
    "pwdlib[argon2]",
    "psycopg2-binary",
    "pyjwt",
    "python-jose",
    "python-multipart", <-- ADD THIS (without this comment)
    "pytest-asyncio",
    "pytest",
    "ruff",
    "sqlmodel",
    "tabulate",
    "typer",
    "uvicorn[standard]",
    "websockets", 
]

... redacted for brevity ...
  2. Reinstall the dependencies: pip install -e .

Adding File Upload code

Let's create the files needed to facilitate the uploading of content. Add a new file services/upload_service.py with the following content

import os
import random
import shutil
import string

# Single upload folder, so storing and removing agree on the same path
UPLOAD_DIR = 'uploads'

class UploadService:
    def random_string(self):
        # 10 random uppercase letters/digits used as the stored filename
        return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))

    async def store_file(self, file):
        os.makedirs(UPLOAD_DIR, exist_ok=True)
        # Keep the original extension but replace the name to avoid clashes
        extension = os.path.splitext(file.filename)[1]
        newname = self.random_string() + extension
        with open(os.path.join(UPLOAD_DIR, newname), "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        return newname


    def remove_file(self, filename):
        try:
            os.remove(os.path.join(UPLOAD_DIR, filename))
        except FileNotFoundError:
            print('file already deleted')
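The renaming step is what prevents two uploads with the same name from overwriting each other. As an alternative sketch of the same idea, the standard library's secrets module can generate names that are also hard to guess; random_filename is a hypothetical helper, not something the starter template provides.

```python
import os
import secrets


def random_filename(original_filename: str) -> str:
    """Return an unpredictable name that keeps only the original extension."""
    # basename drops any directory parts a client may have put in the filename
    extension = os.path.splitext(os.path.basename(original_filename))[1].lower()
    # 16 random bytes become 32 hexadecimal characters
    return secrets.token_hex(16) + extension


first = random_filename("holiday photo.PNG")
second = random_filename("holiday photo.PNG")
print(first)
print(second)
```

Since only the extension of the client's filename survives, the original name has to be kept somewhere else, such as the original_filename column of an upload record, if it should be shown to users later.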

Add a new model models/upload.py with the following content

from sqlmodel import Field, SQLModel
from typing import Optional

class Upload(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    filename: str = Field(index=True, unique=True)
    original_filename: str = Field(index=True)

Add the endpoint for accepting an upload in routers/upload.py

from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi import Request, status, UploadFile
from app.dependencies import SessionDep, AuthDep
from . import router, templates
from app.services.upload_service import UploadService


@router.post("/upload")
async def create_upload_file(
    files: list[UploadFile],
    user: AuthDep,
    db: SessionDep,
):
    upload_service = UploadService()
    for file in files:
        await upload_service.store_file(file)
    return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)


@router.get("/upload", response_class=HTMLResponse)
async def upload_view(
    request: Request,
    user: AuthDep,
    db:SessionDep
):
    return templates.TemplateResponse(
        request=request, 
        name="upload.html",
        context={
            "user": user
        }
    )

Create the template upload.html in the templates folder; it contains the HTML form for uploading content

{% extends "authenticated-base.html" %}

{% block external_js %}

{% endblock %}


{% block subpage_content %}

<form action="/upload" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input type="submit">
</form>

{% endblock %}


Update the imports in routers/__init__.py

from . import (index, login, register, admin_home, user_home, users, logout, websocket, upload)

Task 4.1

Update the code to ensure that after the file is saved, the record is stored appropriately in the database.

Task 4.2

Implement a way to delete an upload if it exists

Task 4.3

Implement a way for a user to view an uploaded image on the website if it exists.

While the previous approach for uploads works, it is by no means scalable, because uploads are stored in the same location as the application code. Additionally, this would NOT work for applications deployed to services like Heroku or Render, which wipe the app's local files when it restarts or goes to sleep.

Dedicated storage services should be used in this case, as the uploads are then stored separately from the application code. Hence we send the file to Firebase Storage for safekeeping instead.

Keeping this here for reference purposes

Follow the steps below

  1. Go to the Firebase console
  2. Create a new Firebase project (accept the defaults)
  3. After the project is created, select the Service Accounts tab
  4. Select Python and view the snippet on how to integrate Firebase
  5. Generate a new private key and copy this file into the root of your code.
  6. Rename the file to firebase.json or something similar

Integrating firebase storage

  1. Modify the dependencies and add firebase-admin (like you did in the previous sections)
  2. Reinstall the dependencies: pip install -e .
  3. Create a new service services/firebase_service.py
  4. Enable Firebase Storage by expanding the left menu on the Firebase console -> Databases and storage -> Storage
  5. Enable Storage (may require a credit card)
  6. Keep note of the storage URL gs://identifier.firebasestorage.app

Add the code below to services/firebase_service.py, adjusting the path to where you stored the key file and replacing <identifier> with the one from your storage URL

import firebase_admin
from firebase_admin import credentials, storage

cred = credentials.Certificate("firebase.json")
firebase_admin.initialize_app(cred, {
    'storageBucket': '<identifier>.firebasestorage.app'
})
bucket = storage.bucket()
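Since the key file may live in different places on different machines, one option is to read its location from an environment variable and fall back to firebase.json. This is only a sketch under assumptions: FIREBASE_CREDENTIALS and resolve_credentials_path are hypothetical names, and the demo at the bottom uses a throwaway file so it runs without a real key.

```python
import os
import tempfile
from pathlib import Path


def resolve_credentials_path(default: str = "firebase.json") -> Path:
    """Return the key file path from FIREBASE_CREDENTIALS, or the default."""
    path = Path(os.environ.get("FIREBASE_CREDENTIALS", default))
    if not path.is_file():
        # Fail early with a clear message instead of deep inside the SDK
        raise FileNotFoundError(f"Firebase key file not found: {path}")
    return path


# Demo with a throwaway file standing in for the downloaded private key
with tempfile.TemporaryDirectory() as folder:
    key_file = Path(folder) / "firebase.json"
    key_file.write_text("{}")
    os.environ["FIREBASE_CREDENTIALS"] = str(key_file)
    resolved = resolve_credentials_path()
    print(resolved.name)  # firebase.json
```

The result could then be passed along as credentials.Certificate(str(resolve_credentials_path())) in place of the hard-coded "firebase.json".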

Modify upload service to store in firebase

Replace the upload_service as follows

import os, string, random
from .firebase_service import bucket

class UploadService:
    def random_string(self,):
        return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))

    async def store_file(self, file):
        extension = os.path.splitext(file.filename)[1]
        newname = self.random_string() + extension
        blob = bucket.blob(newname)
        blob.upload_from_file(file.file)
        return newname

    def remove_file(self, filename):
        try:
            blob = bucket.blob(filename)
            blob.delete()
        except Exception:
            print('file already Deleted from Firebase')
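One thing to be aware of is that blob.upload_from_file is a synchronous call, so inside an async method it holds up the event loop until the upload finishes. A possible way around this, sketched below with the standard library only, is to push the blocking call onto a worker thread with asyncio.to_thread. Here blocking_upload is a hypothetical stand-in for the real upload, with time.sleep simulating network latency.

```python
import asyncio
import time


def blocking_upload(name: str) -> str:
    """Hypothetical stand-in for a synchronous call such as blob.upload_from_file."""
    time.sleep(0.2)  # simulates network latency
    return name


async def store_file(name: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free
    return await asyncio.to_thread(blocking_upload, name)


async def main():
    started = time.perf_counter()
    # The three uploads overlap instead of running back to back
    names = await asyncio.gather(
        *(store_file(name) for name in ["a.png", "b.png", "c.png"])
    )
    elapsed = time.perf_counter() - started
    return names, elapsed


names, elapsed = asyncio.run(main())
print(names)  # ['a.png', 'b.png', 'c.png']
print(f"took {elapsed:.2f}s")
```

In the service above, the equivalent change would be awaiting asyncio.to_thread(blob.upload_from_file, file.file) rather than calling the method directly.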

Upload a file through the app, then reload Firebase Storage in the console to see the stored file.

Push notifications are also supported on the web. Using the Firebase Admin SDK and Firebase Cloud Messaging, we can broadcast notifications to users.

This requires you to download a Google service account key for your project so your web app can perform admin operations such as sending notifications.

You can reuse the snippets of code in the previous section to initialize the Firebase service, and check out the Firebase docs on how to send push notifications to a device. This also involves integrating the Firebase JavaScript SDK on the client side. The client SDK allows you to generate an identifier for a client, which you can then use in the Admin SDK to send messages to that client.

Hopefully this lab provides some useful resources to aid in your project implementation.