Thursday, May 16, 2024

Creating an AI Application to Chat with Large SQL Databases


Introduction

You can easily build a simple application that chats with a SQL database. The problem is that it does not work seamlessly once the database gets large. If the database is huge, it is impractical to include the complete list of tables and columns in the prompt context. This article explains how to get around this obstacle by building an AI application that can chat with massive SQL databases.


Creating an AI Application to Chat with Large SQL Databases

The following code sets up a simple Streamlit application that lets users connect to a SQL database and chat with it.

import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv

# Create the required folders
folders_to_create = ['csvs']
for folder_name in folders_to_create:
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
        print(f"Folder '{folder_name}' created.")
    else:
        print(f"Folder '{folder_name}' already exists.")

# Load the OpenAI API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

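def get_basic_table_details(cursor):
    # List every column of every table in the public schema.
    # String literals in SQL need single quotes; double quotes denote identifiers.
    query = """
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_name IN (
        SELECT tablename FROM pg_tables WHERE schemaname = 'public'
    );"""
    cursor.execute(query)
    return cursor.fetchall()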

def save_db_details(db_uri):
    # Save the schema of the connected database to a CSV file
    unique_id = str(uuid4()).replace("-", "_")
    connection = psycopg2.connect(db_uri)
    cursor = connection.cursor()
    tables_and_columns = get_basic_table_details(cursor)
    df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
    filename_t = f'csvs/tables_{unique_id}.csv'
    df.to_csv(filename_t, index=False)
    cursor.close()
    connection.close()
    return unique_id

def generate_template_for_sql(query, table_info, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(content=f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri} Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"),
        HumanMessagePromptTemplate.from_template("{text}")
    ])
    answer = chat_llm(template.format_messages(text=query))
    return answer.content

def get_the_output_from_llm(query, unique_id, db_uri):
    filename_t = f'csvs/tables_{unique_id}.csv'
    df = pd.read_csv(filename_t)
    # Describe every table and its columns in one string for the prompt
    table_info = ''
    for table in df['table_name'].unique():
        table_info += f'Details about table {table}:\n'
        table_info += df[df['table_name'] == table].to_string(index=False) + '\n\n'
    return generate_template_for_sql(query, table_info, db_uri)

def execute_the_solution(solution, db_uri):
    connection = psycopg2.connect(db_uri)
    cursor = connection.cursor()
    # The model wraps the query in three backticks, so take the middle part
    _, final_query, _ = solution.split("```")
    cursor.execute(final_query.strip())
    result = cursor.fetchall()
    return str(result)

def connect_with_db(uri):
    st.session_state.db_uri = uri
    st.session_state.unique_id = save_db_details(uri)
    return {"message": "Database connection established!"}

def send_message(message):
    solution = get_the_output_from_llm(message, st.session_state.unique_id, st.session_state.db_uri)
    result = execute_the_solution(solution, st.session_state.db_uri)
    return {"message": solution + "\n\n" + "Result:\n" + result}

# Streamlit interface setup
st.subheader("Instructions")
st.markdown("1. Enter your RDS Database URI below.\n2. ")
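The execute_the_solution function above assumes the model's reply contains exactly one block wrapped in three backticks, which is not guaranteed. As a minimal sketch of a more forgiving extraction step, the hypothetical helper extract_sql below (not part of the original application) takes the first fenced block and drops an optional "sql" language tag.

```python
import re

FENCE = "`" * 3  # three backticks, built this way so the example nests cleanly


def extract_sql(llm_response: str) -> str:
    """Return the first fenced block in an LLM response, without its language tag."""
    pattern = re.compile(FENCE + r"(?:sql)?\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE)
    match = pattern.search(llm_response)
    if match is None:
        raise ValueError("No fenced SQL block found in the model response")
    return match.group(1).strip()


# A reply with a language tag and a second, unrelated block
reply = (
    f"Here is the query:\n{FENCE}sql\nSELECT id FROM users;\n{FENCE}\n"
    f"And a second block: {FENCE}SELECT 1;{FENCE}"
)
print(extract_sql(reply))
```

Raising an error when no block is found lets the caller show a clear message instead of failing on tuple unpacking.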

The basic strategy for simplifying the prompt is to send only the table and column names that are relevant to the user’s query. To achieve this, we can generate embeddings for the table and column names, dynamically retrieve the most relevant ones based on the user’s input, and include only those in the prompt. In this article, we use ChromaDB as the vector database, but alternatives such as Pinecone, Milvus, or any other suitable vector database would also work.
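To make the retrieval idea concrete without any external service, here is a dependency-free sketch. It replaces real embeddings with a toy bag-of-words vector and cosine similarity, and the schema rows are invented for illustration. A real application would use an embedding model and a vector database instead.

```python
import math
import re
from collections import Counter

# Hypothetical schema rows: (table_name, column_name, data_type)
SCHEMA = [
    ("customers", "customer_name", "text"),
    ("customers", "signup_date", "date"),
    ("orders", "order_total", "numeric"),
    ("orders", "customer_id", "integer"),
    ("inventory", "warehouse_location", "text"),
]


def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: a bag of lowercase word counts."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_k_schema_rows(question: str, k: int = 2):
    """Rank schema rows by similarity to the question and keep the best k."""
    query_vec = toy_embed(question)
    scored = [(cosine(query_vec, toy_embed(" ".join(row))), row) for row in SCHEMA]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [row for score, row in scored[:k] if score > 0]


print(top_k_schema_rows("What is the total of all orders per customer?"))
```

Only the rows returned by top_k_schema_rows would go into the prompt, so the prompt size depends on k rather than on the size of the database.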

How to Simplify the Prompt

Let’s begin by installing ChromaDB.

pip install chromadb

First, we create an additional folder named ‘vectors’ alongside the ‘csvs’ folder to store the embeddings of table and column names. This folder can also hold other relevant database details, such as the foreign keys that link different tables and potential values for the WHERE clause.

def generate_embeddings(filename, storage_folder):
    csv_loader = CSVLoader(file_path=filename, encoding="utf8")
    dataset = csv_loader.load()
    vector_database = Chroma.from_documents(dataset, embedding=embeddings, persist_directory=storage_folder)
    vector_database.persist()
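It helps to know what text is actually being embedded. The sketch below assumes the loader renders each CSV row as one document with one "header: value" line per column, which is the format the parsing code later in this article relies on. The CSV contents and both helper names are illustrative, not part of LangChain.

```python
import csv
import io

# Hypothetical contents of a csvs/tables_<id>.csv file
CSV_TEXT = """table_name,column_name,data_type
orders,order_total,numeric
orders,customer_id,integer
"""


def rows_to_documents(csv_text: str) -> list[str]:
    """Render each CSV row as 'header: value' lines, one document per row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return ["\n".join(f"{key}: {value}" for key, value in row.items()) for row in reader]


def parse_document(document: str) -> dict[str, str]:
    """Invert rows_to_documents for a single document."""
    return dict(line.split(": ", 1) for line in document.split("\n"))


documents = rows_to_documents(CSV_TEXT)
print(documents[0])
print(parse_document(documents[1]))
```

Each document therefore describes a single column, so a retrieval hit identifies both a table and one of its columns.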

We also first check whether the user’s query needs any information about specific tables, or whether the user is instead asking about the general schema of the database.

def check_user_intent_for_database_info_or_sql(query):
    # Define a template for the conversation
    prompt_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                "Based on the provided text, the user is asking a question about databases. "
                "Determine if the user seeks information about the database schema or if they want to write a SQL query. "
                "Respond with 'yes' if the user is seeking information about the database schema and 'no' if they intend to write a SQL query."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])

    # Generate a response using the chat model
    response = chat_llm(prompt_template.format_messages(text=query))
    print(response.content)
    return response.content

The model responds with either ‘yes’ or ‘no’. If the answer is ‘yes’, the user wants general schema information, and the following prompt is generated.

def generate_sql_query_prompt(query, db_uri):
    # Configure the chat template with system and human messages
    prompt_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                "As an assistant tasked with writing SQL queries, create a SQL query based on the text below. "
                "Enclose the SQL query within three backticks '```' for clarity. "
                "Aim to use 'SELECT' queries as much as possible. "
                f"The connection string for the database is {db_uri}."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])

    # Generate and print the answer using the chat model
    response = chat_llm(prompt_template.format_messages(text=query))
    print(response.content)
    return response.content

If the answer is ‘no’, the user’s query specifically requires the names of tables and the columns within those tables. We then identify the most relevant tables and columns and build a string from them to include in the prompt.
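Because the routing decision depends on a free-form model reply, a strict comparison with the string 'yes' can miss replies such as "Yes." or " yes". The hypothetical helper below is one way to normalize the reply before branching. It treats anything that is not a clear 'yes' as 'no', which is an assumption about the safer default.

```python
import re


def wants_schema_info(llm_answer: str) -> bool:
    """Interpret a free-form yes/no reply; anything but a leading 'yes' counts as 'no'."""
    words = re.findall(r"[a-z]+", llm_answer.lower())
    return bool(words) and words[0] == "yes"


for reply in ["yes", "Yes.", " yes\n", "'yes'", "No", "I am not sure", ""]:
    route = "general schema prompt" if wants_schema_info(reply) else "table-specific prompt"
    print(repr(reply), "->", route)
```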

Next, we verify that our vectors have been created successfully and that all other components work correctly. Below is the complete code up to this point.

import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2

from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate

from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader

# Create the required folders for data storage
folders_to_create = ['csvs', 'vectors']
for folder_name in folders_to_create:
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
        print(f"Folder '{folder_name}' created.")
    else:
        print(f"Folder '{folder_name}' already exists.")

# Load API key from environment variable
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

# Initialize language models and embeddings
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

# Function to retrieve basic table details from the database
def get_basic_table_details(cursor):
    # String literals in SQL need single quotes; double quotes denote identifiers
    cursor.execute("""
        SELECT c.table_name, c.column_name, c.data_type
        FROM information_schema.columns c
        WHERE c.table_name IN (
            SELECT tablename
            FROM pg_tables
            WHERE schemaname = 'public'
        );""")
    return cursor.fetchall()

# Function to create vector databases from CSV files
def create_vectors(filename, persist_directory):
    loader = CSVLoader(file_path=filename, encoding="utf8")
    data = loader.load()
    vectordb = Chroma.from_documents(data, embedding=embeddings, persist_directory=persist_directory)
    vectordb.persist()

# Function to save database details and generate vectors
def save_db_details(db_uri):
    unique_id = str(uuid4()).replace("-", "_")
    connection = psycopg2.connect(db_uri)
    cursor = connection.cursor()
    tables_and_columns = get_basic_table_details(cursor)
    df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
    filename_t = "csvs/tables_" + unique_id + ".csv"
    df.to_csv(filename_t, index=False)
    create_vectors(filename_t, "./vectors/tables_" + unique_id)
    cursor.close()
    connection.close()
    return unique_id

# Function to generate a SQL query from the user's question and the table details
def generate_template_for_sql(query, table_info, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri}. Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    return chat_llm(template.format_messages(text=query)).content

# Function to determine whether the user's query is about general schema information or needs a SQL query
def check_if_users_query_want_general_schema_information_or_sql(query):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                "In the given text, the user is asking a question about the database. Determine whether the user wants information about the database schema or wants to write a SQL query. Answer 'yes' for schema information and 'no' for SQL query."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    answer = chat_llm(template.format_messages(text=query))
    print(answer.content)
    return answer.content

# Function to build the prompt when the user wants general database information
def prompt_when_user_want_general_db_information(query, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                # f-string so that {db_uri} is actually substituted
                f"You are an assistant who writes SQL queries. Given the text below, write a SQL query that answers the user's question. Prepend and append the SQL query with three backticks '```' Write select query whenever possible Connection string to this database is {db_uri}"
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    answer = chat_llm(template.format_messages(text=query))
    print(answer.content)
    return answer.content

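# Function to process user queries and generate output depending on whether they concern the general schema or a specific SQL query
def get_the_output_from_llm(query, unique_id, db_uri):
    filename_t = "csvs/tables_" + unique_id + ".csv"
    df = pd.read_csv(filename_t)
    table_info = ''
    for table in df['table_name'].unique():
        table_info += 'Details about table ' + table + ':\n'
        table_info += df[df['table_name'] == table].to_string(index=False) + '\n\n\n'
    answer_to_question_general_schema = check_if_users_query_want_general_schema_information_or_sql(query)
    # The model may reply "Yes." or " yes", so normalize before comparing
    if answer_to_question_general_schema.strip().lower().startswith("yes"):
        return prompt_when_user_want_general_db_information(query, db_uri)
    return generate_template_for_sql(query, table_info, db_uri)

# Function to execute the SQL returned by the model
def execute_the_solution(solution, db_uri):
    connection = psycopg2.connect(db_uri)
    cursor = connection.cursor()
    _, final_query, _ = solution.split("```")
    # Remove the optional "sql" language tag. str.strip('sql') would instead
    # strip the characters s, q and l from both ends of the query.
    final_query = final_query.strip()
    if final_query.lower().startswith("sql"):
        final_query = final_query[3:].strip()
    cursor.execute(final_query)
    result = cursor.fetchall()
    cursor.close()
    connection.close()
    return str(result)

# Helpers used by the Streamlit code below (unchanged from the first version)
def connect_with_db(uri):
    st.session_state.db_uri = uri
    st.session_state.unique_id = save_db_details(uri)
    return {"message": "Database connection established!"}

def send_message(message):
    solution = get_the_output_from_llm(message, st.session_state.unique_id, st.session_state.db_uri)
    result = execute_the_solution(solution, st.session_state.db_uri)
    return {"message": solution + "\n\n" + "Result:\n" + result}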

# Streamlit app setup and interaction handling
if __name__ == "__main__":
    st.subheader("Instructions")
    st.markdown("""
        1. Enter the URI of your RDS Database in the text box below.
        2. Click the **Start Chat** button to start the chat.
        3. Enter your message in the text box below and press **Enter** to send the message to the API.
    """)
    chat_history = []
    uri = st.text_input("Enter the RDS Database URI")
    if st.button("Start Chat"):
        if not uri:
            st.warning("Please enter a valid database URI.")
        else:
            st.info("Connecting to the API and starting the chat...")
            chat_response = connect_with_db(uri)
            if "error" in chat_response:
                st.error("Error: Failed to start the chat. Please check the URI and try again.")
            else:
                st.success("Chat started successfully!")
    st.subheader("Chat with the API")
    if "messages" not in st.session_state:
        st.session_state.messages = []
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
    if prompt := st.chat_input("What's up?"):
        st.chat_message("user").markdown(prompt)
        st.session_state.messages.append({"role": "user", "content": prompt})
        response = send_message(prompt)["message"]
        with st.chat_message("assistant"):
            st.markdown(response)
        st.session_state.messages.append({"role": "assistant", "content": response})
    st.write("This is a simple Streamlit app for starting a chat with an RDS Database.")
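The prompts only ask the model to prefer SELECT queries, while execute_the_solution runs whatever SQL comes back. As a hedged sketch, a conservative check such as the hypothetical is_read_only below could run before execution. It is deliberately strict and will also reject some harmless queries, for example ones with a semicolon inside a string literal or a column literally named "update".

```python
import re

# Keywords that indicate a statement may modify data or schema
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|truncate|create|grant|revoke)\b",
    re.IGNORECASE,
)


def is_read_only(sql: str) -> bool:
    """Conservative check: one statement, starting with SELECT or WITH, no write keywords."""
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        return False  # empty input, or more than one statement
    if not re.match(r"(select|with)\b", statement, re.IGNORECASE):
        return False
    return FORBIDDEN.search(statement) is None


for candidate in [
    "SELECT * FROM users;",
    "select created_at from orders",
    "DROP TABLE users",
    "SELECT 1; DELETE FROM users",
]:
    print(candidate, "->", is_read_only(candidate))
```

A keyword filter is not a substitute for database permissions. Connecting with a role that only has SELECT privileges is the more reliable safeguard.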

In the next step, we perform vector retrieval to identify the most relevant tables. Once these tables are selected, we gather all of their column details to provide more context, and compile this information into a string to include in the prompt.

# Load the persisted vector database that stores the table embeddings
vectordb = Chroma(embedding_function=embeddings, persist_directory=f"./vectors/tables_{unique_id}")
retriever = vectordb.as_retriever()
docs = retriever.get_relevant_documents(query)
print(docs)

# Collect the relevant tables and their columns
relevant_tables = []
relevant_table_details = []

for doc in docs:
    # Each document holds one CSV row as "header: value" lines
    table_info = doc.page_content.split("\n")
    table_name = table_info[0].split(":")[1].strip()
    column_name = table_info[1].split(":")[1].strip()
    data_type = table_info[2].split(":")[1].strip()
    if table_name not in relevant_tables:  # avoid describing a table twice
        relevant_tables.append(table_name)
    relevant_table_details.append((table_name, column_name, data_type))

# Load the details of all tables from the CSV file
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)

# Build a descriptive string for each relevant table, including all columns and their data types
table_info = ''
for table in relevant_tables:
    table_info += f'Details about table {table}:\n'
    table_info += df[df['table_name'] == table].to_string(index=False) + '\n\n\n'

def create_sql_query_template(query, relevant_tables, table_info):
    tables_list = ",".join(relevant_tables)
    chat_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"As an assistant capable of composing SQL queries, please write a query that resolves the user's inquiry based on the text provided. "
                f"Consider SQL tables named '{tables_list}'. "
                f"Below is a detailed description of these table(s): "
                f"{table_info}"
                "Enclose the SQL query within three backticks '```' for proper formatting."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])

    response = chat_llm(chat_template.format_messages(text=query))
    print(response.content)
    return response.content
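Several retrieved documents can point to the same table, because each document describes a single column. The sketch below shows one way to build the table description string so that each table appears only once. It uses plain tuples instead of a DataFrame to stay dependency-free, and the sample rows and the name build_table_info are illustrative.

```python
# Hypothetical schema rows: (table_name, column_name, data_type)
SCHEMA_ROWS = [
    ("customers", "customer_name", "text"),
    ("orders", "order_total", "numeric"),
    ("orders", "customer_id", "integer"),
    ("inventory", "warehouse_location", "text"),
]


def build_table_info(relevant_tables, schema_rows):
    """Describe each relevant table once, listing all of its columns."""
    sections = []
    for table in dict.fromkeys(relevant_tables):  # de-duplicates, keeps first-seen order
        columns = [f"  {col} ({dtype})" for name, col, dtype in schema_rows if name == table]
        if columns:
            sections.append(f"Details about table {table}:\n" + "\n".join(columns))
    return "\n\n".join(sections)


# "orders" was retrieved twice, but is described only once
print(build_table_info(["orders", "orders", "customers"], SCHEMA_ROWS))
```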

One last thing we can do is supply information about foreign keys to the prompt.

import streamlit as st
import os
import pandas as pd
from uuid import uuid4
import psycopg2

from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader

# Ensure the necessary directories exist
folders_to_create = ['csvs', 'vectors']
for folder in folders_to_create:
    os.makedirs(folder, exist_ok=True)
    print(f"Directory '{folder}' checked or created.")

# Load environment variables and API keys
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

# Initialize language models and embeddings
language_model = OpenAI(openai_api_key=openai_api_key)
chat_language_model = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

def fetch_table_details(cursor):
    # All columns of all tables in the public schema
    sql = """
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public';
    """
    cursor.execute(sql)
    return cursor.fetchall()

def fetch_foreign_key_details(cursor):
    # contype 'f' selects foreign key constraints
    sql = """
        SELECT conrelid::regclass AS table_name, conname AS foreign_key,
               pg_get_constraintdef(oid) AS constraint_definition
        FROM pg_constraint
        WHERE contype = 'f' AND connamespace = 'public'::regnamespace;
    """
    cursor.execute(sql)
    return cursor.fetchall()

def create_vector_database(csv_path, directory):
    # CSVLoader reads from a file path, so pass the saved CSV rather than the DataFrame
    loader = CSVLoader(file_path=csv_path, encoding="utf8")
    document_data = loader.load()
    vector_db = Chroma.from_documents(document_data, embedding=embeddings, persist_directory=directory)
    vector_db.persist()

def save_database_details(uri):
    unique_id = str(uuid4()).replace("-", "_")
    conn = psycopg2.connect(uri)
    cur = conn.cursor()
    details = fetch_table_details(cur)
    df = pd.DataFrame(details, columns=['table_name', 'column_name', 'data_type'])
    csv_path = f'csvs/tables_{unique_id}.csv'
    df.to_csv(csv_path, index=False)
    create_vector_database(csv_path, f"./vectors/tables_{unique_id}")

    # Save the foreign keys alongside the table details
    foreign_keys = fetch_foreign_key_details(cur)
    fk_df = pd.DataFrame(foreign_keys, columns=['table_name', 'foreign_key', 'constraint_definition'])
    fk_csv_path = f'csvs/foreign_keys_{unique_id}.csv'
    fk_df.to_csv(fk_csv_path, index=False)

    cur.close()
    conn.close()
    return unique_id

def generate_sql_query_template(query, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"You are an assistant capable of composing SQL queries. Use the details provided to write a relevant SQL query for the question below. DB connection string is {db_uri}. "
                "Enclose the SQL query with three backticks '```'."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    response = chat_language_model(template.format_messages(text=query))
    return response.content

# Streamlit application setup
st.title("Database Interaction Tool")
uri = st.text_input("Enter the RDS Database URI")
if st.button("Connect to Database"):
    if uri:
        try:
            unique_id = save_database_details(uri)
            st.success(f"Connected to database and data saved with ID: {unique_id}")
        except Exception as e:
            st.error(f"Failed to connect: {str(e)}")
    else:
        st.warning("Please enter a valid database URI.")
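The code above saves the foreign keys to a CSV file, but it does not yet show them being added to the prompt. As a minimal sketch of that last step, the hypothetical helper below keeps only the constraints that belong to the retrieved tables and formats them as text. The sample rows are invented and mirror the columns of the foreign keys CSV.

```python
# Hypothetical rows, shaped like the columns of csvs/foreign_keys_<id>.csv
FOREIGN_KEYS = [
    {
        "table_name": "orders",
        "foreign_key": "orders_customer_id_fkey",
        "constraint_definition": "FOREIGN KEY (customer_id) REFERENCES customers(id)",
    },
    {
        "table_name": "shipments",
        "foreign_key": "shipments_order_id_fkey",
        "constraint_definition": "FOREIGN KEY (order_id) REFERENCES orders(id)",
    },
]


def foreign_key_context(relevant_tables, foreign_keys):
    """Return a prompt fragment listing the foreign keys of the relevant tables."""
    relevant = set(relevant_tables)
    lines = [
        f"{fk['table_name']}: {fk['constraint_definition']}"
        for fk in foreign_keys
        if fk["table_name"] in relevant
    ]
    if not lines:
        return ""
    return "Foreign keys for these tables:\n" + "\n".join(lines)


print(foreign_key_context(["orders", "customers"], FOREIGN_KEYS))
```

The returned string can be appended to the table description in the system message, which gives the model the join conditions it would otherwise have to guess.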

In similar ways, we can keep improving this application by adding fallbacks. With each fallback, we can add more information to the prompt.
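One possible shape for such a fallback chain is sketched below. The function answer_with_fallbacks and the two stub callables are assumptions for illustration: the stubs stand in for the LLM call and the database, and each context level is simply a richer prompt fragment than the one before it.

```python
from typing import Callable, Sequence


def answer_with_fallbacks(
    question: str,
    context_levels: Sequence[str],
    generate_sql: Callable[[str, str], str],
    run_sql: Callable[[str], list],
):
    """Try each context in order and return the first (sql, rows) pair that executes."""
    last_error = None
    for context in context_levels:
        sql = generate_sql(question, context)
        try:
            return sql, run_sql(sql)
        except Exception as error:  # a failed query triggers the next, richer context
            last_error = error
    raise RuntimeError("Every fallback failed") from last_error


# Stub stand-ins for the LLM call and the database, for illustration only
def fake_generate_sql(question: str, context: str) -> str:
    return "SELECT good" if "foreign keys" in context else "SELECT bad"


def fake_run_sql(sql: str) -> list:
    if "bad" in sql:
        raise ValueError("column does not exist")
    return [(1,)]


levels = ["tables only", "tables + foreign keys", "tables + foreign keys + sample values"]
print(answer_with_fallbacks("total per customer?", levels, fake_generate_sql, fake_run_sql))
```

Starting with the smallest context keeps the common case cheap, and the larger prompts are only paid for when a query fails.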

Conclusion

This article presents an approach to building an AI application that can interact with massive SQL databases through chat. We addressed the challenge of handling large databases, where it is impractical to include the complete list of tables and columns in the prompt. The proposed solution dynamically retrieves the relevant table and column names based on the user’s query, so the prompt contains only pertinent information, which improves both user experience and efficiency. This is done by using a vector database such as ChromaDB for embedding generation and retrieval.

We demonstrated how to streamline the interaction through a step-by-step implementation with code examples, improving the application’s functionality along the way. With further enhancements, such as incorporating foreign keys and additional fallbacks, this application holds promise for a variety of database interaction scenarios.
