- A Simple RAG Pipeline
- Rewriting the query
- Inducing Self-Reflection
- Providing feedback for generation retry
- Limiting the number of iterations
- Filtering the documents and extracting knowledge
- Adaptive RAG
- Corrective RAG
I have built up a lot of resentment over the past months toward the current state of LangChain. The library evolved very fast, and my feelings about it shifted from “I love it” to “I hate it”. I find the LangChain Expression Language (LCEL) unusable for most cases that go beyond a basic chain, and the constant changes to the API and documentation make it impossible to understand how to use it properly.
When the same team released LangGraph, I found the idea great, but I was scared to find myself back in a similar hellhole of an ever-changing framework going downhill. Now, being a bit more experienced with it, I find it to be one of the best tools on the market. It is not perfect, but it gave me the means to build complex stuff with ease, and I am recovering the long-lost feeling of pleasure that I once experienced with LangChain.
LangChain does too much, and as a consequence, it does many things badly. Scaling beyond the basic use cases with LangChain is a challenge that is often better served by building things from scratch on top of the underlying APIs. LangGraph is simpler and, in many ways, can be used as a replacement for LCEL. The idea is to capture the orchestration of LLM calls in a directed graph structure. This is a very common strategy for orchestrating complex systems. It is the underlying principle behind Kubeflow for building ML pipelines on Kubernetes and Airflow for building data engineering pipelines. Being able to focus on the mechanism of one specific node in the graph at a time allows us to abstract away the complexity of the whole pipeline and engineer systems that can scale in complexity without making it harder on the engineers.
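To make the directed-graph idea concrete, here is a minimal sketch in plain Python. It is not how LangGraph is implemented; `run_graph`, `shout`, and `exclaim` are hypothetical names made up for illustration. Each node receives the state, returns only the fields it changed, and a fixed edge table decides what runs next:

```python
from typing import Callable, Dict

State = Dict[str, object]
Node = Callable[[State], State]


def run_graph(nodes: Dict[str, Node], edges: Dict[str, str],
              start: str, state: State) -> State:
    """Walk the graph from `start`, merging each node's update into the state."""
    current = start
    while current != "END":
        update = nodes[current](state)  # a node only sees the state
        state = {**state, **update}     # and returns the fields it changed
        current = edges[current]        # follow the single outgoing edge
    return state


def shout(state: State) -> State:
    return {"text": str(state["text"]).upper()}


def exclaim(state: State) -> State:
    return {"text": f"{state['text']}!"}


result = run_graph(
    nodes={"shout": shout, "exclaim": exclaim},
    edges={"shout": "exclaim", "exclaim": "END"},
    start="shout",
    state={"text": "hello"},
)
print(result)
```

Each function can be written and tested on its own, which is the property that makes this style of orchestration pleasant to scale.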
Let’s see an example of how we can move from a simple pipeline to a complex one with ease with LangGraph. You can find the code used in this article here.
A Simple RAG Pipeline
As an example, I like to use the Retrieval Augmented Generation (RAG) pipeline, as it is an easy pipeline to mess up. In its simplest form, RAG is composed of a retriever and a generator:
- The user question is used to retrieve the data that could be used as context to answer the question better. The data is typically in a database, such as a vector, SQL, or graph database.
- The retrieved data is used as context in a prompt for an LLM to answer the question.
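The two steps above can be sketched without any library. This is a toy under loud assumptions: `CORPUS`, `toy_retrieve`, and `build_prompt` are hypothetical, the "retriever" ranks by shared words rather than by embeddings, and the "generator" step stops at building the prompt an LLM would receive:

```python
import re
from typing import List

# Hypothetical mini corpus standing in for a real database
CORPUS = [
    "LangGraph models an LLM pipeline as a directed graph of nodes.",
    "Chroma is a vector database that can persist to a local folder.",
    "Tavily is a web search API.",
]


def words(text: str) -> set:
    """Lowercase the text and split it into a set of word tokens."""
    return set(re.findall(r"\w+", text.lower()))


def toy_retrieve(question: str, corpus: List[str], k: int = 1) -> List[str]:
    """Return the k documents sharing the most words with the question."""
    q_words = words(question)
    ranked = sorted(corpus, key=lambda doc: len(q_words & words(doc)), reverse=True)
    return ranked[:k]


def build_prompt(question: str, documents: List[str]) -> str:
    """Assemble the retrieved documents into the context of a prompt."""
    context = "\n\n".join(documents)
    return f"Question: {question}\nContext: {context}\nAnswer:"


question = "What is a vector database?"
documents = toy_retrieve(question, CORPUS)
prompt = build_prompt(question, documents)
print(prompt)
```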
The Data
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import GitLoader
from langchain_text_splitters import (
    Language,
    RecursiveCharacterTextSplitter,
)
loader = GitLoader(
    clone_url="https://github.com/langchain-ai/langchain",
    repo_path="./code_data/langchain_repo/",
    branch="master",
)
python_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON, chunk_size=10000, chunk_overlap=100
)
docs = loader.load()
# Drop very large files before chunking
docs = [doc for doc in docs if len(doc.page_content) < 50000]
# Split the remaining files into chunks of at most 10000 characters
docs = python_splitter.split_documents(docs)
I chunk the code into chunks of 10000 characters maximum, and I remove any documents with more than 50000 characters. Then, I index the data into a local Chroma database:
vectorstore = Chroma(
    collection_name="rag-chroma",
    embedding_function=OpenAIEmbeddings(),
    persist_directory="./chroma_langchain_db",
)
vectorstore.add_documents(documents=docs)
retriever = vectorstore.as_retriever()
Here, I persist the data in a local folder, so I don’t have to reindex the data every time I run the code.
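To illustrate what `chunk_size` and `chunk_overlap` mean, here is a simplified sketch of fixed-size chunking with overlap. It is deliberately naive: `chunk_text` is a hypothetical helper that cuts on character counts only, whereas the language-aware splitter used above tries to cut on code boundaries first:

```python
from typing import List


def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Cut text into windows of chunk_size characters sharing chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    # Stop once the remaining tail is already covered by the previous window
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]


chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)
```

Consecutive chunks share one character here, just as consecutive code chunks share up to 100 characters in the indexing code.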
The Pipeline state
The idea behind LangGraph is to ensure that we keep track of the state of the pipeline. For a simple RAG pipeline, the state is captured by the user question, the retrieved documents, and the LLM generation. We capture the state as a Pydantic data model. Pydantic is a data validation library for Python:
from typing import List, Optional
from langchain_core.pydantic_v1 import BaseModel
class GraphState(BaseModel):
    question: Optional[str] = None
    generation: Optional[str] = None
    documents: List[str] = []
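The nodes defined in the rest of this article return a dictionary containing only the fields they changed. Roughly speaking, that partial update is merged into the state. Here is a sketch of that merge using a standard-library dataclass; `ToyState` and `apply_update` are hypothetical stand-ins, and LangGraph's actual merge mechanics may differ:

```python
from dataclasses import dataclass, field, replace
from typing import List, Optional


@dataclass
class ToyState:
    question: Optional[str] = None
    generation: Optional[str] = None
    documents: List[str] = field(default_factory=list)


def apply_update(state: ToyState, update: dict) -> ToyState:
    """Return a new state where only the keys present in `update` are overwritten."""
    return replace(state, **update)


state = ToyState(question="What is LangChain?")
state = apply_update(state, {"documents": ["doc A"]})             # retrieval step
state = apply_update(state, {"generation": "An LLM framework."})  # generation step
print(state)
```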
The Retrieval node
def retriever_node(state: GraphState):
    new_documents = retriever.invoke(state.question)
    new_documents = [d.page_content for d in new_documents]
    state.documents.extend(new_documents)
    return {"documents": state.documents}
The retriever node does the following:
- It uses the question stored in the state as a query to the retriever.
- The retrieved documents are passed to the pipeline state.
The Generation node
from langchain_core.prompts import ChatPromptTemplate
system_prompt = """
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Only provide the answer and nothing else!
"""
human_prompt = """
Question: {question}
Context:
{context}
Answer:
"""
rag_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
Let’s bind this prompt to the gpt-4o-mini model by OpenAI (because it is cheap) as a chain:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
llm_engine = ChatOpenAI(model='gpt-4o-mini')
rag_chain = rag_prompt | llm_engine | StrOutputParser()
Now, let’s pass this chain to the generation node:
def generation_node(state: GraphState):
    generation = rag_chain.invoke({
        "context": "\n\n".join(state.documents),
        "question": state.question,
    })
    return {"generation": generation}
The generation node does the following:
- It passes the question and the documents stored in the state to the generation chain.
- The generation chain generates an answer, and we store it in the state.
Building the pipeline
We start by instantiating the StateGraph class:
from langgraph.graph import END, StateGraph, START
pipeline = StateGraph(GraphState)
Then, we add the nodes:
pipeline.add_node('retrieval_node', retriever_node)
pipeline.add_node('generator_node', generation_node)
And we connect the nodes:
# We start by the retrieval
pipeline.add_edge(START, 'retrieval_node')
# We continue to the generation node
pipeline.add_edge('retrieval_node', 'generator_node')
# Once we generated the text, we end the pipeline
pipeline.add_edge('generator_node', END)
To be able to use the pipeline, we need to compile it:
rag_pipeline = pipeline.compile()
We can visualize the current pipeline:
from IPython.display import Image, display
display(Image(rag_pipeline.get_graph().draw_png()))
We can query that pipeline and stream the different events as they are being executed:
inputs = {"question": "What is LangChain?"}
for output in rag_pipeline.stream(inputs, stream_mode='updates'):
    for key, value in output.items():
        print(f"Node: {key}")
# `value` now holds the update returned by the last node that ran
print(value["generation"])
> Node: retrieval
> Node: generator
LangChain is a framework designed for developing applications powered by large language models (LLMs). It simplifies the entire application lifecycle, including development, productionization, and deployment, through a set of open-source libraries and tools. Key components of LangChain include model I/O, retrieval strategies, and agents, enabling the creation of context-aware reasoning applications.
Rewriting the query
Here, I am just going to ask the LLM to rewrite a better question for retrieval.
system_prompt = """
You are a question re-writer that converts an input question to a better version that is optimized for vectorstore retrieval.
The vectorstore contains the whole GitHub repository of the LangChain Python Package. Look at the input and try to reason about the underlying semantic intent / meaning.
Only respond with the rewritten question and nothing else!
"""
human_prompt = """
Here is the initial question: {question}
Formulate an improved question.
Rewritten question:
"""
db_query_rewrite_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
Let’s get the related chain:
db_query_rewriter = (
    db_query_rewrite_prompt
    | llm_engine
    | StrOutputParser()
)
I am adding a new state attribute to capture the rewritten question:
class GraphState(BaseModel):
    ...
    rewritten_question: Optional[str] = None
We implement the new node by invoking the db_query_rewriter chain:
def db_query_rewriting_node(state: GraphState):
    rewritten_question = db_query_rewriter.invoke({
        "question": state.question
    })
    return {"rewritten_question": rewritten_question}
And now the retrieval is done with the rewritten question instead:
def retriever_node(state: GraphState):
    new_documents = retriever.invoke(state.rewritten_question)
    new_documents = [d.page_content for d in new_documents]
    state.documents.extend(new_documents)
    return {"documents": state.documents}
We add the new node:
pipeline.add_node('db_query_rewrite_node', db_query_rewriting_node)
And change the connection between the nodes:
# We start with the query rewriting
pipeline.add_edge(START, 'db_query_rewrite_node')
# We continue to the retrieval node
pipeline.add_edge('db_query_rewrite_node', 'retrieval_node')
The rest remains the same. Let’s draw the graph:
display(Image(rag_pipeline.get_graph().draw_png()))
Inducing Self-Reflection
Let’s create a prompt that will assess hallucination. We are defining “hallucination” as the model generating an answer that is not grounded in the facts contained in the documents:
system_prompt = """
You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts.
Give a binary score 'yes' or 'no'. 'yes' means that the answer is grounded in / supported by the set of facts.
"""
human_prompt = """
Set of facts:
{documents}
LLM generation: {generation}
"""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
We create another prompt to assess the relevance of the answer to the question:
system_prompt = """
You are a grader assessing whether an answer addresses / resolves a question.
Give a binary score 'yes' or 'no'. 'yes' means that the answer resolves the question.
"""
human_prompt = """
User question: {question}
LLM generation: {generation}
"""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
To ensure that the output generated by the model can only be “yes” or “no”, I force the output to follow the structure provided by a Pydantic data model:
from typing import Literal
from langchain_core.pydantic_v1 import BaseModel, Field
class GradeHallucinations(BaseModel):
    binary_score: Literal["yes", "no"] = Field(
        description="Answer is grounded in the facts, 'yes' or 'no'"
    )
class GradeAnswer(BaseModel):
    binary_score: Literal["yes", "no"] = Field(
        description="Answer addresses the question, 'yes' or 'no'"
    )
hallucination_grader = (
    hallucination_prompt
    | llm_engine.with_structured_output(GradeHallucinations)
)
answer_grader = (
    answer_prompt
    | llm_engine.with_structured_output(GradeAnswer)
)
This way, the output is directly parsed into a Pydantic model. For example:
grade = answer_grader.invoke(
    {"question": "my question", "generation": "LLM answer"}
)
grade.binary_score
> 'yes'
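The value of constraining the score to a `Literal` is that anything other than the two allowed strings is rejected before it reaches the branching logic. As a library-free sketch of that guarantee (the `parse_binary_score` helper is hypothetical, and the extra normalization of case and whitespace is my own assumption, not something the structured output does):

```python
from typing import Literal, get_args

BinaryScore = Literal["yes", "no"]


def parse_binary_score(raw: str) -> str:
    """Normalize a raw grader output and reject anything outside the allowed values."""
    cleaned = raw.strip().lower()
    if cleaned not in get_args(BinaryScore):
        raise ValueError(f"Expected one of {get_args(BinaryScore)}, got {raw!r}")
    return cleaned


print(parse_binary_score(" Yes "))
```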
Now, I can create an evaluation node to evaluate the answer:
def answer_evaluation_node(state: GraphState):
    # assess hallucination
    hallucination_grade = hallucination_grader.invoke(
        {"documents": state.documents, "generation": state.generation}
    )
    if hallucination_grade.binary_score == "yes":
        # if no hallucination, assess relevance
        answer_grade = answer_grader.invoke({
            "question": state.question,
            "generation": state.generation
        })
        if answer_grade.binary_score == "yes":
            # no hallucination and relevant
            return "useful"
        else:
            # no hallucination but not relevant
            return "not relevant"
    else:
        # we have hallucination
        return "hallucination"
This evaluation node simply classifies the answer for hallucination and relevance.
We are going to use this evaluation node as a decision node to branch to different paths depending on the evaluation. We add a conditional edge to the pipeline:
pipeline.add_conditional_edges(
    'generator_node',
    answer_evaluation_node,
    {
        "useful": END,
        "not relevant": 'db_query_rewrite_node',
        "hallucination": 'generator_node'
    }
)
If the answer is evaluated as “useful”, we return the answer to the user. If the answer is evaluated as “hallucination”, we regenerate. If the answer is evaluated as “not relevant”, we rewrite the query. Let’s draw the graph.
display(Image(rag_pipeline.get_graph().draw_png()))
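The conditional edge boils down to two pieces: a function that turns the state into a label, and a mapping from labels to the next node. Here is a plain-Python sketch of that lookup, assuming a toy state with `grounded` and `relevant` flags in place of the LLM graders; `toy_decide`, `next_node`, and `ROUTES` are hypothetical names:

```python
from typing import Callable, Dict


def toy_decide(state: dict) -> str:
    """Stand-in for the evaluation function: label the answer."""
    if not state["grounded"]:
        return "hallucination"
    if not state["relevant"]:
        return "not relevant"
    return "useful"


# Same labels and targets as the conditional edge above
ROUTES = {
    "useful": "END",
    "not relevant": "db_query_rewrite_node",
    "hallucination": "generator_node",
}


def next_node(state: dict, decide: Callable[[dict], str],
              mapping: Dict[str, str]) -> str:
    """Pick the next node by looking up the label returned by `decide`."""
    return mapping[decide(state)]


print(next_node({"grounded": True, "relevant": False}, toy_decide, ROUTES))
```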
Providing feedback for generation retry
To capture that feedback across multiple iterations, we are going to create two new state attributes:
class GraphState(BaseModel):
    ...
    query_feedbacks: List[str] = []
    generation_feedbacks: List[str] = []
They are lists of feedback that we can concatenate into a string when we need them in a prompt. Let’s create a prompt to generate feedback in case we get hallucinations:
system_prompt = """
Your role is to give feedback on the LLM-generated answer. The LLM generation is NOT grounded in the set of retrieved facts.
Explain how the generated answer could be improved so that it is solely grounded in the retrieved facts.
Only provide your feedback and nothing else!
"""
human_prompt = """
User question: {question}
Retrieved facts:
{documents}
Wrong generated answer: {generation}
"""
generation_feedback_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
Let’s create another prompt for the case where the generation does not address the question:
system_prompt = """
Your role is to give feedback on the text query used to retrieve documents. Those retrieved documents are used as context to answer a user question.
The following generated answer doesn't address the question! Explain how the query could be improved so that the retrieved documents could be more relevant to the question.
Only provide your feedback and nothing else!
"""
human_prompt = """
User question: {question}
Text query: {rewritten_question}
Retrieved documents:
{documents}
Wrong generated answer: {generation}
"""
query_feedback_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
Let’s get the related chains:
query_feedback_chain = (
    query_feedback_prompt
    | llm_engine
    | StrOutputParser()
)
generation_feedback_chain = (
    generation_feedback_prompt
    | llm_engine
    | StrOutputParser()
)
We need to modify the db_query_rewrite_prompt to account for that new information:
system_prompt = """
You are a question re-writer that converts an input question to a better version that is optimized for vectorstore retrieval.
The vectorstore contains the whole GitHub repository of the LangChain Python Package. Look at the input and try to reason about the underlying semantic intent / meaning.
Additional feedback may be provided for why a previous version of the question didn't lead to a valid response. Make sure to utilize that feedback to generate a better question.
Only respond with the rewritten question and nothing else!
"""
human_prompt = """
Here is the initial question: {question}
Here is the feedback about previous versions of the question:
{feedback}
Formulate an improved question.
Rewritten question:
"""
db_query_rewrite_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
Same modification for the RAG prompt:
system_prompt = """
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Additional feedback may be provided about a previous version of the answer. Make sure to utilize that feedback to improve the answer.
Only provide the answer and nothing else!
"""
human_prompt = """
Question: {question}
Context:
{context}
Here is the feedback about previous versions of the answer:
{feedback}
Answer:
"""
rag_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
We now create the generation feedback node:
def generation_feedback_node(state: GraphState):
    feedback = generation_feedback_chain.invoke({
        "question": state.question,
        "documents": "\n\n".join(state.documents),
        "generation": state.generation
    })
    feedback = 'Feedback about the answer "{}": {}'.format(
        state.generation, feedback
    )
    state.generation_feedbacks.append(feedback)
    return {"generation_feedbacks": state.generation_feedbacks}
and the query feedback node:
def query_feedback_node(state: GraphState):
    feedback = query_feedback_chain.invoke({
        "question": state.question,
        "rewritten_question": state.rewritten_question,
        "documents": "\n\n".join(state.documents),
        "generation": state.generation
    })
    feedback = 'Feedback about the query "{}": {}'.format(
        state.rewritten_question, feedback
    )
    state.query_feedbacks.append(feedback)
    return {"query_feedbacks": state.query_feedbacks}
We need to modify the db_query_rewriting_node to account for the feedback:
def db_query_rewriting_node(state: GraphState):
    rewritten_question = db_query_rewriter.invoke({
        "question": state.question,
        "feedback": "\n".join(state.query_feedbacks)
    })
    return {"rewritten_question": rewritten_question}
Same modification for the generation node:
def generation_node(state: GraphState):
    generation = rag_chain.invoke({
        "context": "\n\n".join(state.documents),
        "question": state.question,
        "feedback": "\n".join(state.generation_feedbacks)
    })
    return {"generation": generation}
We now need to add the new nodes to the graph and create the edges:
pipeline.add_node('query_feedback_node', query_feedback_node)
pipeline.add_node('generation_feedback_node', generation_feedback_node)
pipeline.add_conditional_edges(
    'generator_node',
    answer_evaluation_node,
    {
        "useful": END,
        "not relevant": 'query_feedback_node',
        "hallucination": 'generation_feedback_node'
    }
)
pipeline.add_edge('query_feedback_node', 'db_query_rewrite_node')
pipeline.add_edge('generation_feedback_node', 'generator_node')
Let’s plot the resulting graph:
display(Image(rag_pipeline.get_graph().draw_png()))
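One detail of the feedback lists is worth spelling out: on the first pass they are empty, so the `{feedback}` slot in the prompts is filled with an empty string, and it grows by one line per failed attempt. A small sketch of that behavior, where `format_feedback` and `render_feedback` are hypothetical helpers mirroring the string formatting used in the nodes above:

```python
from typing import List


def format_feedback(kind: str, subject: str, feedback: str) -> str:
    """Tag a piece of feedback with the answer or query it refers to."""
    return 'Feedback about the {} "{}": {}'.format(kind, subject, feedback)


def render_feedback(feedbacks: List[str]) -> str:
    """Concatenate the accumulated feedback for injection into a prompt."""
    return "\n".join(feedbacks)


feedbacks: List[str] = []
first_pass = render_feedback(feedbacks)  # nothing to report yet

feedbacks.append(format_feedback("answer", "It is a database.", "Not in the facts."))
feedbacks.append(format_feedback("answer", "It is a library.", "Too vague."))
second_pass = render_feedback(feedbacks)
print(second_pass)
```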
Limiting the number of iterations
To implement this, I need to keep track of how many times we generated a response, so I am creating a new state attribute:
class GraphState(BaseModel):
    ...
    generation_num: int = 0
Every time we reach the generation node, we increment this number:
def generation_node(state: GraphState):
    generation = rag_chain.invoke({
        "context": "\n\n".join(state.documents),
        "question": state.question,
        "feedback": "\n".join(state.generation_feedbacks)
    })
    return {
        "generation": generation,
        "generation_num": state.generation_num + 1
    }
Now, we are going to modify the branching logic to check on the number of generations before making a decision:
MAX_GENERATIONS = 3
def answer_evaluation_node(state: GraphState):
    # assess hallucination
    hallucination_grade = hallucination_grader.invoke(
        {"documents": state.documents, "generation": state.generation}
    )
    if hallucination_grade.binary_score == "yes":
        # if no hallucination, assess relevance
        answer_grade = answer_grader.invoke({
            "question": state.question,
            "generation": state.generation
        })
        if answer_grade.binary_score == "yes":
            # no hallucination and relevant
            return "useful"
        elif state.generation_num > MAX_GENERATIONS:
            return "max_generation_reached"
        else:
            # no hallucination but not relevant
            return "not relevant"
    elif state.generation_num > MAX_GENERATIONS:
        return "max_generation_reached"
    else:
        # we have hallucination
        return "hallucination"
This evaluation node now classifies the answer for hallucination, relevance, and max number of generations reached.
Once we reach the maximum number of iterations, we are going to apologize to the user and tell them we couldn't answer the question. Let’s create a prompt for that:
system_prompt = """
Your job is to generate an apology for not being able to provide a correct answer to a user question.
The question was used to retrieve documents from a database and a websearch and none of them were able to provide enough context to answer the user question.
Explain to the user that you couldn't answer the question.
"""
give_up_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "User question: {question} \n\n Answer:"),
    ]
)
We create its related chain:
give_up_chain = give_up_prompt | llm_engine | StrOutputParser()
And finally, its related node:
def give_up_node(state: GraphState):
    # pass the question explicitly under the prompt's variable name
    response = give_up_chain.invoke({"question": state.question})
    return {"generation": response}
We add the node to the graph:
pipeline.add_node('give_up_node', give_up_node)
We add a new branch to the conditional edge:
pipeline.add_conditional_edges(
    'generator_node',
    answer_evaluation_node,
    {
        "useful": END,
        "not relevant": 'query_feedback_node',
        "hallucination": 'generation_feedback_node',
        "max_generation_reached": 'give_up_node'
    }
)
And we make the give_up_node an end state.
pipeline.add_edge('give_up_node', END)
Let’s plot the resulting graph:
display(Image(rag_pipeline.get_graph().draw_png()))
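Stripped of the graph machinery, the control flow of this section is a bounded retry loop with a fallback. The sketch below is a simplification under stated assumptions: `run_with_retries`, `toy_generate`, and `toy_is_useful` are hypothetical, there is a single feedback list, and the loop stops after exactly `max_generations` attempts, which is not necessarily the same boundary as the `>` comparison in the node above:

```python
from typing import Callable, List, Tuple

APOLOGY = "Sorry, I could not answer the question."


def run_with_retries(generate: Callable[[List[str]], str],
                     is_useful: Callable[[str], bool],
                     max_generations: int = 3) -> Tuple[str, int]:
    """Generate until the answer is accepted or the attempt budget is spent."""
    feedbacks: List[str] = []
    for attempt in range(1, max_generations + 1):
        answer = generate(feedbacks)
        if is_useful(answer):
            return answer, attempt
        # Keep track of why this attempt failed for the next generation
        feedbacks.append(f"Attempt {attempt} was rejected: {answer}")
    return APOLOGY, max_generations


def toy_generate(feedbacks: List[str]) -> str:
    """Toy generator that only gets it right after two pieces of feedback."""
    return "final" if len(feedbacks) >= 2 else "draft"


def toy_is_useful(answer: str) -> bool:
    return answer == "final"


print(run_with_retries(toy_generate, toy_is_useful, max_generations=3))
print(run_with_retries(toy_generate, toy_is_useful, max_generations=2))
```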
Filtering the documents and extracting knowledge
To assess the relevance of a document to the question, we are going to create a grader that will compare the document to the question. So here is the prompt:
system_prompt = """
You are a grader assessing relevance of a retrieved document to a user question.
It does not need to be a stringent test. The goal is to filter out erroneous retrievals.
If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant.
Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question. 'yes' means that the document contains relevant information.
"""
human_prompt = """
Retrieved document: {document}
User question: {question}
"""
grade_doc_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
And its chain:
class GradeDocuments(BaseModel):
    binary_score: Literal["yes", "no"] = Field(
        description="""
        Document is relevant to the question, 'yes' or 'no'
        """
    )
retrieval_grader = (
    grade_doc_prompt
    | llm_engine.with_structured_output(GradeDocuments)
)
Similarly, we are going to create a chain that can extract knowledge. First, the prompt:
system_prompt = """
You are a knowledge refinement engine. Your job is to extract the information from a document that could be relevant to a user question.
The goal is to filter out the noise and keep only the information that can provide context to answer the user question.
If the document contains keyword(s) or semantic meaning related to the user question, consider it as relevant.
DO NOT modify the text, only return the original text that is relevant to the user question.
"""
human_prompt = """
Retrieved document: {document}
User question: {question}
"""
knowledge_extraction_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
And then its chain:
knowledge_extractor = (
    knowledge_extraction_prompt
    | llm_engine
    | StrOutputParser()
)
Let’s create the node that will filter the documents:
def filter_relevant_documents_node(state: GraphState):
    # first, we grade every document
    grades = retrieval_grader.batch([
        {"question": state.question, "document": doc}
        for doc in state.documents
    ])
    # Then we keep only the documents that were graded as relevant
    filtered_docs = [
        doc for grade, doc
        in zip(grades, state.documents)
        if grade.binary_score == 'yes'
    ]
    # If we didn't get any relevant document, let's capture that
    # as a feedback for the next retrieval iteration
    if not filtered_docs:
        feedback = 'Feedback about the query "{}": did not generate any relevant documents.'.format(
            state.rewritten_question
        )
        state.query_feedbacks.append(feedback)
    return {
        "documents": filtered_docs,
        "query_feedbacks": state.query_feedbacks
    }
- We grade all the documents in a batch. The batch operation lets us get through all the documents quickly.
- We then filter out the documents that were not considered relevant.
- If the list of filtered documents is empty, we capture feedback for the query rewriting that the query didn’t lead to any relevant document.
- In the end, we update the state.
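The steps above can be tried without any LLM by swapping the grader for a keyword check. In this sketch, `keyword_grader` and `filter_relevant` are hypothetical stand-ins; only the zip-and-filter logic and the feedback message mirror the node:

```python
from typing import Callable, List, Optional, Tuple


def keyword_grader(pairs: List[Tuple[str, str]]) -> List[str]:
    """Toy batch grader: a document is relevant if it mentions 'langgraph'."""
    return ["yes" if "langgraph" in doc.lower() else "no" for _, doc in pairs]


def filter_relevant(query: str, documents: List[str],
                    grade_batch: Callable[[List[Tuple[str, str]]], List[str]]
                    ) -> Tuple[List[str], Optional[str]]:
    """Keep the documents graded 'yes'; return feedback if none survive."""
    grades = grade_batch([(query, doc) for doc in documents])
    kept = [doc for grade, doc in zip(grades, documents) if grade == "yes"]
    feedback = None
    if not kept:
        feedback = (
            'Feedback about the query "{}": '
            "did not generate any relevant documents.".format(query)
        )
    return kept, feedback


docs = ["LangGraph builds graphs.", "Bananas are yellow."]
print(filter_relevant("What is LangGraph?", docs, keyword_grader))
print(filter_relevant("What is LangGraph?", docs[1:], keyword_grader))
```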
Let’s now extract the relevant knowledge from each document:
def knowledge_extractor_node(state: GraphState):
    filtered_docs = knowledge_extractor.batch([
        {"question": state.question, "document": doc}
        for doc in state.documents
    ])
    # we keep only the non empty documents
    filtered_docs = [doc for doc in filtered_docs if doc]
    return {"documents": filtered_docs}
We add those nodes to the graph:
pipeline.add_node('filter_docs_node', filter_relevant_documents_node)
pipeline.add_node('extract_knowledge_node', knowledge_extractor_node)
And we modify the edges:
pipeline.add_edge('retrieval_node', 'filter_docs_node')
pipeline.add_edge('filter_docs_node', 'extract_knowledge_node')
pipeline.add_edge('extract_knowledge_node', 'generator_node')
Let’s draw the graph:
display(Image(rag_pipeline.get_graph().draw_png()))
Adaptive RAG
- If the question can be answered directly, we are going to route it to a simple LLM.
- If the question is about the data (the LangChain repo in our case), we are going to route it to the RAG pipeline.
- If the question would need additional context to be answered, we are going to route the question to a web search instead.
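The three cases above amount to a classifier followed by a dispatch table. Here is a toy sketch in which simple keyword rules play the role of the LLM router; `toy_router`, `HANDLERS`, and `answer` are hypothetical, and the rules are only there to make the example runnable:

```python
from typing import Callable, Dict


def toy_router(question: str) -> str:
    """Stand-in for the LLM router: pick one of the three routes."""
    q = question.lower()
    if "langchain" in q:
        return "vectorstore"  # question about our indexed data
    if "latest" in q or "today" in q:
        return "websearch"    # question needing fresh external context
    return "QA_LM"            # question answerable directly


# One handler per route; each would be a sub-pipeline in the real graph
HANDLERS: Dict[str, Callable[[str], str]] = {
    "vectorstore": lambda q: f"[RAG pipeline] {q}",
    "websearch": lambda q: f"[web search] {q}",
    "QA_LM": lambda q: f"[plain LLM] {q}",
}


def answer(question: str) -> str:
    """Route the question, then hand it to the matching handler."""
    return HANDLERS[toy_router(question)](question)


print(answer("How do I use LangChain retrievers?"))
print(answer("What is the latest Python release?"))
print(answer("What is 2 + 2?"))
```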
Since the search can now go through either a database or a web search, we need to route the query feedback to the right search mode.
For web search, I am going to use Tavily. Before I can use it, I need to get an API key.
I am going to use the TavilySearchResults tool available as part of LangChain:
import os
from langchain_community.tools.tavily_search import TavilySearchResults
os.environ['TAVILY_API_KEY'] = 'YOUR API KEY'
web_search_tool = TavilySearchResults(k=3)
We simply use it by running a query:
result = web_search_tool.invoke({"query": "What is Langchain?"})
result
And we get a list of web results with their URLs:
[{'url': 'https://www.simplilearn.com/tutorials/generative-ai-tutorial/what-is-langchain',
'content': 'LangChain is a tool that helps developers create applications using advanced language models. LangChain simplifies the creation of AI applications by making integrating data and adjusting model interactions easier. LangChain provides strong support for developers with tools to connect language models to various data sources. Now let’s explore how to develop applications with LangChain, highlighting the key steps to use language models effectively: In conclusion, LangChain is a powerful and adaptable tool for developing applications that use language models. LangChain is a framework that supports the development of applications using large language models (LLMs). While LLMs are the underlying models that perform tasks, LangChain offers the necessary tools and components to build, integrate, and manage applications that utilize these models, streamlining the development process.'},
{'url': 'https://python.langchain.com/v0.2/docs/introduction/',
'content': "LangChain is a Python library that simplifies the development, productionization, and deployment of applications powered by large language models (LLMs). Learn how to use LangChain's building blocks, components, and integrations to build chatbots, agents, and more."},
{'url': 'https://en.wikipedia.org/wiki/LangChain',
'content': 'In October 2023 LangChain introduced LangServe, a deployment tool designed to facilitate the transition from LCEL (LangChain Expression Language) prototypes to production-ready applications.[5]\nIntegrations[edit]\nAs of March 2023, LangChain included integrations with systems including Amazon, Google, and Microsoft Azure cloud storage; API wrappers for news, movie information, and weather; Bash for summarization, syntax and semantics checking, and execution of shell scripts; multiple web scraping subsystems and templates; few-shot learning prompt generation support; finding and summarizing "todo" tasks in code; Google Drive documents, spreadsheets, and presentations summarization, extraction, and creation; Google Search and Microsoft Bing web search; OpenAI, Anthropic, and Hugging Face language models; iFixit repair guides and wikis search and summarization; MapReduce for question answering, combining documents, and question generation; N-gram overlap scoring; PyPDF, pdfminer, fitz, and pymupdf for PDF file text extraction and manipulation; Python and JavaScript code generation, analysis, and debugging; Milvus vector database[6] to store and retrieve vector embeddings; Weaviate vector database[7] to cache embedding and data objects; Redis cache database storage; Python RequestsWrapper and other methods for API requests; SQL and NoSQL databases including JSON support; Streamlit, including for logging; text mapping for k-nearest neighbors search; time zone conversion and calendar operations; tracing and recording stack symbols in threaded and asynchronous subprocess runs; and the Wolfram Alpha website and SDK.[8] As a language model integration framework, LangChain\'s use-cases largely overlap with those of language models in general, including document analysis and summarization, chatbots, and code analysis.[2]\nHistory[edit]\nLangChain was launched in October 2022 as an open source project by Harrison Chase, while working at machine learning startup 
Robust Intelligence. In April 2023, LangChain had incorporated and the new startup raised over $20 million in funding at a valuation of at least $200 million from venture firm Sequoia Capital, a week after announcing a $10 million seed investment from Benchmark.[3][4]\n The project quickly garnered popularity, with improvements from hundreds of contributors on GitHub, trending discussions on Twitter, lively activity on the project\'s Discord server, many YouTube tutorials, and meetups in San Francisco and London. As of April 2023, it can read from more than 50 document types and data sources.[9]\nReferences[edit]\nExternal links[edit]'},
{'url': 'https://www.langchain.com/langchain',
'content': 'Augment the power\nof\xa0LLMs with your data\nLangChain helps connect LLMs to your company’s private sources\nof data and APIs to create context-aware, reasoning applications.\n Our Methods\nReady to start shipping\nreliable GenAI apps faster?\nLangChain and LangSmith are critical parts of the reference\narchitecture to get you from prototype to production. The largest community building the future of LLM apps\nLangChain’s flexible abstractions and AI-first toolkit make it\xa0the\xa0#1\xa0choice for developers when building with GenAI.\n Why choose LangChain?\nLangChain is easy to get started with and\xa0gives\xa0you choice, flexibility, and power as\xa0you scale.\n Get customizability and control with a durable runtime baked in\nLangChain Expression Language (LCEL) lets you build your app in a truly composable way, allowing you to customize it as you see fit.'},
{'url': 'https://www.ibm.com/topics/langchain',
'content': 'Applications made with LangChain provide great utility for a variety of use cases, from straightforward question-answering and text generation tasks to more complex solutions that use an LLM as a “reasoning engine.”\nTrain, validate, tune and deploy generative AI, foundation models and machine learning capabilities with ease and build AI applications in a fraction of the time with a fraction of the data.\n When building a chain for an agent, inputs include:\nDespite their heralded power and versatility, LLMs have important limitations: namely, a lack of up-to-date information, a lack of domain-specific expertise and a general difficulty with math.\nLangChain tools\xa0(link resides outside ibm.com) are a set of functions that empower LangChain agents to interact with real-world information in order to expand or improve the services it can provide. Launched by Harrison Chase in October 2022, LangChain enjoyed a meteoric rise to prominence: as of June 2023, it was the single fastest-growing open source project on Github.1 Coinciding with the momentous launch of OpenAI’s ChatGPT the following month, LangChain has played a significant role in making generative AI more accessible to enthusiasts in the wake of its widespread popularity.\n Reimagine how you work with\xa0AI: our diverse, global team of more than 20,000 AI\xa0experts can help you quickly and confidently\xa0design and scale AI and automation across your\xa0business, working across our own IBM\xa0watsonx\xa0technology\xa0and an open ecosystem of partners to deliver any\xa0AI model, on any cloud, guided by ethics and trust.\n While it’s the GPT model that interprets the user’s input and composes a natural language response, it’s the application that (among other things) provides an interface for the user to type and read and a UX design that governs the chatbot experience.'}]
First, we are going to implement a chain to route the question to the right search mode. We have the prompt:
system_prompt = """
You are an expert at routing a user question to a vectorstore, a websearch or a simple QA language model.
The vectorstore contains documents related to Langchain.
If you can answer the question without any additional context or if a websearch could not provide additional context, route it to the QA language model.
If you need additional context and it is a question about Langchain, use the vectorstore, otherwise, use websearch.
"""
router_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{question}"),
    ]
)
From the prompt, we build the chain:
class RouteQuery(BaseModel):
    route: Literal["vectorstore", "websearch", "QA_LM"] = Field(
        description="Given a user question choose to route it to web search (websearch), a vectorstore (vectorstore), or a QA language model (QA_LM).",
    )
question_router = (
    router_prompt
    | llm_engine.with_structured_output(RouteQuery)
)
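To make the constraint expressed by `RouteQuery` concrete, here is a minimal, library-free sketch of the same idea: the route has to be one of three labels, and anything else is not accepted. The helper `parse_route` and its fallback to `QA_LM` are assumptions made for illustration only; in the pipeline above, `with_structured_output` is what enforces the schema.

```python
from typing import Literal, get_args

Route = Literal["vectorstore", "websearch", "QA_LM"]
ALLOWED_ROUTES = get_args(Route)  # the three labels allowed by the schema


def parse_route(raw: str, default: Route = "QA_LM") -> Route:
    """Hypothetical helper: normalize a raw label, fall back to a default."""
    cleaned = raw.strip().lower()
    for route in ALLOWED_ROUTES:
        if cleaned == route.lower():
            return route
    # Unknown label: use the cheapest branch (an assumption of this sketch)
    return default


print(parse_route(" websearch "))
print(parse_route("qa_lm"))
print(parse_route("something else"))
```

Falling back to the plain QA model is only one possible policy; raising an error on an unknown label would be just as defensible.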
Let’s now build a chain that rewrites the question as a web search query. This is very similar to what we have already done for the database query. Here is the prompt:
system_prompt = """
You are a question re-writer that converts an input question to a better version that is optimized for web search.
Look at the input and try to reason about the underlying semantic intent / meaning.
Additional feedback may be provided for why a previous version of the question didn't lead to a valid response. Make sure to utilize that feedback to generate a better question.
Only respond with the rewritten question and nothing else!
"""
human_prompt = """
Here is the initial question: {question}
Here is the feedback about previous versions of the question:
{feedback}
Formulate an improved question.
Rewritten question:
"""
websearch_query_rewrite_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", human_prompt),
    ]
)
With its chain:
websearch_query_rewriter = (
    websearch_query_rewrite_prompt
    | llm_engine
    | StrOutputParser()
)
And now, we need a simple chain to answer basic questions:
system_prompt = """
You are a helpful assistant. Provide an answer to the user.
"""
simple_question_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{question}"),
    ]
)
And its chain:
simple_question_chain = (
    simple_question_prompt
    | llm_engine
    | StrOutputParser()
)
To make sure the graph remembers what search mode was selected, I am going to create a new attribute in the graph state:
class GraphState(BaseModel):
    ...
    search_mode: Literal["vectorstore", "websearch", "QA_LM"] = "QA_LM"
Let’s implement the related nodes. First, the router node:
def router_node(state: GraphState):
    route_query = question_router.invoke(state.question)
    return route_query.route
Then, the simple question node:
def simple_question_node(state: GraphState):
    answer = simple_question_chain.invoke(state.question)
    return {"generation": answer, "search_mode": "QA_LM"}
The web search query rewrite node:
def websearch_query_rewriting_node(state: GraphState):
    rewritten_question = websearch_query_rewriter.invoke({
        "question": state.question,
        "feedback": "\n".join(state.query_feedbacks)
    })
    return {
        "rewritten_question": rewritten_question,
        "search_mode": "websearch"
    }
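To see what the rewriter actually receives, the sketch below renders the human prompt with plain `str.format`. It assumes that `query_feedbacks` is a list of strings that starts empty; `render_rewrite_prompt` is a hypothetical helper, not something the pipeline defines.

```python
from typing import List

human_prompt = """
Here is the initial question: {question}
Here is the feedback about previous versions of the question:
{feedback}
Formulate an improved question.
Rewritten question:
"""


def render_rewrite_prompt(question: str, query_feedbacks: List[str]) -> str:
    """Hypothetical helper: fill the template the way the node builds its input."""
    return human_prompt.format(
        question=question,
        feedback="\n".join(query_feedbacks),  # an empty list gives an empty string
    )


first_pass = render_rewrite_prompt("What is LCEL?", [])
retry = render_rewrite_prompt("What is LCEL?", ["Too vague", "No sources found"])
print(retry)
```

On the first pass the feedback slot is simply an empty line, so the same prompt serves both the initial rewrite and the retries.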
And the web search node:
def web_search_node(state: GraphState):
    new_docs = web_search_tool.invoke(
        {"query": state.rewritten_question}
    )
    web_results = [d["content"] for d in new_docs]
    state.documents.extend(web_results)
    return {"documents": state.documents}
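The node above extends `state.documents` in place before returning it. If you would rather not mutate the state inside a node, one option is to build a new list instead. The sketch below assumes the search results are dictionaries with `url` and `content` keys, as in the sample output shown earlier; `merge_web_results` is a hypothetical helper.

```python
from typing import Dict, List


def merge_web_results(
    documents: List[str], results: List[Dict[str, str]]
) -> List[str]:
    """Hypothetical helper: return a new list and leave the input untouched."""
    # Skip results that carry no usable text
    contents = [r["content"] for r in results if r.get("content")]
    return documents + contents


existing = ["doc from the vector store"]
results = [
    {"url": "https://www.ibm.com/topics/langchain", "content": "Applications made with LangChain ..."},
    {"url": "https://example.com/empty", "content": ""},
]
merged = merge_web_results(existing, results)
print(len(existing), len(merged))
```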
In the database query rewrite node, I make sure to capture the search mode as well:
def db_query_rewriting_node(state: GraphState):
    ...
    return {
        "rewritten_question": rewritten_question,
        "search_mode": "vectorstore"
    }
All that remains is to route correctly after the query feedback, making sure we retry the same search mode:
def search_mode_node(state: GraphState):
    return state.search_mode
Let’s add the new nodes to the pipeline:
pipeline.add_node('simple_question_node', simple_question_node)
pipeline.add_node(
    'websearch_query_rewriting_node',
    websearch_query_rewriting_node
)
pipeline.add_node('web_search_node', web_search_node)
Let’s make sure we reconnect the graph correctly. The entry point is the question router:
pipeline.add_conditional_edges(
    START,
    router_node,
    {
        "vectorstore": 'db_query_rewrite_node',
        "websearch": 'websearch_query_rewriting_node',
        "QA_LM": 'simple_question_node'
    },
)
After rewriting the query, we perform the web search and send the resulting documents to the filter documents node:
pipeline.add_edge('websearch_query_rewriting_node', 'web_search_node')
pipeline.add_edge('web_search_node', 'filter_docs_node')
The simple question node is an end state of the pipeline:
pipeline.add_edge('simple_question_node', END)
Now, from the query feedback node, we move to the search mode router:
pipeline.add_conditional_edges(
    'query_feedback_node',
    search_mode_node,
    {
        "vectorstore": 'db_query_rewrite_node',
        "websearch": 'websearch_query_rewriting_node',
    }
)
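Conceptually, a conditional edge is a two-step lookup: the routing function reads the state and returns a label, and the mapping turns that label into the name of the next node. The sketch below mimics this in plain Python. `DemoState` and `next_node` are hypothetical stand-ins, and this is not how LangGraph implements it internally.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class DemoState:
    # Hypothetical, reduced stand-in for the article's GraphState
    search_mode: str = "QA_LM"


def search_mode_node(state: DemoState) -> str:
    # Same routing function as above: return the stored search mode
    return state.search_mode


FEEDBACK_EDGES: Dict[str, str] = {
    "vectorstore": "db_query_rewrite_node",
    "websearch": "websearch_query_rewriting_node",
}


def next_node(
    state: DemoState,
    router: Callable[[DemoState], str],
    edges: Dict[str, str],
) -> str:
    """Resolve the next node name from the label returned by the router."""
    label = router(state)
    if label not in edges:
        raise KeyError(f"no edge defined for label {label!r}")
    return edges[label]


print(next_node(DemoState("websearch"), search_mode_node, FEEDBACK_EDGES))
```

The mapping has no entry for `QA_LM`: the simple question node goes straight to `END`, so the query feedback node should never see that mode.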
Let’s draw the resulting graph:
display(Image(rag_pipeline.get_graph().draw_png()))
Corrective RAG
We are going to track the number of retrievals in the graph state:
class GraphState(BaseModel):
    ...
    retrieval_num: int = 0
The retriever node now increments that counter each time it runs:
def retriever_node(state: GraphState):
    ...
    return {
        "documents": state.documents,
        "retrieval_num": state.retrieval_num + 1
    }
Same thing in the web search node:
def web_search_node(state: GraphState):
    ...
    return {
        "documents": state.documents,
        "retrieval_num": state.retrieval_num + 1
    }
If we reach the web search query rewrite node from another search mode, we are going to reset the number of retrievals to 0:
def websearch_query_rewriting_node(state: GraphState):
    ...
    if state.search_mode != "websearch":
        state.retrieval_num = 0
    return {
        "rewritten_question": rewritten_question,
        "search_mode": "websearch",
        "retrieval_num": state.retrieval_num
    }
Now, we just implement the routing logic based on the different values of the state:
MAX_RETRIEVALS = 3
def relevant_documents_validation_node(state: GraphState):
    if state.documents:
        # we have relevant documents
        return "knowledge_extraction"
    elif (state.search_mode == 'vectorstore'
          and state.retrieval_num > MAX_RETRIEVALS):
        # we don't have relevant documents
        # and we reached the maximum number of retrievals
        return "max_db_search"
    elif (state.search_mode == 'websearch'
          and state.retrieval_num > MAX_RETRIEVALS):
        # we don't have relevant documents
        # and we reached the maximum number of websearches
        return "max_websearch"
    else:
        # we don't have relevant documents
        # so we retry the search
        return state.search_mode
The logic is simple:
- If we have relevant documents selected by the previous filter document node, we can advance to the knowledge extraction node.
- If we reach the maximum number of retrievals and we are in a database search mode, we move to the web search branch.
- If we reach the maximum number of retrievals and we are in the web search mode, we give up and apologize to the user.
- If we don’t reach the maximum number of retries, we retry the current search mode.
Now, we just need to route to the right node:
pipeline.add_conditional_edges(
    'filter_docs_node',
    relevant_documents_validation_node,
    {
        "knowledge_extraction": 'extract_knowledge_node',
        "websearch": 'websearch_query_rewriting_node',
        "vectorstore": 'db_query_rewrite_node',
        "max_db_search": 'websearch_query_rewriting_node',
        "max_websearch": 'give_up_node'
    }
)
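To get a feel for how this corrective loop behaves in the worst case, here is a small, library-free simulation in which no search ever returns a relevant document. `DemoState`, `route_after_filter` and `simulate_empty_searches` are hypothetical stand-ins that only reproduce the counting and routing rules described above, assuming the search modes are labeled `vectorstore` and `websearch`; they do not call LangGraph or any retriever.

```python
from dataclasses import dataclass, field
from typing import List

MAX_RETRIEVALS = 3


@dataclass
class DemoState:
    # Hypothetical, reduced stand-in for the article's GraphState
    documents: List[str] = field(default_factory=list)
    search_mode: str = "vectorstore"
    retrieval_num: int = 0


def route_after_filter(state: DemoState) -> str:
    # Same decision rules as the validation routing described above
    if state.documents:
        return "knowledge_extraction"
    if state.retrieval_num > MAX_RETRIEVALS:
        if state.search_mode == "vectorstore":
            return "max_db_search"
        if state.search_mode == "websearch":
            return "max_websearch"
    return state.search_mode


def simulate_empty_searches() -> List[str]:
    """Trace the routing labels when every search comes back empty."""
    state = DemoState()
    trace: List[str] = []
    while True:
        state.retrieval_num += 1  # one retrieval, database or web
        label = route_after_filter(state)  # documents stay empty
        trace.append(label)
        if label == "max_db_search":
            # Switching branch: the web rewrite node resets the counter
            state.search_mode = "websearch"
            state.retrieval_num = 0
        elif label == "max_websearch":
            return trace


trace = simulate_empty_searches()
print(trace)
```

Under these assumptions, each branch performs four retrievals before its limit triggers, because the comparison is a strict `>`. If `MAX_RETRIEVALS` is meant to be the exact number of attempts, `>=` would be the comparison to use.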
Let’s now draw the resulting graph:
display(Image(rag_pipeline.get_graph().draw_png()))
Let’s stop here!
Let’s see an example of the path taken to answer a question:
question = """
Can you show me a complete example of how to mix runnablelambda with runnablepassthrough and runnableparallel in langchain?
"""
for output in rag_pipeline.stream({"question": question}):
    for key, value in output.items():
        print(key)
db_query_rewrite_node
retrieval_node
filter_docs_node
db_query_rewrite_node
retrieval_node
filter_docs_node
extract_knowledge_node
generator_node
With the answer:
value["generation"]
```python
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough
)
# Create a RunnableLambda that adds 7 to the input
runnable1 = RunnableLambda(lambda x: x["foo"] + 7)
# Create a RunnablePassthrough to pass through additional data
chain = RunnablePassthrough.assign(bar=runnable1)
# Create a RunnableParallel to run two runnables in parallel
runnable2 = RunnableLambda(lambda x: [x] * 2)
parallel_chain = RunnableParallel(first=runnable1, second=runnable2)
result = final_chain.invoke({"foo": 10})
print(result)
```
Not bad! You can find the code used in this tutorial in this repo.