Build a Fullstack AI Chatbot Part 3 - Real-Time Systems with Redis
Our application currently does not store any state, and there is no way to identify users or store and retrieve chat data. We are also returning a hard-coded response to the client during chat sessions.
Table of Contents
In this part of the tutorial, we will cover the following:
- How to connect to a Redis Cluster in Python and set up a Redis Client
- How to store and retrieve data with Redis JSON
- How to set up Redis Streams as message queues between a web server and worker environment
Redis and Distributed Messaging Queues
Redis is an open-source in-memory data store that you can use as a database, cache, message broker, and streaming engine. It supports a number of data structures and is a perfect solution for distributed applications with real-time capabilities.
Redis Enterprise Cloud is a fully managed cloud service provided by Redis that lets us deploy and scale Redis clusters without worrying about infrastructure.
We will be using a free Redis Enterprise Cloud instance for this tutorial. You can get started with Redis Cloud for free here and follow this tutorial to set up a Redis database and Redis Insight, a GUI for interacting with Redis.
Once you have set up your Redis database, create a new folder in the project root (outside the `server` folder) named `worker`.
We will isolate our worker environment from the web server so that when the client sends a message to our WebSocket, the web server does not have to handle the request to the third-party service. Also, resources can be freed up for other users.
The background communication with the inference API is handled by this worker service, through Redis.
Requests from all the connected clients are appended to the message queue (producer), while the worker consumes the messages, sends off the requests to the inference API, and appends the response to a response queue.
Once the chat API receives the response from the response queue, it sends it back to the client.
During the trip between the producer and the consumer, the client can send multiple messages, and these messages will be queued up and responded to in order.
Ideally, we could have this worker running on a completely different server, in its own environment, but for now, we will create its own Python environment on our local machine.
You might be wondering: why do we need a worker? Imagine a scenario where the web server also creates the request to the third-party service. This means that while waiting for the response from the third-party service during a socket connection, the server is blocked and resources are tied up until the response is obtained from the API.
You can try this out by adding a sleep `time.sleep(10)` before sending the hard-coded response, and then sending a new message. Then try to connect with a different token in a new Postman session. You will notice that the chat session will not connect until the sleep times out.
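For illustration, the experiment could look roughly like this (a temporary sketch based on the Part 2 endpoint, reusing the imports and `ConnectionManager` already in `chat.py`; don't keep it):

```python
import time


@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # A blocking sleep inside an async endpoint stalls the whole event
            # loop, so no other client can connect or be served in the meantime.
            time.sleep(10)
            await manager.send_personal_message(
                f"Response: Simulating response from the GPT service", websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
```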
While we can use asynchronous techniques and worker pools in a more production-focused server setup, that also won't be enough as the number of simultaneous users grows.
Ultimately, we want to avoid tying up the web server resources by using Redis to broker the communication between our chat API and the third-party API.
Next, open up a new terminal, cd into the worker folder, and create and activate a new Python virtual environment as we did in part 1.
Next, install the following dependencies:
```bash
pip install aiohttp aioredis python-dotenv
```
How to Connect to a Redis Cluster in Python with a Redis Client
We will use the aioredis client to connect with the Redis database. We'll also use the requests library to send requests to the Huggingface inference API.
Create two files, `.env` and `main.py`. Then create a folder named `src`. Inside `src`, create a folder named `redis` and add a new file named `config.py`.
In the `.env` file, add the following code, and make sure you update the fields with the credentials provided in your Redis Cluster.
```bash
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
```
In `config.py`, add the `Redis` class below:
```python
import os
from dotenv import load_dotenv
import aioredis

load_dotenv()


class Redis():
    def __init__(self):
        """Initialize connection"""
        self.REDIS_URL = os.environ['REDIS_URL']
        self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
        self.REDIS_USER = os.environ['REDIS_USER']
        self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"

    async def create_connection(self):
        self.connection = aioredis.from_url(
            self.connection_url, db=0)

        return self.connection
```
We create a `Redis` object and initialize the required parameters from the environment variables. Then we create an asynchronous method `create_connection` to create a Redis connection and return the connection pool obtained from the aioredis `from_url` method.
Next, we test the Redis connection in main.py by running the code below. This will create a new Redis connection pool, set a simple key "key", and assign a string "value" to it.
```python
from src.redis.config import Redis
import asyncio


async def main():
    redis = Redis()
    redis = await redis.create_connection()
    print(redis)
    await redis.set("key", "value")

if __name__ == "__main__":
    asyncio.run(main())
```
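Run the test from the worker folder with `python main.py` (with your virtual environment active so the dependencies and `.env` values are picked up). If you also want to confirm the write from code, a small readback script like the one below should work (a sketch reusing the same `Redis` class):

```python
from src.redis.config import Redis
import asyncio


async def check():
    redis = await Redis().create_connection()
    # Read back the key set in main.py; this prints b'value' because
    # decode_responses is not enabled on this connection.
    print(await redis.get("key"))

if __name__ == "__main__":
    asyncio.run(check())
```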
Now open Redis Insight (if you followed the tutorial to download and install it). You should see something like this:
How to Work with Redis Streams
Now that we have our worker environment set up, we can create a producer on the web server and a consumer on the worker.
First, let's create our Redis class again on the server. In `server.src`, create a folder named `redis` and add two files, `config.py` and `producer.py`.
In `config.py`, add the code below, as we did for the worker environment:
```python
import os
from dotenv import load_dotenv
import aioredis

load_dotenv()


class Redis():
    def __init__(self):
        """Initialize connection"""
        self.REDIS_URL = os.environ['REDIS_URL']
        self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
        self.REDIS_USER = os.environ['REDIS_USER']
        self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"

    async def create_connection(self):
        self.connection = aioredis.from_url(
            self.connection_url, db=0)

        return self.connection
```
In the server's `.env` file, also add the Redis credentials:
```bash
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
```
Finally, in `server.src.redis.producer.py`, add the following code:
```python
from .config import Redis


class Producer:
    def __init__(self, redis_client):
        self.redis_client = redis_client

    async def add_to_stream(self, data: dict, stream_channel):
        try:
            msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
            print(f"Message id {msg_id} added to {stream_channel} stream")
            return msg_id

        except Exception as e:
            print(f"Error sending msg to stream => {e}")
```
We created a `Producer` class that is initialized with a Redis client. We use this client to add data to the stream with the `add_to_stream` method, which takes the data and the Redis channel name.
The Redis command for adding data to a stream channel is `xadd`, and it has both high-level and low-level functions in aioredis.
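For a sense of what that high-level call looks like on its own, here is a throwaway example (it assumes you already have an aioredis client in an async context; the field name and value are made up):

```python
# One-off illustration: add a single entry to the stream and print its ID,
# which encodes the insertion timestamp (for example b'1671234567890-0').
msg_id = await redis_client.xadd("message_channel", {"test_token": "Hello"})
print(msg_id)
```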
Next, to run our newly created Producer, update `chat.py` and the WebSocket `/chat` endpoint as shown below. Notice the updated channel name `message_channel`.
```python
from ..redis.producer import Producer
from ..redis.config import Redis

chat = APIRouter()
manager = ConnectionManager()
redis = Redis()


@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await manager.connect(websocket)
    redis_client = await redis.create_connection()
    producer = Producer(redis_client)

    try:
        while True:
            data = await websocket.receive_text()
            print(data)
            stream_data = {}
            stream_data[token] = data
            await producer.add_to_stream(stream_data, "message_channel")
            await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
```
Next, in Postman, create a connection and send any number of messages that say `Hello`. You should have the stream messages printed to the terminal like below:
In Redis Insight, you will see a new `message_channel` created and a timestamped queue filled with the messages sent from the client. This timestamped queue is important for preserving the order of the messages.
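If you want to inspect those entries without Redis Insight, a small script run from the worker environment can read the stream back in order (a sketch using the worker's `Redis` class from earlier):

```python
from src.redis.config import Redis
import asyncio


async def peek_at_stream():
    redis_client = await Redis().create_connection()
    # XRANGE returns entries oldest-first; each entry ID is a millisecond
    # timestamp plus a sequence number, which is how ordering is preserved.
    entries = await redis_client.xrange("message_channel", min="-", max="+")
    for entry_id, fields in entries:
        print(entry_id, fields)

if __name__ == "__main__":
    asyncio.run(peek_at_stream())
```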
How to Model the Chat Data
Next, we'll create a model for our chat messages. Recall that we are sending text data over WebSockets, but our chat data needs to hold more information than just the text. We need to timestamp when the chat was sent, create an ID for each message, and collect data about the chat session, then store this data in a JSON format.
We can store this JSON data in Redis so we don't lose the chat history once the connection is lost, because our WebSocket does not store state.
In `server.src`, create a new folder named `schema`. Then create a file named `chat.py` in `server.src.schema` and add the following code:
```python
from datetime import datetime
from pydantic import BaseModel, Field
from typing import List, Optional
import uuid


class Message(BaseModel):
    # default_factory ensures a fresh id and timestamp for every message,
    # rather than one value computed once when the class is defined
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    msg: str
    timestamp: str = Field(default_factory=lambda: str(datetime.now()))


class Chat(BaseModel):
    token: str
    messages: List[Message]
    name: str
    session_start: str = Field(default_factory=lambda: str(datetime.now()))
```
We are using Pydantic's `BaseModel` class to model the chat data. The `Chat` class will hold data about a single chat session. It will store the token, the name of the user, and an automatically generated timestamp for the chat session start time using `datetime.now()`.
The messages sent and received within this chat session are stored with a `Message` class, which creates a message id on the fly using `uuid4`. The only data we need to provide when initializing this `Message` class is the message text.
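As a quick illustration of the schema (not part of the app code), constructing a message only requires the text, and the other fields are filled in automatically. The import path below is illustrative and depends on where you run it:

```python
from src.schema.chat import Message  # illustrative path: run from the server folder

message = Message(msg="Hello there")
print(message.dict())
# -> {'id': '0f3b...', 'msg': 'Hello there', 'timestamp': '2022-...'}
```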
How to Work with Redis JSON
In order to use Redis JSON's ability to store our chat history, we need to install the rejson client library provided by Redis Labs.
In the terminal, cd into `server` and install rejson with `pip install rejson`. Then update your `Redis` class in `server.src.redis.config.py` to include the `create_rejson_connection` method:
```python
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client

load_dotenv()


class Redis():
    def __init__(self):
        """Initialize connection"""
        self.REDIS_URL = os.environ['REDIS_URL']
        self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
        self.REDIS_USER = os.environ['REDIS_USER']
        self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
        self.REDIS_HOST = os.environ['REDIS_HOST']
        self.REDIS_PORT = os.environ['REDIS_PORT']

    async def create_connection(self):
        self.connection = aioredis.from_url(
            self.connection_url, db=0)

        return self.connection

    def create_rejson_connection(self):
        self.redisJson = Client(host=self.REDIS_HOST,
                                port=self.REDIS_PORT, decode_responses=True,
                                username=self.REDIS_USER, password=self.REDIS_PASSWORD)

        return self.redisJson
```
We are adding the `create_rejson_connection` method to connect to Redis with the rejson `Client`. This gives us the methods to create and manipulate JSON data in Redis, which are not available with aioredis.
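As a quick illustration of what the rejson client gives us, a throwaway round trip could look like this (assuming the `redis = Redis()` instance we create in `chat.py`; the key `example_key` is made up and not used by the app):

```python
from rejson import Path

# Assumption: `redis` is the Redis() instance already created in chat.py
json_client = redis.create_rejson_connection()

# Write a JSON document at the root path, then read it back as a Python dict
# (decode_responses=True on the rejson Client handles the decoding).
json_client.jsonset("example_key", Path.rootPath(), {"name": "test", "messages": []})
print(json_client.jsonget("example_key", Path.rootPath()))  # {'name': 'test', 'messages': []}
```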
Next, in `server.src.routes.chat.py`, we can update the `/token` endpoint to create a new `Chat` instance and store the session data in Redis JSON like so:
```python
@chat.post("/token")
async def token_generator(name: str, request: Request):
    token = str(uuid.uuid4())

    if name == "":
        raise HTTPException(status_code=400, detail={
            "loc": "name", "msg": "Enter a valid name"})

    # Create new chat session
    json_client = redis.create_rejson_connection()

    chat_session = Chat(
        token=token,
        messages=[],
        name=name
    )

    # Store chat session in redis JSON with the token as key
    json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())

    # Set a timeout for redis data
    redis_client = await redis.create_connection()
    await redis_client.expire(str(token), 3600)

    return chat_session.dict()
```
NOTE: Because this is a demo app, I do not want to store the chat data in Redis for too long. So I have added a 60-minute timeout on the token using the aioredis client (rejson does not implement timeouts). This means that after 60 minutes, the chat session data will be lost.
This is necessary because we are not authenticating users, and we want to dump the chat data after a defined period. This step is optional, and you don't have to include it.
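If you do include it and want to confirm the timeout took effect, you can check the remaining time to live on the key (a quick sketch, assuming an aioredis connection in an async context):

```python
# Seconds left before the session data expires:
# -1 means no expiry is set, -2 means the key does not exist.
remaining = await redis_client.ttl(token)
print(remaining)
```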
Next, in Postman, when you send a POST request to create a new token, you will get a structured response like the one below. You can also check Redis Insight to see your chat data stored with the token as a JSON key and the data as a value.
Updating the Token Dependency
Now that we have a token being generated and stored, this is a good time to update the `get_token` dependency in our `/chat` WebSocket. We do this to check for a valid token before starting the chat session.
In `server.src.socket.utils.py`, update the `get_token` function to check if the token exists in the Redis instance. If it does, we return the token, which means that the socket connection is valid. If it doesn't exist, we close the connection.
The token created by `/token` will cease to exist after 60 minutes. So we can have some simple logic on the frontend to redirect the user to generate a new token if an error response is generated while trying to start a chat.
```python
from ..redis.config import Redis

redis = Redis()


async def get_token(
    websocket: WebSocket,
    token: Optional[str] = Query(None),
):
    if token is None or token == "":
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)

    redis_client = await redis.create_connection()
    isexists = await redis_client.exists(token)

    if isexists == 1:
        return token
    else:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION,
                              reason="Session not authenticated or expired token")
```
To test the dependency, connect to the chat session with the random token we have been using, and you should get a 403 error. (Note that you have to manually delete the token in Redis Insight.)
Now copy the token generated when you sent the POST request to the `/token` endpoint (or create a new request), and paste it as the value of the token query parameter required by the `/chat` WebSocket. Then connect. You should get a successful connection.
Bringing it all together, your `chat.py` should now look like the code below.
```python
import os

from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path

chat = APIRouter()
manager = ConnectionManager()
redis = Redis()


# @route   POST /token
# @desc    Route to generate chat token
# @access  Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
    token = str(uuid.uuid4())

    if name == "":
        raise HTTPException(status_code=400, detail={
            "loc": "name", "msg": "Enter a valid name"})

    # Create new chat session
    json_client = redis.create_rejson_connection()

    chat_session = Chat(
        token=token,
        messages=[],
        name=name
    )

    print(chat_session.dict())

    # Store chat session in redis JSON with the token as key
    json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())

    # Set a timeout for redis data
    redis_client = await redis.create_connection()
    await redis_client.expire(str(token), 3600)

    return chat_session.dict()


# @route   POST /refresh_token
# @desc    Route to refresh token
# @access  Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
    return None


# @route   Websocket /chat
# @desc    Socket for chat bot
# @access  Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await manager.connect(websocket)
    redis_client = await redis.create_connection()
    producer = Producer(redis_client)
    json_client = redis.create_rejson_connection()

    try:
        while True:
            data = await websocket.receive_text()
            stream_data = {}
            stream_data[token] = data
            await producer.add_to_stream(stream_data, "message_channel")
            await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
```
Well done on making it this far! In the next section, we will focus on communicating with the AI model and handling the data transfer between client, server, worker, and the external API.
This article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog: https://blog.stephensanwo.dev - AI ChatBot Series
You can download the full repository on my GitHub repository.