Activity 1.6: Building a Simple RAG System

Work in progress

This section is under construction. This information hasn’t been reviewed or edited yet!


Practical Activity Overview

In this activity, you’ll build a Retrieval Augmented Generation (RAG) system that can answer questions based on your own documents. RAG combines the power of retrieval systems with language models to generate accurate, context-rich responses by:

  1. Retrieving relevant information from your documents
  2. Using that information to augment the language model’s knowledge
  3. Generating responses grounded in your specific data

This implementation will follow a simple but effective architecture with two main components:

  • Indexing Pipeline: Process documents, split them into chunks, and create vector embeddings
  • Retrieval & Generation: Search for relevant content and use it to generate answers

By the end of this activity, you’ll have a functional RAG system that can answer questions about your own documents.

New Code to Add

1. Import Required Libraries

import os
import streamlit as st
import requests
import tiktoken
import chromadb
from chonkie import SentenceChunker
from google import genai
from google.genai import types
from dotenv import load_dotenv
from pathlib import Path

2. Add Document Processing Functions

def process_documents(file_paths, chunk_size=5, chunk_overlap=1):
    """Process documents using Chonkie's SentenceChunker"""
    # Initialize chunker
    chunker = SentenceChunker(
        chunk_size=chunk_size,        # Number of sentences per chunk
        chunk_overlap=chunk_overlap   # Sentences overlapping between chunks
    )
    
    documents = []
    chunks = []
    
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
            # Add document to collection
            doc_id = os.path.basename(file_path)
            documents.append({
                "id": doc_id,
                "content": content,
                "metadata": {"source": str(file_path)}
            })
            
            # Chunk the document
            doc_chunks = chunker.chunk(content)
            
            # Store chunks with metadata
            for i, chunk in enumerate(doc_chunks):
                chunks.append({
                    "id": f"{doc_id}-chunk-{i}",
                    "content": chunk.text,
                    "metadata": {"source": str(file_path), "chunk": i}
                })
    
    return chunks

3. Add Vector Store Functions

def create_vector_store(chunks):
    """Create a ChromaDB collection and add document chunks"""
    # Initialize ChromaDB client
    client = chromadb.Client()
    
    # Create a collection
    collection = client.create_collection("documents")
    
    # Add chunks to collection
    ids = [chunk["id"] for chunk in chunks]
    contents = [chunk["content"] for chunk in chunks]
    metadatas = [chunk["metadata"] for chunk in chunks]
    
    collection.add(
        ids=ids,
        documents=contents,
        metadatas=metadatas
    )
    
    return collection

def rag_query(collection, query, n_results=3):
    """Query the vector store and return relevant document chunks"""
    results = collection.query(
        query_texts=[query],
        n_results=n_results
    )
    
    return results

4. Add RAG Response Generation

def generate_with_context(query, context, model_type, temperature=0.7, top_p=0.9):
    """Generate a response using retrieved context"""
    # Create a prompt that includes the context
    prompt = f"""Answer the question based on the following context:

Context:
{context}

Question: {query}

Answer:"""
    
    # Generate response
    if model_type == "Ollama":
        response = generate_ollama_response([{"role": "user", "content": prompt}], 
                                          temperature=temperature, top_p=top_p)
    else:
        response = generate_gemini_response([{"role": "user", "content": prompt}],
                                          temperature=temperature, top_p=top_p)
    
    return response

5. Update UI with RAG Controls

# Add RAG controls
st.sidebar.subheader("RAG Settings")
use_rag = st.sidebar.checkbox("Use RAG", value=True, 
                            help="Enable Retrieval-Augmented Generation")
n_results = st.sidebar.slider("Number of chunks to retrieve", 1, 5, 3,
                            help="Number of document chunks to retrieve for context")

# Chonkie chunking settings
chunk_size = st.sidebar.slider("Sentences per chunk", 1, 10, 5, 
                             help="Number of sentences per chunk")
chunk_overlap = st.sidebar.slider("Sentence overlap", 0, 5, 1,
                                help="Number of sentences overlapping between chunks")

# File uploader for documents
st.sidebar.subheader("Document Ingestion")
uploaded_files = st.sidebar.file_uploader("Upload documents", 
                                        accept_multiple_files=True, 
                                        type=["txt", "md", "pdf"])

6. Implement Document Processing and Comparison

if uploaded_files and st.sidebar.button("Process Documents"):
    # Save uploaded files temporarily
    temp_dir = Path("temp_docs")
    temp_dir.mkdir(exist_ok=True)
    
    file_paths = []
    for uploaded_file in uploaded_files:
        file_path = temp_dir / uploaded_file.name
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        file_paths.append(file_path)
    
    # Process documents
    with st.spinner("Processing documents..."):
        chunks = process_documents(file_paths, chunk_size, chunk_overlap)
        st.session_state.vector_store = create_vector_store(chunks)
    
    st.sidebar.success(f"Processed {len(chunks)} chunks from {len(file_paths)} documents")

# Add a compare button to test with and without RAG
if st.sidebar.button("Compare RAG vs. Base Model"):
    if "user_input" in st.session_state and st.session_state.user_input and st.session_state.vector_store:
        query = st.session_state.user_input
        
        st.subheader("Comparison: RAG vs. Base Model")
        
        col1, col2 = st.columns(2)
        
        with col1:
            st.markdown("**With RAG**")
            
            # Get relevant documents
            results = rag_query(st.session_state.vector_store, query, n_results)
            context = "\n\n".join(results["documents"][0])
            
            with st.spinner("Generating RAG response..."):
                rag_response = generate_with_context(query, context, model_type, temperature, top_p)
            
            st.text_area("Response", rag_response, height=300)
            
        with col2:
            st.markdown("**Base Model Only**")
            
            with st.spinner("Generating base response..."):
                if model_type == "Ollama":
                    base_response = generate_ollama_response([{"role": "user", "content": query}], 
                                                          temperature=temperature, top_p=top_p)
                else:
                    base_response = generate_gemini_response([{"role": "user", "content": query}],
                                                          temperature=temperature, top_p=top_p)
            
            st.text_area("Response", base_response, height=300)

7. Update Chat Input Handling for RAG

# Chat input
user_input = st.chat_input("Type your message here...")
if user_input:
    # Store the user input
    st.session_state.user_input = user_input
    
    # Add user message to history
    st.session_state.messages.append({"role": "user", "content": user_input})
    
    # Display user message
    with st.chat_message("user"):
        st.write(user_input)
    
    try:
        # Generate response
        if use_rag and st.session_state.vector_store:
            # Get relevant documents
            results = rag_query(st.session_state.vector_store, user_input, n_results)
            context = "\n\n".join(results["documents"][0])
            
            # Generate response with context
            response = generate_with_context(user_input, context, model_type, temperature, top_p)
        else:
            # Generate response without RAG
            if model_type == "Ollama":
                response = generate_ollama_response([{"role": "user", "content": user_input}], 
                                                  temperature=temperature, top_p=top_p)
            else:
                response = generate_gemini_response([{"role": "user", "content": user_input}],
                                                  temperature=temperature, top_p=top_p)
        
        # Add assistant response to history
        st.session_state.messages.append({"role": "assistant", "content": response})
        
        # Display assistant response
        with st.chat_message("assistant"):
            st.write(response)
    except Exception as e:
        st.error(str(e))

What to Test

  1. Document Ingestion:

    • Upload several text documents with information that your model wouldn’t know about
    • Test different chunk sizes and overlaps to see their impact on retrieval quality
  2. Query Retrieval:

    • Ask questions directly related to your documents
    • Observe which chunks are being retrieved for different queries
  3. RAG vs. Base Model Comparison:

    • Use the comparison feature to see how RAG improves model responses
    • Ask questions the model might hallucinate about without proper context
    • Test questions about specific details in your documents
  4. Response Quality:

    • Evaluate accuracy of answers against the source documents
    • Check for factual correctness and relevance of information
    • Note any hallucinations or incorrect information
  5. Performance Factors:

    • Experiment with different numbers of chunks to retrieve
    • Test how changing the chunk size affects response quality

Key Learning Outcomes

After completing this activity, you should understand:

  1. The RAG Architecture: How retrieval and generation components work together to produce better responses

  2. Document Processing: How to prepare documents for RAG by chunking them into manageable pieces

  3. Vector Stores: How vector databases enable semantic search of document chunks

  4. Context Augmentation: How retrieved information improves LLM outputs by providing relevant context

  5. Comparison Analysis: How to evaluate the difference between base LLM responses and RAG-enhanced responses

  6. Practical Implementation: How to build a functional RAG system with minimal dependencies

  7. Customization Techniques: How parameters like chunk size, overlap, and number of retrieved chunks impact system performance

This activity provides a foundation for more advanced RAG techniques and demonstrates the practical benefits of augmenting language models with retrieval capabilities.

Complete Script

import os
import streamlit as st
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.llms import Ollama
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.chains import RetrievalQA
from dotenv import load_dotenv
from pathlib import Path
import tempfile

# Load environment variables
load_dotenv()

# Check for API key
if not os.getenv('GOOGLE_API_KEY'):
    st.error("Please set your Google API Key in the .env file!")
    st.stop()

st.title("RAG-Enhanced Prompt Engineering Lab")
model_type = st.sidebar.selectbox("Model", ["Gemini", "Ollama"])

# Initialize session state
if "messages" not in st.session_state:
    st.session_state.messages = []
if "user_input" not in st.session_state:
    st.session_state.user_input = ""
if "vector_store" not in st.session_state:
    st.session_state.vector_store = None

# Add parameter controls to sidebar
st.sidebar.subheader("Generation Parameters")
temperature = st.sidebar.slider("Temperature", 0.0, 1.0, 0.7, 0.1)
top_p = st.sidebar.slider("Top-P", 0.0, 1.0, 0.9, 0.1)
context_window = st.sidebar.slider("Context Window Size", 1, 10, 5)

# RAG settings
st.sidebar.subheader("RAG Settings")
use_rag = st.sidebar.checkbox("Use RAG", value=True)
k_retrieval = st.sidebar.slider("Number of chunks to retrieve", 1, 5, 3)
chunk_size = st.sidebar.slider("Chunk Size", 100, 1000, 500)
chunk_overlap = st.sidebar.slider("Chunk Overlap", 0, 200, 50)

# Prompt technique library
PROMPT_TECHNIQUES = {
    "Basic": lambda query: query,
    "Few-Shot Learning": lambda query: f"""Here are some examples of how to respond:
Question: What is the capital of France?
Answer: The capital of France is Paris.
Question: What is the boiling point of water?
Answer: Water boils at 100 degrees Celsius at standard pressure.
Now answer this question: {query}""",
    "Chain-of-Thought": lambda query: f"""Think through this step-by-step to solve the problem:
{query}
Let's break this down into steps:
1. """,
    "Self-Consistency": lambda query: f"""Generate three different approaches to answer this question, then select the best one:
{query}
Approach 1:"""
}

# Prompt template library
PROMPT_TEMPLATES = {
    "Summarize Text": "Summarize the following text in 3-5 bullet points: {input}",
    "Explain Concept": "Explain {input} in simple terms a high school student would understand.",
    "Compare and Contrast": "Compare and contrast {input}. Highlight key similarities and differences.",
    "Generate Ideas": "Generate 5 creative ideas related to {input}.",
    "Analyze Argument": "Analyze the following argument and identify any logical fallacies: {input}"
}

# Add prompt technique selector
prompt_technique = st.sidebar.selectbox("Prompt Technique", list(PROMPT_TECHNIQUES.keys()))

# Add prompt template library
with st.sidebar.expander("Prompt Template Library"):
    selected_template = st.selectbox("Select Template", list(PROMPT_TEMPLATES.keys()))
    if st.button("Apply Template"):
        if st.session_state.user_input:
            new_prompt = PROMPT_TEMPLATES[selected_template].replace("{input}", st.session_state.user_input)
            st.session_state.user_input = new_prompt
            st.experimental_rerun()

# Document processing
st.sidebar.subheader("Document Ingestion")
uploaded_files = st.sidebar.file_uploader("Upload documents", accept_multiple_files=True, type=["txt", "md", "pdf"])

if uploaded_files and st.sidebar.button("Process Documents"):
    with st.spinner("Processing documents..."):
        # Create a temporary directory for the files
        temp_dir = Path(tempfile.mkdtemp())
        
        # Save the uploaded files
        file_paths = []
        for uploaded_file in uploaded_files:
            file_path = temp_dir / uploaded_file.name
            with open(file_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            file_paths.append(file_path)
        
        # Load and split the documents
        documents = []
        for file_path in file_paths:
            loader = TextLoader(file_path)
            documents.extend(loader.load())
        
        # Split the documents into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        chunks = text_splitter.split_documents(documents)
        
        # Create vector store
        embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        st.session_state.vector_store = Chroma.from_documents(chunks, embeddings)
        
        st.sidebar.success(f"Processed {len(chunks)} chunks from {len(file_paths)} documents")

# Display chat history
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

# Create a function to get the appropriate model with parameters
@st.cache_resource
def get_llm():
    if model_type == "Gemini":
        llm = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=temperature,
            top_p=top_p
        )
        max_tokens = 32768
    else:
        llm = Ollama(
            model="llama3.2:1b", 
            base_url="http://localhost:11434",
            temperature=temperature,
            top_p=top_p
        )
        max_tokens = 4096
    
    return llm, max_tokens

def get_conversation_chain():
    llm, _ = get_llm()
    
    # Create a system message template
    system_template = "You are a helpful AI assistant."
    
    # Create a chat prompt template with memory
    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=system_template),
        MessagesPlaceholder(variable_name="history"),
        HumanMessage(content="{input}")
    ])
    
    # Initialize memory
    memory = ConversationBufferMemory(k=context_window, return_messages=True, memory_key="history")
    
    # Load existing conversation into memory
    for msg in st.session_state.messages[-context_window:]:
        if msg["role"] == "user":
            memory.chat_memory.add_user_message(msg["content"])
        else:
            memory.chat_memory.add_ai_message(msg["content"])
    
    # Create the conversation chain
    chain = ConversationChain(llm=llm, prompt=prompt, memory=memory, verbose=False)
    
    return chain

def get_rag_chain():
    llm, _ = get_llm()
    
    # Create retriever
    retriever = st.session_state.vector_store.as_retriever(search_kwargs={"k": k_retrieval})
    
    # Create RAG chain
    rag_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )
    
    return rag_chain

# Add token counting display
if st.session_state.messages:
    _, max_tokens = get_llm()
    total_tokens = sum(len(msg["content"].split()) * 1.3 for msg in st.session_state.messages)
    st.sidebar.metric("Estimated Tokens Used", int(total_tokens))
    st.sidebar.progress(min(1.0, total_tokens / max_tokens))
    st.sidebar.text(f"{model_type} context: ~{max_tokens} tokens")

# Add a compare button for RAG vs non-RAG
if st.sidebar.button("Compare RAG vs. Base Model"):
    if st.session_state.user_input and st.session_state.vector_store:
        query = st.session_state.user_input
        modified_query = PROMPT_TECHNIQUES[prompt_technique](query)
        
        st.subheader("Comparison: RAG vs. Base Model")
        col1, col2 = st.columns(2)
        
        with col1:
            st.markdown("**With RAG**")
            with st.spinner("Generating RAG response..."):
                rag_chain = get_rag_chain()
                rag_response = rag_chain({"query": modified_query})
                st.text_area("Response", rag_response["result"], height=300)
        
        with col2:
            st.markdown("**Base Model Only**")
            with st.spinner("Generating base response..."):
                chain = get_conversation_chain()
                base_response = chain.invoke({"input": modified_query})
                st.text_area("Response", base_response["response"], height=300)
    else:
        st.error("Please enter a query and process documents first")

# Chat input
user_input = st.chat_input("Type your message here...")
if user_input:
    # Store the user input
    st.session_state.user_input = user_input
    
    # Apply selected prompt technique
    modified_input = PROMPT_TECHNIQUES[prompt_technique](user_input)
    
    # Add user message to history
    st.session_state.messages.append({"role": "user", "content": modified_input})
    
    # Display user message
    with st.chat_message("user"):
        st.write(modified_input)
    
    try:
        if use_rag and st.session_state.vector_store:
            # Use RAG chain
            rag_chain = get_rag_chain()
            response = rag_chain({"query": modified_input})
            assistant_response = response["result"]
        else:
            # Use conversation chain
            chain = get_conversation_chain()
            response = chain.invoke({"input": modified_input})
            assistant_response = response["response"]
        
        # Add assistant response to history
        st.session_state.messages.append({"role": "assistant", "content": assistant_response})
        
        # Display assistant response
        with st.chat_message("assistant"):
            st.write(assistant_response)
    except Exception as e:
        st.error(str(e))

# Add button to clear conversation history
if st.sidebar.button("Clear Conversation"):
    st.session_state.messages = []
    st.session_state.user_input = ""
    st.experimental_rerun()