👋 Welcome to the ChatGPT Web App Server repository, a full-stack implementation of an API server built with Python FastAPI, and a beautiful frontend powered by Flutter. 💬 This project is designed to deliver a seamless chat experience with the advanced GPT-3 model. 🔝 It offers a modern infrastructure that can be easily extended when GPT-4's multimodal and plugin features become available. 🚀 Enjoy your stay!
- Note that the API keys do not play any role at this moment. The OpenAI key in the .env file will be used for all requests. This feature was added for those who want to use this project as a template for building a production-ready chat app.
- All chat histories will be stored in the Redis cache. You can load chat histories by clicking the chatroom name on the left side.
- OpenAI's gpt-3.5-turbo model will be used by default. You can switch between LlamaCpp and ChatGPT by entering the command /model <model> in the chat input. To use LlamaCpp, you must download a .bin file to any folder and define its path in the LLMModels class in app\models\gpt_llms.py
- FastAPI - High-performance web framework for building APIs with Python.
- Flutter - Webapp frontend with beautiful UI and rich set of customizable widgets.
- ChatGPT - Seamless integration with the OpenAI API for text generation and message management.
- LLAMA - Supporting a local LLM, LlamaCpp, with multiprocessing.
- WebSocket Connection - Real-time, two-way communication with the ChatGPT model, with Flutter frontend webapp.
- Vectorstore - Using Redis and Langchain, store and retrieve vector embeddings for similarity search. It will help ChatGPT to generate more relevant responses.
- Concurrency - Asynchronous programming with async/await syntax for concurrency and parallelism.
- Security - Token validation and authentication to keep the API secure.
- Database - Manage database connections and execute MySQL queries. Easily perform Create, Read, Update, and Delete actions with sqlalchemy.asyncio.
- Cache - Manage cache connections and execute Redis queries. Easily perform Create, Read, Update, and Delete actions with aioredis.
To set up the ChatGPT Web App Server on your local machine, follow these simple steps:
- Clone the repository:
git clone https://github.com/c0sogi/chatgpt-webapp-server.git
- Change to the project directory:
cd chatgpt-webapp-server
- Create a .env file and set it up for the FastAPI server, referring to the .env-sample file. Enter the database connection settings, the OpenAI API key, and other necessary configurations. Optional settings are not required; just leave them blank.
- To run the server, execute the following command. It may take a few minutes to start the server for the first time:
docker-compose -f docker-compose-local.yaml up -d
- To stop the server, execute:
docker-compose -f docker-compose-local.yaml down
- Now you can access the server at http://localhost:8000/docs, the database at db:3306, and the cache at cache:6379. You can also access the ChatGPT Web App at http://localhost:8000/chatgpt.
Your ChatGPT Web App Server should now be up and running, ready to provide an engaging chat experience!
This project is licensed under the MIT License, which allows for free use, modification, and distribution, as long as the original copyright and license notice are included in any copy or substantial portion of the software.
🚀 FastAPI is a modern web framework for building APIs with Python.
💪 It offers high performance, is easy to learn and fast to code, and is ready for production.
👍 One of the main features of FastAPI is that it supports concurrency and async/await syntax.
🤝 This means that you can write code that can handle multiple tasks at the same time without blocking each other, especially when dealing with I/O bound operations, such as network requests, database queries, file operations, etc.
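As a rough illustration of the point above, the following sketch uses only the standard asyncio module (not FastAPI itself) to run three simulated I/O-bound tasks concurrently. The names fake_io_task and run_concurrently are made up for this example and are not part of the project.

```python
import asyncio
import time


async def fake_io_task(name: str, delay: float) -> str:
    """Pretend to wait on a network request, database query, or file read."""
    await asyncio.sleep(delay)  # yields control to the event loop while "waiting"
    return f"{name} done"


async def run_concurrently() -> tuple[list[str], float]:
    """Run three simulated I/O tasks at once and measure wall-clock time."""
    started = time.perf_counter()
    results = await asyncio.gather(
        fake_io_task("network", 0.2),
        fake_io_task("database", 0.2),
        fake_io_task("file", 0.2),
    )
    elapsed = time.perf_counter() - started
    return list(results), elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_concurrently())
    # The three waits overlap, so the total is close to 0.2s rather than 0.6s.
    print(results, f"{elapsed:.2f}s")
```

Because each task awaits instead of blocking, the event loop is free to serve the other tasks during the wait, which is the same mechanism an async FastAPI endpoint relies on.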
📱 Flutter is an open-source UI toolkit developed by Google for building native user interfaces for mobile, web, and desktop platforms from a single codebase.
👨‍💻 It uses Dart, a modern object-oriented programming language, and provides a rich set of customizable widgets that can adapt to any design.
You can access ChatGPT or LlamaCpp through a WebSocket connection using two modules: app/routers/websocket and app/utils/chatgpt/chatgpt_stream_manager. These modules facilitate the communication between the Flutter client and the ChatGPT model through a WebSocket. With the WebSocket, you can establish a real-time, two-way communication channel to interact with the LLM.
websocket.py is responsible for setting up a WebSocket connection and handling user authentication. It defines the WebSocket route /chatgpt/{api_key} that accepts a WebSocket and an API key as parameters.
When a client connects to the WebSocket, the server first checks the API key to authenticate the user. If the API key is valid, the begin_chat() function is called from the chatgpt_stream_manager.py module to start the ChatGPT conversation.
In case of an unregistered API key or an unexpected error, an appropriate message is sent to the client and the connection is closed.
@router.websocket("/chatgpt/{api_key}")
async def ws_chatgpt(websocket: WebSocket, api_key: str):
    ...
chatgpt_stream_manager.py is responsible for managing the conversation and handling user messages. It defines the begin_chat() function, which takes a WebSocket and a user ID as parameters.
The function first initializes the user's GPT context from the cache manager. Then, it sends the initial message history to the client through the WebSocket.
The conversation continues in a loop until the connection is closed. During the conversation, the user's messages are processed and GPT's responses are generated accordingly.
async def begin_chat(
    websocket: WebSocket,
    user_id: str,
) -> None:
    ...
The SendToWebsocket class is used for sending messages and streams to the WebSocket. It has two methods: message() and stream(). The message() method sends a complete message to the WebSocket, while the stream() method sends a stream to the WebSocket.
class SendToWebsocket:
    @staticmethod
    async def message(...):
        ...
    @staticmethod
    async def stream(...):
        ...
The HandleMessage class also handles GPT responses. The gpt() method sends the GPT response to the WebSocket. If translation is enabled, the response is translated using the Google Translate API before sending it to the client.
class HandleMessage:
    ...
    @staticmethod
    async def gpt(...):
        ...
User messages are processed using the HandleMessage class. If the message starts with /, such as /YOUR_CALLBACK_NAME, it is treated as a command and the appropriate command response is generated. Otherwise, the user's message is processed and sent to the GPT model for generating a response.
Commands are handled using the ChatGptCommands class. It executes the corresponding callback function depending on the command. You can add new commands by simply adding a callback to the ChatGptCommands class in app.utils.chatgpt.chatgpt_commands.
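The slash-command routing described above can be sketched roughly as follows. This is a simplified stand-in, not the project's actual code: DemoCommands and handle_user_message are hypothetical names, and argument validation is left out.

```python
import asyncio


class DemoCommands:
    """Hypothetical stand-in for ChatGptCommands: each static method is one command."""

    @staticmethod
    async def echo(text: str) -> str:
        return f"echo: {text}"

    @staticmethod
    async def ping() -> str:
        return "pong"


async def handle_user_message(msg: str) -> str:
    """Route a message: '/name args' runs a command, anything else goes to the LLM."""
    if not msg.startswith("/"):
        return f"(sent to LLM) {msg}"
    # Split "/echo hello world" into the command name and the rest of the text.
    name, _, args = msg[1:].partition(" ")
    callback = None if name.startswith("_") else getattr(DemoCommands, name, None)
    if callback is None or not asyncio.iscoroutinefunction(callback):
        return f"unknown command: /{name}"
    return await (callback(args) if args else callback())


if __name__ == "__main__":
    print(asyncio.run(handle_user_message("/echo hello world")))
```

With this shape, adding a new command only requires adding another static method to the class, which mirrors how callbacks are added to ChatGptCommands.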
To start a ChatGPT conversation, connect to the WebSocket route /ws/chatgpt/{api_key} with a valid API key registered in the database. Note that this API key is not the same as the OpenAI API key; it is only used by your server to validate the user. Once connected, you can send messages and commands to interact with the ChatGPT model. The WebSocket will send back GPT's responses in real-time. This WebSocket connection is established via the Flutter app, which can be accessed at the endpoint /chatgpt.
Using Redis for storing vector embeddings of conversations 🗨️ can aid the ChatGPT model 🤖 in several ways, such as efficient and fast retrieval of conversation context 🕵️‍♀️, handling large amounts of data 📊, and providing more relevant responses through vector similarity search 🔎.
Some fun examples of how this could work in practice:
- Imagine a user is chatting with ChatGPT about their favorite TV show 📺 and mentions a specific character 👤. Using Redis, ChatGPT could retrieve previous conversations where that character was mentioned and use that information to provide more detailed insights or trivia about that character 🤔.
- Another scenario could be a user discussing their travel plans ✈️ with ChatGPT. If they mention a particular city 🌆 or landmark 🏰, ChatGPT could use vector similarity search to retrieve previous conversations that discussed the same location and provide recommendations or tips based on that context 🧳.
- If a user mentions a particular cuisine 🍝 or dish 🍱, ChatGPT could retrieve previous conversations that discussed those topics and provide recommendations or suggestions based on that context 🍴.
When a user enters a command in the chat window like /embed <text_to_embed>, the VectorStoreManager.create_documents method is called. This method converts the input text into a vector using OpenAI's text-embedding-ada-002 model and stores it in the Redis vectorstore.
@staticmethod
@CommandResponse.send_message_and_stop
async def embed(text_to_embed: str, /) -> str:
    """Embed the text and save its vectors in the redis vectorstore.\n
    /embed <text_to_embed>"""
    await VectorStoreManager.create_documents(
        text=text_to_embed,
    )
    return "Embedding successful!"
When the user enters the /query <query> command, the asimilarity_search function is used to find up to three results with the highest vector similarity to the embedded data in the Redis vectorstore. These results are then stored in the context of the chatlist, which helps ChatGPT answer the query by referring to this data in the future.
@staticmethod
@CommandResponse.handle_gpt
async def query(query: str, /, buffer: BufferedUserContext) -> None:
    """Query from redis vectorstore\n
    /query <query>"""
    k: int = 3
    found_document: list[Document] = (await VectorStoreManager.asimilarity_search(queries=[query], k=k))[0]
    ...
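To make the idea behind the /query lookup more concrete, here is a minimal pure-Python sketch of top-k retrieval by cosine similarity. It does not use Redis, Langchain, or real embeddings; the three-dimensional vectors and the names cosine_similarity, top_k, and store are invented purely for illustration.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query: list[float], store: dict[str, list[float]], k: int = 3) -> list[str]:
    """Return the k stored texts whose vectors are most similar to the query."""
    ranked = sorted(store, key=lambda text: cosine_similarity(query, store[text]), reverse=True)
    return ranked[:k]


# Toy "vectorstore": text mapped to a made-up embedding.
store = {
    "cats purr": [1.0, 0.0, 0.0],
    "dogs bark": [0.0, 1.0, 0.0],
    "kittens meow": [0.9, 0.1, 0.0],
    "stocks fell": [0.0, 0.0, 1.0],
}

if __name__ == "__main__":
    print(top_k([1.0, 0.05, 0.0], store, k=3))
```

A real vectorstore replaces the linear scan with an index, but the ranking criterion is the same kind of vector similarity.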
When running the begin_chat function, if a user uploads a file containing text (e.g., a PDF or TXT file), the text is automatically extracted from the file, and its vector embedding is saved to Redis.
async def begin_chat(
    websocket: WebSocket,
    user_id: str,
) -> None:
    ...
    while True:  # loop until connection is closed
        try:
            rcvd: dict = await websocket.receive_json()  # receive message from client
            ...
            if "filename" in rcvd:
                text: str = await run_in_threadpool(
                    read_bytes_to_text, await websocket.receive_bytes(), rcvd["filename"]
                )
                docs: list[str] = await VectorStoreManager.create_documents(text)
            ...
In the chatgpt_commands.py file, there are several important components:
- CommandResponse: This class is used to set a decorator on the command method to specify the next action. It helps to define various response types, such as sending a message and stopping, sending a message and continuing, handling user input, handling GPT responses, and more.
- command_handler: This function is responsible for performing a command callback method based on the text entered by the user.
- arguments_provider: This function automatically supplies the arguments required by the command method based on the annotation type of the command method.
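One way annotation-driven argument supply could work is sketched below with the standard inspect module. This is an assumption about the general technique, not the project's arguments_provider: provide_arguments, DemoContext, and repeat are hypothetical names, and only positional parameters are handled.

```python
import inspect
from typing import Any, Callable


class DemoContext:
    """Hypothetical per-user context object that some commands want injected."""

    def __init__(self, user_id: str) -> None:
        self.user_id = user_id


def provide_arguments(
    func: Callable[..., Any], raw_args: list[str], injectables: dict[type, Any]
) -> list[Any]:
    """Build positional arguments for func by looking at its type annotations."""
    remaining = list(raw_args)
    provided: list[Any] = []
    for param in inspect.signature(func).parameters.values():
        annotation = param.annotation
        if annotation in injectables:
            # Objects such as the user context are injected, not parsed from text.
            provided.append(injectables[annotation])
        elif annotation in (int, float):
            # Numeric parameters consume and convert one word of user input.
            provided.append(annotation(remaining.pop(0)))
        else:
            # str or unannotated parameters take all remaining words.
            provided.append(" ".join(remaining))
            remaining.clear()
    return provided


def repeat(times: int, text: str, context: DemoContext) -> str:
    return f"{context.user_id}: " + " ".join([text] * times)


if __name__ == "__main__":
    args = provide_arguments(repeat, ["2", "hello", "there"], {DemoContext: DemoContext("alice")})
    print(repeat(*args))
```

The benefit of this design is that a command author only declares what the command needs in its signature, and the dispatcher works out how to supply it.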
This repository contains different GPT LLM models, defined in gpt_llms.py. There are two main models: LlamaCppModel and OpenAIModel, inheriting from the base class LLMModel. Both models are designed for text generation. The LLMModels enum is a collection of these LLMs.
There is also a module, chatgpt_generation.py, that provides the functionality needed to integrate the OpenAI API with the ChatGPT chatbot service. It handles the process of organizing message history, generating text from the OpenAI API, and managing the asynchronous streaming of generated text.
All operations are handled asynchronously 🚀 and can be used by multiple users at the same time. In particular, the LlamaCppModel allows for parallel processing using multiprocessing and queues.
The default LLM model used by the user via UserGptContext.construct_default is gpt-3.5-turbo. You can change the default for that function. To change the LLM model via command, type /changemodel <model> in the chat. The <model> defined here should correspond to the member defined in LLMModels.
OpenAIModel generates text asynchronously by requesting a chat completion from the OpenAI server. It requires an OpenAI API key. As it uses an asynchronous client, the main thread remains unblocked.
LlamaCppModel reads a locally stored LlamaCpp-compatible model and generates text in a new process. For example, a model path looks like ./llama_models/ggml/wizard-vicuna-13B.ggml.q5_1.bin. You can download the required model from Hugging Face. When generating text with this model, a process pool is created, and the Llama model is immediately cached in RAM. This allocation remains in memory until the process pool is forcibly terminated, such as by shutting down the server. By creating a new process pool and working in a different process, existing server processes are not blocked, and other users can generate text with the model simultaneously! More details are defined in chatgpt_llama_cpp.py.
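The general pattern of offloading CPU-bound generation to a separate process without blocking the event loop can be sketched with the standard library as follows. This is only an illustration of the idea, not the contents of chatgpt_llama_cpp.py: slow_generate is a hypothetical placeholder for local model inference, and no model is loaded or cached here.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def slow_generate(prompt: str) -> str:
    """Hypothetical stand-in for CPU-bound local model inference."""
    total = sum(i * i for i in range(200_000))  # burn some CPU time
    return f"reply to {prompt!r} ({total % 97})"


async def generate_in_process(pool: ProcessPoolExecutor, prompt: str) -> str:
    """Run the blocking function in a worker process and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, slow_generate, prompt)


async def main() -> list[str]:
    # Two "users" request text at the same time; each call runs in its own worker.
    with ProcessPoolExecutor(max_workers=2) as pool:
        return await asyncio.gather(
            generate_in_process(pool, "hello"),
            generate_in_process(pool, "world"),
        )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

While the workers are busy, the event loop in the main process stays free to accept other requests, which is the property the paragraph above describes.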
Exceptions that may occur during text generation are handled explicitly. For the OpenAI API, the following handlers are defined:
try:
    ...  # Generate text from OpenAI API
except GptLengthException:
    ...  # Handle token limit exceeded
except GptContentFilterException:
    ...  # Handle content filter exception
except GptConnectionException:
    ...  # Handle connection error
except httpx.TimeoutException:
    ...  # Handle timeout exception
except Exception as exception:
    ...  # Handle unexpected exceptions
This project aims to create an API backend to enable the ChatGPT chatbot service. It utilizes a cache manager to store messages and user profiles in Redis, and a message manager to safely cache messages so that the number of tokens does not exceed an acceptable limit.
The Cache Manager (ChatGptCacheManager) is responsible for handling user context information and message histories. It stores these data in Redis, allowing for easy retrieval and modification. The manager provides several methods to interact with the cache, such as:
- read_context: Reads the user's GPT context from Redis.
- create_context: Creates a new user GPT context in Redis.
- reset_context: Resets the user's GPT context to default values.
- update_message_histories: Updates the message histories for a specific role (user, GPT, or system).
- lpop_message_history / rpop_message_history: Removes and returns the message history from the left or right end of the list.
- append_message_history: Appends a message history to the end of the list.
- get_message_history: Retrieves the message history for a specific role.
- delete_message_history: Deletes the message history for a specific role.
- set_message_history: Sets a specific message history for a role and index.
- get_all_chat_rooms: Retrieves all chat rooms of a user from Redis.
The Message Manager (MessageManager) ensures that the number of tokens in message histories does not exceed the specified limit. It safely handles adding, removing, and setting message histories in the user's GPT context while maintaining token limits. The manager provides several methods to interact with message histories, such as:
- add_message_history_safely: Adds a message history to the user's GPT context, ensuring that the token limit is not exceeded.
- pop_message_history_safely: Removes and returns the message history from the right end of the list while updating the token count.
- set_message_history_safely: Sets a specific message history in the user's GPT context, updating the token count and ensuring that the token limit is not exceeded.
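The token-limit bookkeeping described above might look roughly like the sketch below. It is an assumption about the general approach rather than the MessageManager implementation: DemoHistory is a hypothetical class, count_tokens is a crude word counter standing in for a real tokenizer, and the eviction policy (drop the oldest messages first) is chosen for illustration.

```python
from collections import deque
from dataclasses import dataclass, field


def count_tokens(text: str) -> int:
    """Crude stand-in tokenizer: one token per whitespace-separated word."""
    return len(text.split())


@dataclass
class DemoHistory:
    """Hypothetical message history that keeps its total token count under a limit."""

    max_tokens: int
    messages: deque[str] = field(default_factory=deque)
    total_tokens: int = 0

    def add_safely(self, content: str) -> None:
        self.messages.append(content)
        self.total_tokens += count_tokens(content)
        # Evict the oldest messages until the limit holds; always keep the newest one.
        while self.total_tokens > self.max_tokens and len(self.messages) > 1:
            oldest = self.messages.popleft()
            self.total_tokens -= count_tokens(oldest)

    def pop_safely(self) -> str:
        # Remove the newest message and keep the token count in sync.
        newest = self.messages.pop()
        self.total_tokens -= count_tokens(newest)
        return newest


if __name__ == "__main__":
    history = DemoHistory(max_tokens=5)
    for message in ("one two three", "four five", "six"):
        history.add_safely(message)
    print(list(history.messages), history.total_tokens)
```

Keeping the running total next to the messages avoids re-tokenizing the whole history on every change.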
To use the cache manager and message manager in your project, import them as follows:
from app.utils.chatgpt.chatgpt_cache_manager import ChatGptCacheManager
from app.utils.chatgpt.chatgpt_message_manager import MessageManager
Then, you can use their methods to interact with the Redis cache and manage message histories according to your requirements.
For example, to create a new user GPT context:
user_id = "user@example.com"  # placeholder address; user IDs use email format
chat_room_id = "example_chat_room_id" # usually the 32 characters from `uuid.uuid4().hex`
default_context = UserGptContext.construct_default(user_id=user_id, chat_room_id=chat_room_id)
await ChatGptCacheManager.create_context(user_gpt_context=default_context)
To safely add a message history to the user's GPT context:
user_gpt_context = await ChatGptCacheManager.read_context(user_id=user_id, chat_room_id=chat_room_id)
content = "This is a sample message."
role = "user" # can be "user", "gpt", or "system", or use enum such as GptRoles.USER, GptRoles.GPT, GptRoles.SYSTEM
await MessageManager.add_message_history_safely(user_gpt_context, content, role)
This project uses the token_validator middleware along with other middlewares in the FastAPI application. These middlewares are responsible for controlling access to the API, ensuring only authorized and authenticated requests are processed.
The following middlewares are added to the FastAPI application:
- Access Control Middleware: Ensures that only authorized requests are processed.
- CORS Middleware: Allows requests from specific origins, as defined in the app configuration.
- Trusted Host Middleware: Ensures that requests are coming from trusted hosts, as defined in the app configuration.
The Access Control Middleware is defined in the token_validator.py file. It is responsible for validating API keys and JWT tokens.
The StateManager class is used to initialize request state variables. It sets the request time, start time, IP address, and user token.
The AccessControl class contains two static methods for validating API keys and JWT tokens:
- api_service: Validates API keys by checking the existence of required query parameters and headers in the request. It calls the Validator.api_key method to verify the API key, secret, and timestamp.
- non_api_service: Validates JWT tokens by checking the existence of the 'authorization' header or 'Authorization' cookie in the request. It calls the Validator.jwt method to decode and verify the JWT token.
The Validator class contains two static methods for validating API keys and JWT tokens:
- api_key: Verifies the API access key, hashed secret, and timestamp. Returns a UserToken object if the validation is successful.
- jwt: Decodes and verifies the JWT token. Returns a UserToken object if the validation is successful.
The access_control function is an asynchronous function that handles the request and response flow for the middleware. It initializes the request state using the StateManager class, determines the type of authentication required for the requested URL (API key or JWT token), and validates the authentication using the AccessControl class. If an error occurs during the validation process, an appropriate HTTP exception is raised.
Token utilities are defined in the token.py file. It contains two functions:
- create_access_token: Creates a JWT token with the given data and expiration time.
- token_decode: Decodes and verifies a JWT token. Raises an exception if the token is expired or cannot be decoded.
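For readers unfamiliar with how a JWT is built and checked, here is a standard-library-only sketch of an HS256 token. It is not the project's token.py, which may well rely on a JWT library; create_token_sketch and decode_token_sketch are hypothetical names, and the choice of HS256 is an assumption.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """URL-safe base64 without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> str:
    digest = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()
    return _b64url(digest)


def create_token_sketch(data: dict, secret: str, expires_in: int) -> str:
    """Create header.payload.signature with an 'exp' claim set expires_in seconds ahead."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {**data, "exp": int(time.time()) + expires_in}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_sign(signing_input, secret)}"


def decode_token_sketch(token: str, secret: str) -> dict:
    """Verify the signature and expiry, then return the payload; raise ValueError otherwise."""
    try:
        signing_input, signature = token.rsplit(".", 1)
        payload_part = signing_input.split(".")[1]
    except (ValueError, IndexError) as error:
        raise ValueError("malformed token") from error
    if not hmac.compare_digest(_sign(signing_input, secret), signature):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(payload_part))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


if __name__ == "__main__":
    token = create_token_sketch({"user": "alice"}, "s3cret", expires_in=60)
    print(decode_token_sketch(token, "s3cret")["user"])
```

In production code a maintained JWT library is the safer choice; the sketch only shows which checks (signature, then expiry) a decode step has to perform.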
The params_utils.py file contains a utility function for hashing query parameters and a secret key using HMAC and SHA256:
- hash_params: Takes query parameters and a secret key as input and returns a base64-encoded hashed string.
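A function with this behavior can be written with the standard hmac, hashlib, and base64 modules. The sketch below shows the technique only; the real hash_params signature and the exact format of the query string are not given here, so hash_params_sketch, verify_signature, and the sample query string are assumptions.

```python
import base64
import hashlib
import hmac


def hash_params_sketch(query_params: str, secret_key: str) -> str:
    """Return the base64-encoded HMAC-SHA256 of the query string under the secret key."""
    digest = hmac.new(
        secret_key.encode("utf-8"),
        query_params.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    return base64.b64encode(digest).decode("utf-8")


def verify_signature(query_params: str, secret_key: str, received: str) -> bool:
    """Recompute the hash server-side and compare in constant time."""
    expected = hash_params_sketch(query_params, secret_key)
    return hmac.compare_digest(expected, received)


if __name__ == "__main__":
    # Hypothetical query string; the real parameter names may differ.
    signature = hash_params_sketch("key=abc&timestamp=1700000000", "my-secret")
    print(verify_signature("key=abc&timestamp=1700000000", "my-secret", signature))
```

Because the secret never travels with the request, a server that shares the secret can confirm both who sent the parameters and that they were not altered.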
The date_utils.py file contains the UTC class with utility functions for working with dates and timestamps:
- now: Returns the current UTC datetime with an optional hour difference.
- timestamp: Returns the current UTC timestamp with an optional hour difference.
- timestamp_to_datetime: Converts a timestamp to a datetime object with an optional hour difference.
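A helper class with these three functions could be sketched as below using the standard datetime module. The class name UTCSketch and the parameter name hour_diff are assumptions; the real UTC class may differ in signatures and return types.

```python
from datetime import datetime, timedelta, timezone


class UTCSketch:
    """Hypothetical stand-in for the UTC helper class described above."""

    @classmethod
    def now(cls, hour_diff: int = 0) -> datetime:
        """Current UTC datetime, shifted by hour_diff hours."""
        return datetime.now(timezone.utc) + timedelta(hours=hour_diff)

    @classmethod
    def timestamp(cls, hour_diff: int = 0) -> int:
        """Current UTC timestamp in whole seconds, shifted by hour_diff hours."""
        return int(cls.now(hour_diff).timestamp())

    @classmethod
    def timestamp_to_datetime(cls, timestamp: int, hour_diff: int = 0) -> datetime:
        """Convert a Unix timestamp to a UTC datetime, shifted by hour_diff hours."""
        return datetime.fromtimestamp(timestamp, tz=timezone.utc) + timedelta(hours=hour_diff)


if __name__ == "__main__":
    print(UTCSketch.now(), UTCSketch.timestamp(hour_diff=9))
```

Timestamps like these are what an API-key validator typically compares against the request timestamp to reject stale requests.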
The logger.py file contains the api_logger function, which logs API request and response information, including the request URL, method, status code, client information, processing time, and error details (if applicable). The logger function is called at the end of the access_control function to log the processed request and response.
To use the token_validator middleware in your FastAPI application, simply import the access_control function and add it as a middleware to your FastAPI instance:
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from app.middlewares.token_validator import access_control
app = FastAPI()
app.add_middleware(dispatch=access_control, middleware_class=BaseHTTPMiddleware)
Make sure to also add the CORS and Trusted Host middlewares for complete access control:
app.add_middleware(
    CORSMiddleware,
    allow_origins=config.allowed_sites,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=config.trusted_hosts,
    except_path=["/health"],
)
Now, any incoming requests to your FastAPI application will be processed by the token_validator middleware and other middlewares, ensuring that only authorized and authenticated requests are processed.
The app.database.connection module provides an easy-to-use interface for managing database connections and executing SQL queries using SQLAlchemy and Redis. It supports MySQL, and can be easily integrated with this project.
- Create and drop databases
- Create and manage users
- Grant privileges to users
- Execute raw SQL queries
- Manage database sessions with async support
- Redis caching support for faster data access
First, import the required classes from the module:
from app.database.connection import MySQL, SQLAlchemy, RedisFactory
Next, create an instance of the SQLAlchemy class and configure it with your database settings:
from app.common.config import Config
config: Config = Config.get()
db = SQLAlchemy()
db.start(config)
Now you can use the db instance to execute SQL queries and manage sessions:
# Execute a raw SQL query
result = await db.execute("SELECT * FROM users")
# Use the run_in_session decorator to manage sessions
@db.run_in_session
async def create_user(session, username, password):
    await session.execute(
        "INSERT INTO users (username, password) VALUES (:username, :password)",
        {"username": username, "password": password},
    )
await create_user("JohnDoe", "password123")
To use Redis caching, create an instance of the RedisFactory class and configure it with your Redis settings:
cache = RedisFactory()
cache.start(config)
You can now use the cache instance to interact with Redis:
# Set a key in Redis
await cache.redis.set("my_key", "my_value")
# Get a key from Redis
value = await cache.redis.get("my_key")
In fact, in this project, the MySQL class does the initial setup at app startup, and all database connections are made using only the db and cache variables defined at the end of the module. 😅
All database settings are applied in create_app() in app.common.app_settings.
For example, the create_app() function in app.common.app_settings looks like this:
def create_app(config: Config) -> FastAPI:
    # Initialize app & db & js
    new_app = FastAPI(
        title=config.app_title,
        description=config.app_description,
        version=config.app_version,
    )
    db.start(config=config)
    cache.start(config=config)
    js_url_initializer(js_location="app/web/main.dart.js")
    # Register routers
    # ...
    return new_app
This project uses a simple and efficient way to handle database CRUD (Create, Read, Update, Delete) operations using SQLAlchemy, the module app.database.models.schema, and the package path app.database.crud.
The schema.py module is responsible for defining database models and their relationships using SQLAlchemy. It includes a set of classes that inherit from Base, an instance of declarative_base(). Each class represents a table in the database, and its attributes represent columns in the table. These classes also inherit from a Mixin class, which provides some common methods and attributes for all the models.
The Mixin class provides some common attributes and methods for all the classes that inherit from it. Some of the attributes include:
- id: Integer primary key for the table.
- created_at: Datetime for when the record was created.
- updated_at: Datetime for when the record was last updated.
- ip_address: IP address of the client that created or updated the record.
It also provides several class methods that perform CRUD operations using SQLAlchemy, such as:
- add_all(): Adds multiple records to the database.
- add_one(): Adds a single record to the database.
- update_where(): Updates records in the database based on a filter.
- fetchall_filtered_by(): Fetches all records from the database that match the provided filter.
- one_filtered_by(): Fetches a single record from the database that matches the provided filter.
- first_filtered_by(): Fetches the first record from the database that matches the provided filter.
- one_or_none_filtered_by(): Fetches a single record or returns None if no records match the provided filter.
The users.py and api_keys.py modules contain a set of functions that perform CRUD operations using the classes defined in schema.py. These functions use the class methods provided by the Mixin class to interact with the database.
Some of the functions in these modules include:
- create_api_key(): Creates a new API key for a user.
- get_api_keys(): Retrieves all API keys for a user.
- get_api_key_owner(): Retrieves the owner of an API key.
- get_api_key_and_owner(): Retrieves an API key and its owner.
- update_api_key(): Updates an API key.
- delete_api_key(): Deletes an API key.
- is_email_exist(): Checks if an email exists in the database.
- get_me(): Retrieves user information based on user ID.
- is_valid_api_key(): Checks if an API key is valid.
- register_new_user(): Registers a new user in the database.
- find_matched_user(): Finds a user with a matching email in the database.
To use the provided CRUD operations, import the relevant functions from the modules under app.database.crud and call them with the required parameters. For example:
import asyncio
from app.database.crud.users import register_new_user, get_me, is_email_exist
from app.database.crud.api_keys import create_api_key, get_api_keys, update_api_key, delete_api_key
async def main():
    # `user_id` is an integer index in the MySQL database, and `email` is user's actual name
    # the email will be used as `user_id` in ChatGpt. Don't confuse with `user_id` in MySQL
    # Register a new user (the address below is a placeholder)
    new_user = await register_new_user(email="user@example.com", hashed_password="...")
    # Get user information
    user = await get_me(user_id=1)
    # Check if an email exists in the database
    email_exists = await is_email_exist(email="user@example.com")
    # Create a new API key for user with ID 1
    new_api_key = await create_api_key(user_id=1, additional_key_info={"user_memo": "Test API Key"})
    # Get all API keys for user with ID 1
    api_keys = await get_api_keys(user_id=1)
    # Update the first API key in the list
    updated_api_key = await update_api_key(updated_key_info={"user_memo": "Updated Test API Key"}, access_key_id=api_keys[0].id, user_id=1)
    # Delete the first API key in the list
    await delete_api_key(access_key_id=api_keys[0].id, access_key=api_keys[0].access_key, user_id=1)
if __name__ == "__main__":
    asyncio.run(main())