WebSockets in FastAPI: From Basics to Scaling

WebSockets enable real-time, bidirectional communication, perfect for chat apps, live notifications, and gaming. FastAPI, a modern Python framework, makes it easy to implement WebSockets for scalable, real-time applications. This blog walks you through setting up WebSocket connections, handling messages, and managing client-server interactions in FastAPI, with practical examples and best practices.


WebSockets and Why You Should Care? #

Let’s talk about WebSockets. If HTTP is like sending letters (write, wait, receive, repeat), WebSockets are like a never-ending phone call. They allow real-time, bidirectional communication between a client and a server, meaning both can send and receive data instantly without waiting for requests.

Technically, WebSockets are a protocol that provides full-duplex communication over a single, long-lived connection. Unlike HTTP, which is stateless and requires a new request for every response, WebSockets keep the connection alive. This makes them perfect for:

  • Chat apps (because no one likes waiting for “…” to turn into a message).
  • Live notifications (because FOMO is real).
  • Real-time data sharing (like the app we’re building in this blog).

I’m building a real-time location-sharing app for my motorcycle riding group. One of the key features is sharing live locations (with consent, of course). No more lying to the admins when you say, “I’m about to reach,” while you’re still stuck in Silk Board traffic. (Pro tip: You should’ve started early.)

To make this happen, I’m using FastAPI (Python) for the backend because it’s fast, async, and just plain awesome. For handling moderate scale, I’m using Redis to keep things snappy with its in-memory data storage. And for large-scale scenarios (fingers crossed, I hope my app gets so popular that I’ll have this “good problem” to solve), I’ll bring in RabbitMQ for message brokering. It’s my way of future-proofing the app, just in case.

In this blog, we’ll go from “Hey, I connected two things!” to “OMG, I’m handling thousands of users without breaking a sweat!” So, grab a coffee (or a beer, no one’s judging), and let’s get started.

The Setup #

Before we dive into the code, let’s get our tools ready. We’ll need FastAPI for the backend, Redis for handling real-time data, and Pika for working with RabbitMQ later.

Step 1: Install FastAPI #

FastAPI is the backbone of our app. It's fast, async, and ridiculously easy to use. Let's install it along with uvicorn (the ASGI server that runs our app) and websockets (the library that powers WebSocket support):

pip install fastapi uvicorn websockets

Step 2: Install Redis #

Redis is perfect for real-time data handling. It’s fast, in-memory, and works like a charm for our use case. Let’s install the Redis Python client:

pip install redis

Pro tip: If you don’t have Redis installed on your system, just search for How to set up Redis on [your OS] (Linux, Mac, Windows, etc.). It’s straightforward, and there are plenty of guides out there!

Step 3: Install Pika for RabbitMQ #

For scaling to thousands of users, we’ll use RabbitMQ. To interact with it in Python, we’ll use Pika:

pip install pika

Pro tip: If you don’t have RabbitMQ installed on your system, just search for How to set up RabbitMQ on [your OS] (Linux, Mac, Windows, etc.). It’s straightforward, and there are plenty of guides out there!

Why This Setup? #

  • FastAPI: Because it’s fast, modern, and perfect for real-time apps.
  • Redis: For in-memory data storage and real-time updates.
  • RabbitMQ: To handle message brokering when we scale (because dreams do come true, right?).

Writing the Code: Let’s Get Real-Time #

Let’s start by writing some code! We’ll set up a basic FastAPI app, serve an HTML page on the root route, and run a WebSocket server.

Step 1: Create the FastAPI App #

# main.py

from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

# Serve an HTML page on the root route
@app.get("/")
async def get():
    return HTMLResponse("""
    <html>
        <head>
            <title>Real-Time Data Sharing</title>
        </head>
        <body>
            <h1>Welcome to the Real-Time App!</h1>
            <div id="chat">
                <input type="text" id="message" placeholder="Type a message...">
                <button onclick="sendMessage()">Send</button>
            </div>
            <ul id="messages"></ul>

            <script>
                // Generate a random username (e.g., "AnonymousXYZ")
                const generateUsername = () => {
                    const randomPart = Math.random().toString(36).substring(2, 5).toUpperCase();
                    return `Anonymous${randomPart}`;
                };

                const username = generateUsername(); // Assign a random username
                const ws = new WebSocket("ws://localhost:8081/ws");

                // Function to send a message
                const sendMessage = () => {
                    const message = document.getElementById("message").value.trim();
                    if (message === "") return; // Prevent sending empty messages

                    ws.send(`${username}: ${message}`);
                    document.getElementById("message").value = ""; // Clear the input box

                };

                // Display incoming messages
                ws.onmessage = (event) => {
                    const messages = document.getElementById("messages");
                    const li = document.createElement("li");
                    li.textContent = event.data;
                    messages.appendChild(li);
                };
            </script>
        </body>
    </html>
""")
# Run the app
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

The WebSocket server lives in its own file:

# web_socket.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    print("INFO: WebSocket connection established")  # Log connection
    try:
        while True:
            data = await websocket.receive_text()
            print(f"INFO: Received: {data}")  # Log received message
            await websocket.send_text(data)  # Echo the message back to the sender
            print(f"INFO: Sent: {data}")  # Log sent message
    except WebSocketDisconnect:
        print("INFO: WebSocket connection closed")

Step 2: Run the WebSocket Server #

Since WebSockets need to run indefinitely, we’ll run the WebSocket server on a different port (8081). This avoids potential conflicts and makes debugging easier.

Running the HTTP Server (Port 8080) #

The HTTP server will serve the HTML page at the root route. Open a terminal and run:

uvicorn main:app --host 0.0.0.0 --port 8080

Running the WebSocket Server (Port 8081) #

The WebSocket server will handle real-time communication. Open another terminal and run:

uvicorn web_socket:app --host 0.0.0.0 --port 8081

Why Different Ports? #

We’re running the HTTP server on port 8080 and the WebSocket server on port 8081 because:

  • Separation of Concerns: Keeps HTTP and WebSocket traffic isolated.
  • Easier Debugging: If something goes wrong, you know exactly where to look.
  • Scalability: Makes it easier to scale and load balance in the future.

And hey, if you’re curious about the pros and cons of running them on the same port, stay tuned for a future blog post (because who doesn’t love a good sequel?).

Step 3: Verify Everything is Working #

Let’s make sure everything is set up correctly.

  • Open http://localhost:8080 in your browser. You should see the chat interface.
  • Check the Network Tab in developer tools (F12). Look for a WebSocket connection to ws://localhost:8081/ws with status 101 Switching Protocols.
  • Type a message and hit Send. In the terminal running the WebSocket server, you should see logs like:
INFO: WebSocket connection established
INFO: Received: AnonymousABC: Hello!
INFO: Sent: AnonymousABC: Hello!
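
If you'd rather verify from a script than a browser, here's a minimal sketch using the websockets client library we installed earlier; it assumes the WebSocket server is already running on port 8081:

# test_echo.py - a quick sanity check, not part of the app

import asyncio
import websockets

async def main():
    # Connect to the WebSocket endpoint
    async with websockets.connect("ws://localhost:8081/ws") as ws:
        await ws.send("AnonymousABC: Hello!")  # Send one message
        reply = await ws.recv()                # The server echoes it back
        print(f"Server echoed: {reply}")

asyncio.run(main())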

Step 4: Making the Chat Bidirectional (Because Talking to Yourself is Weird) #

Right now, our chat app is like a monologue—you send a message, and it echoes back to you. But let’s be honest, talking to yourself is only fun for so long. To fix this, we’ll:

  • Track all connected users: So the server knows who’s chatting.
  • Broadcast messages: So everyone in the group can see them (because group chats are where the drama happens).

Create a Connection Manager #

This guy will keep track of who’s connected and broadcast messages like a town crier.

# connection_manager.py

from fastapi import WebSocket
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        print(f"INFO: New connection. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
        print(f"INFO: Connection removed. Total connections: {len(self.active_connections)}")

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
        print(f"INFO: Broadcasted: {message}")
Update the WebSocket Endpoint #

Now, the server will broadcast messages to everyone, not just the sender.

# web_socket.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from connection_manager import ConnectionManager

app = FastAPI()

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(f"INFO: Received: {data}")
            await manager.broadcast(data)  # Send to everyone
    except WebSocketDisconnect:
        print("INFO: Client disconnected")
    except Exception as e:
        print(f"ERROR: {e}")
    finally:
        manager.disconnect(websocket)

Now your chat app is fully functional and ready for some real-time drama. Next up: scaling it like a pro. Stay tuned!
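
To watch the broadcast work without juggling two browser tabs, here's a minimal sketch of a two-client test using the websockets library; it assumes the updated WebSocket server is running on port 8081:

# test_broadcast.py - a quick sketch, not part of the app

import asyncio
import websockets

async def main():
    uri = "ws://localhost:8081/ws"
    # Open two independent connections, as if two users joined the chat
    async with websockets.connect(uri) as alice, websockets.connect(uri) as bob:
        await alice.send("AnonymousABC: Hello, everyone!")
        # Both clients receive the broadcast, including the sender
        print("alice got:", await alice.recv())
        print("bob got:  ", await bob.recv())

asyncio.run(main())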

Step 5: How Far Can We Go with This Setup? #

Our current setup uses a simple ConnectionManager to handle WebSocket connections and broadcast messages. But how many users can it handle before things start falling apart? Let’s do some math and find out.

Back-of-the-Envelope Calculations #

Memory Usage

  • Each WebSocket connection consumes ~10 KB of memory (this can vary based on the server and framework).
  • If your server has 8 GB of RAM available for the app:

Total connections = Total RAM / Memory per connection
Total connections = 8 GB / 10 KB = 800,000 connections

But wait! This is just memory. In reality, CPU and network limits will kick in much earlier.

CPU Usage

  • Broadcasting a message to N users requires O(N) send operations.
  • If you have 1,000 users and each sends 10 messages per second:

Messages per second = 1,000 users * 10 messages = 10,000 messages/second
Send operations = 10,000 messages/second * 1,000 recipients = 10,000,000 sends/second

That fan-out factor is what saturates a single event loop long before memory runs out.

Network Bandwidth

If each message is 100 bytes, and you have 1,000 users sending 10 messages per second:

Inbound bandwidth = 1,000 users * 10 messages * 100 bytes = 1 MB/s
Outbound bandwidth = 1 MB/s * 1,000 recipients = 1 GB/s

The inbound side is trivial; the broadcast fan-out on the way out is not. As users grow, outbound bandwidth becomes a serious bottleneck.
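
If you'd rather let Python do the envelope math, here's a tiny sketch of the same estimates; every constant is one of the rough assumptions above, not a measurement:

# capacity_estimate.py - back-of-the-envelope numbers, not benchmarks

RAM_BYTES = 8 * 10**9        # 8 GB available to the app (assumption)
MEM_PER_CONN = 10 * 10**3    # ~10 KB per WebSocket connection (assumption)
USERS = 1_000
MSGS_PER_USER_PER_SEC = 10
MSG_BYTES = 100

max_conns_by_memory = RAM_BYTES // MEM_PER_CONN
messages_per_sec = USERS * MSGS_PER_USER_PER_SEC
sends_per_sec = messages_per_sec * USERS  # every message fans out to all users
outbound_bytes_per_sec = sends_per_sec * MSG_BYTES

print(f"Memory-bound connection limit: {max_conns_by_memory:,}")   # 800,000
print(f"Messages/second:               {messages_per_sec:,}")      # 10,000
print(f"Send operations/second:        {sends_per_sec:,}")         # 10,000,000
print(f"Outbound bandwidth:            {outbound_bytes_per_sec / 10**9:.1f} GB/s")  # 1.0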

Practical Limits

  • Single Server: With 4 CPU cores and 8 GB RAM, you can handle ~1,000–2,000 concurrent users before performance degrades.
  • Bottlenecks:
    • CPU struggles with broadcasting.
    • Memory fills up as connections grow.
    • Network bandwidth becomes a limiting factor.

What Happens When You Hit the Limit?

  • Latency: Messages take longer to deliver.
  • Timeouts: New connections may fail.
  • Crashes: The server might run out of memory or CPU.

What If You’re Running the HTTP Server on the Same Machine?

  • The HTTP server will share the same CPU, memory, and network resources.
  • This reduces the available resources for WebSocket connections, lowering the practical limit to ~500–1,000 concurrent users.

These constraints are the ceiling of a single-server setup. In the next section, we'll explore scaling with Redis. Stay tuned!

Scaling Up: Handling Moderate Traffic #

Our current setup can handle ~1,000–2,000 users, but what if your app goes viral? (Hey, we can dream, right?) Let’s scale things up with Redis.

Why Redis? #

Redis is an in-memory data store that’s perfect for:

  • Storing WebSocket connections: Track users across multiple servers.
  • Pub/Sub Messaging: Broadcast messages efficiently (see the minimal sketch below).
  • Scalability: Handle thousands of connections without breaking a sweat.
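
Before building the full manager, here's the primitive doing the heavy lifting, in isolation: a minimal Pub/Sub sketch with redis.asyncio, assuming Redis is running locally on the default port:

# pubsub_demo.py - the Redis primitive our manager is built on

import asyncio
from redis.asyncio import Redis

async def main():
    redis = Redis(host="localhost", port=6379, db=0)
    pubsub = redis.pubsub()
    await pubsub.subscribe("demo_channel")

    # Anyone subscribed to the channel receives this
    await redis.publish("demo_channel", "hello, subscribers!")

    # Skip the subscribe confirmation and wait for the actual message
    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
        if message and message["type"] == "message":
            print("received:", message["data"].decode())
            break

    await pubsub.close()
    await redis.close()

asyncio.run(main())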

Let's Add a Redis Connection Manager #

We’ll create a singleton Redis connection to manage WebSocket connections and messages.

# redis_connection_manager.py
from redis.asyncio import Redis
import json
from fastapi import WebSocket
import asyncio
from typing import Dict
import uuid

class RedisConnectionManager:
    def __init__(self):
        self.redis = Redis(host="localhost", port=6379, db=0)
        # Generate a unique ID for this server instance
        self.server_id = str(uuid.uuid4())
        
    async def connect(self, websocket: WebSocket, client_id: str):
        """Register new WebSocket connection in Redis"""
        await websocket.accept()
        
        # Store client info in Redis
        client_data = {
            'server_id': self.server_id,
            'client_id': client_id,
            'timestamp': asyncio.get_running_loop().time()
        }
        
        # Use a Redis pipeline for atomic operations
        # (pipeline commands are buffered locally, then sent in one round trip)
        async with self.redis.pipeline(transaction=True) as pipe:
            # Add to active clients set
            pipe.sadd('active_clients', client_id)
            # Store client metadata
            pipe.hset(f'client:{client_id}', mapping=client_data)
            await pipe.execute()
            
        print(f"Client {client_id} connected to server {self.server_id}")
        
        # Start message listener for this client
        asyncio.create_task(self._client_listener(websocket, client_id))
        
    async def disconnect(self, client_id: str):
        """Remove client from Redis"""
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.srem('active_clients', client_id)
            pipe.delete(f'client:{client_id}')
            await pipe.execute()
            
        print(f"Client {client_id} disconnected from server {self.server_id}")
    
    async def broadcast(self, message: dict):
        """Broadcast message to all active clients"""
        message_str = json.dumps(message)
        await self.redis.publish('broadcast_channel', message_str)
    
    async def _client_listener(self, websocket: WebSocket, client_id: str):
        """Listen for messages intended for this specific client"""
        pubsub = None
        try:
            pubsub = self.redis.pubsub()
            await pubsub.subscribe('broadcast_channel')
            
            while True:
                try:
                    message = await pubsub.get_message(ignore_subscribe_messages=True)
                    if message and message["type"] == "message":
                        # Check if client is still active
                        is_active = await self.redis.sismember('active_clients', client_id)
                        if not is_active:
                            break
                            
                        # Forward message to WebSocket
                        await websocket.send_text(message["data"].decode())
                        
                except Exception as e:
                    print(f"Error sending to client {client_id}: {e}")
                    break
                    
                await asyncio.sleep(0.01)
                
        finally:
            if pubsub:
                await pubsub.unsubscribe()
                await pubsub.close()
            await self.disconnect(client_id)

Update the WebSocket Endpoint #

# web_socket.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis_connection_manager import RedisConnectionManager


app = FastAPI()
redis_manager = RedisConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    client_id = str(id(websocket))
    try:
        await redis_manager.connect(websocket, client_id)
        while True:
            data = await websocket.receive_text()
            message = {
                "client_id": client_id,
                "server_id": redis_manager.server_id,
                "message": data
            }
            await redis_manager.broadcast(message)
    except WebSocketDisconnect:
        await redis_manager.disconnect(client_id)
    except Exception as e:
        print(f"Error handling websocket: {e}")
        await redis_manager.disconnect(client_id)

Update the script block in the HTML response served on the root route. This version sends the username once when the socket opens:

 // Generate a random username (e.g., "AnonymousXYZ")
  const generateUsername = () => {
      const randomPart = Math.random().toString(36).substring(2, 5).toUpperCase();
      return `Anonymous - ${randomPart}`;
  };

  const username = generateUsername(); // Assign a random username
  const ws = new WebSocket("ws://localhost:8081/ws");
          
  ws.onopen = () => {
      console.log("WebSocket connected ✅");
      ws.send(username); // Send the username first
  };
  ws.onerror = (event) => {
      console.error("WebSocket error ❌:", event);
  };


  // Function to send a message
  const sendMessage = () => {
      const message = document.getElementById("message").value.trim();
      if (message === "") return; // Prevent sending empty messages

      ws.send(`${message}`);
      document.getElementById("message").value = ""; // Clear the input box

  };

  // Display incoming messages
  ws.onmessage = (event) => {
      const messages = document.getElementById("messages");
      const li = document.createElement("li");
      console.log(event.data);
      li.textContent = event.data;
      messages.appendChild(li);
  };

Code Breakdown #

Let’s dive into how our distributed chat system actually works (without the fluff):

Connection Setup

async def connect(self, websocket: WebSocket, client_id: str):
  • Each new connection gets a unique ID
  • Redis stores this in two places: active_clients set and client:{id} hash (their personal details)
  • No connections are stored in application memory; on a single server you can get away with keeping them in Python memory (like our first ConnectionManager), but that won't scale across servers

Message Broadcasting

async def broadcast(self, message: dict):
  • Takes a message and publishes to Redis channel
  • Like shouting in a room, but more sophisticated
  • Redis handles the heavy lifting of message distribution

The Magic Listener

async def _client_listener(self, websocket: WebSocket, client_id: str):
  • Each client gets their own listener (personal DJ if you will)
  • Subscribes to Redis channel
  • Forwards messages to the right WebSocket
  • Dies gracefully when connection drops (RIP)

Disconnection

async def disconnect(self, client_id: str):
  • Removes client from Redis sets and hashes
  • Like checking out of a hotel, but we actually clean up after ourselves

Scaling with Redis (Math Edition) #

Let’s compare our Python ConnectionManager with Redis and see how Redis saves the day.

Python ConnectionManager

  • Memory: Each WebSocket connection consumes ~10 KB. 8 GB RAM → ~800,000 connections (theoretically).
  • CPU: Broadcasting to N users requires O(N) operations. 1,000 users sending 10 messages/second = 10,000 messages/second (CPU cries).
  • Practical Limit: ~1,000–2,000 users before CPU and memory choke, due to event-loop saturation and the GIL.

Redis to the Rescue

  • Memory: Redis stores only metadata and Pub/Sub references, shrinking the per-connection footprint. The 8 GB → 800,000 connections figure is rough but fair for estimation; in practice, network buffers and extra metadata shave it down a bit.
  • CPU: Redis Pub/Sub still pushes each message to every subscriber, so it's O(N) too, but it's heavily optimized: Redis minimizes context switching, optimizes memory access, and handles large subscriber counts far more efficiently than a Python event loop can.
  • Practical Limit: ~10,000–20,000 concurrent users (Redis flexes its muscles), depending on message frequency and server configuration. Some deployments (with Redis Cluster, Sentinel, or sharding) scale far beyond this.

Why Redis Wins

  • Optimized Broadcasting: Pub/Sub efficiently delivers messages with minimal latency.
  • Scalable Architecture: Supports clustering and sharding for handling massive user loads.
  • Lower CPU Overhead: Offloads message distribution from the application server.
  • High Throughput: Handles thousands of messages per second with minimal performance loss.
  • Reliable Scaling: Works with load balancers and multiple Redis instances for redundancy.

This setup lets us scale horizontally - just add more servers like adding more pizza to a party. Each server is independent but they all work together through Redis, like a well-oiled machine (that runs on caffeine). That’s it! No memory leaks, no server drama, just clean, scalable code!
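
Concretely, "just add more servers" can mean running several instances of the same app on different ports behind any WebSocket-aware reverse proxy (nginx, HAProxy, and similar all work); the ports below are illustrative:

uvicorn web_socket:app --host 0.0.0.0 --port 8081
uvicorn web_socket:app --host 0.0.0.0 --port 8082
uvicorn web_socket:app --host 0.0.0.0 --port 8083

Because every instance talks to the same Redis, a client connected to the instance on 8081 still receives messages published by a client on 8083.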

The Dream Scale: Handling Thousands of Users #

Now that we’ve scaled with Redis, let’s dream bigger. What if your app goes viral and needs to handle thousands (or millions) of users? Enter RabbitMQ (and friends).

Why RabbitMQ? #

RabbitMQ is a message broker designed for:

  • High Throughput: Scales to millions of messages per second across a cluster.
  • Message Queuing: Ensures no messages are lost, even under heavy load.
  • Complex Routing: Supports advanced routing patterns (e.g., fanout, direct, topic).

If your app needs:

  • Guaranteed message delivery.
  • Complex message routing.
  • High throughput (millions of messages/second at cluster scale).

RabbitMQ is your go-to tool.

Back-of-the-Envelope Calculations #

Redis:

  • Handles ~10,000–20,000 users comfortably.
  • Great for real-time broadcasting and simple Pub/Sub.
  • Bottleneck: CPU and memory on a single server.

RabbitMQ:

  • Handles millions of messages/second when clustered.
  • Uses disk-based storage for messages, ensuring reliability.
  • Scales horizontally with clusters.

Practical Limits:

  • Single RabbitMQ Server: ~50,000–100,000 users.
  • RabbitMQ Cluster: Millions of users (distributed across multiple servers).

RabbitMQ Connection Manager #

# rabbitmq_connection_manager.py

import pika
import json

class RabbitMQManager:
    def __init__(self):
        # Note: pika's BlockingConnection is synchronous; in a real async app,
        # keep it off the event loop (e.g., in a worker thread)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        self.channel = self.connection.channel()
        # A fanout exchange copies every message to every bound queue
        self.channel.exchange_declare(exchange="chat", exchange_type="fanout")

    def publish(self, message: dict):
        # routing_key is ignored by fanout exchanges
        self.channel.basic_publish(exchange="chat", routing_key="", body=json.dumps(message))

    def consume(self, callback):
        # Each consumer gets its own exclusive, auto-named queue bound to the exchange
        result = self.channel.queue_declare(queue="", exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange="chat", queue=queue_name)
        self.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
        self.channel.start_consuming()  # Blocks; run in a dedicated thread
Update the WebSocket Endpoint #

# web_socket.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from rabbitmq_connection_manager import RabbitMQManager

app = FastAPI()

rabbitmq_manager = RabbitMQManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    username = None
    try:
        # Receive the username from the frontend (sent once in ws.onopen)
        username = await websocket.receive_text()
        print(f"INFO: {username} connected")
        while True:
            data = await websocket.receive_text()
            # Publish to the fanout exchange; every server's consumer gets a copy
            rabbitmq_manager.publish({"user": username, "message": data})
    except WebSocketDisconnect:
        print(f"INFO: {username} disconnected")
    except Exception as e:
        print(f"ERROR: {e}")

Managed Services vs. Self-Hosted #

Managed Services:

  • Pros: No setup hassle, automatic scaling, and maintenance.
  • Examples: Google Pub/Sub, AWS SQS, Azure Service Bus.

Self-Hosted:

  • Pros: Full control, great for learning, and cost-effective for small to medium apps.
  • Cons: You handle scaling, maintenance, and troubleshooting.

Why Self-Hosted?

If you’re like me and enjoy solving problems (and occasionally crying over server logs), self-hosting RabbitMQ is a great learning opportunity. Plus, it’s satisfying to see your app scale!

Conclusion: Real-Time Apps Made Simple #

Building real-time apps requires the right balance of speed, scalability, and reliability. A Python-based connection manager works for small apps but struggles under heavy load. Redis Pub/Sub efficiently scales WebSockets but lacks message persistence. For guaranteed delivery, RabbitMQ or managed pub/sub services are the best choices.

Choose Redis for speed, RabbitMQ for reliability, or a hybrid approach for the best of both worlds. 🚀

What Are Your Thoughts? #

Real-time scalability is an evolving challenge, and every use case is unique. Which approach has worked best for you? Have you faced scaling challenges with WebSockets? Share your experiences, insights, or questions in the comments below! 👇