Parallel Chains in LangChain
thilak15
Posted on October 16, 2024
In this guide, we'll delve into how LangChain facilitates parallel processing using a Meeting Summary Generator as a reference.
Why Parallel Chains?
Parallel chains allow multiple tasks to run concurrently, reducing overall execution time and improving resource utilization. This is especially beneficial when dealing with tasks that can operate independently, such as extracting different components from a dataset.
Key Components
RunnableLambda: Wraps Python functions to be used within LangChain chains.
RunnableParallel: Enables the parallel execution of multiple runnable branches.
StrOutputParser: Parses the string output from the language model.
Step-by-Step Implementation
- Initialize the language model using LangChain’s ChatOllama. This model will process the prompts and generate responses.
from langchain_ollama import ChatOllama
# Initialize the ChatOllama model
model = ChatOllama(model="llama3.2:1b-instruct-fp16")
- Create prompt templates to instruct the model on the specific tasks: extracting key points, decisions, and action items.
from langchain.prompts import ChatPromptTemplate
# Prompt to summarize key points from meeting notes
prompt_template = ChatPromptTemplate.from_messages(
[
("system", "You are an expert meeting assistant."),
("human", "Summarize the key points of the following meeting notes:\n\n{meeting_notes}"),
]
)
# Prompt to extract decisions
def analyze_decisions(key_points):
decisions_template = ChatPromptTemplate.from_messages(
[
("system", "You are an expert meeting assistant."),
("human", "Given these key points: {key_points}, list the decisions made during the meeting."),
]
)
return decisions_template.format_prompt(key_points=key_points)
# Prompt to extract action items
def analyze_action_items(key_points):
action_items_template = ChatPromptTemplate.from_messages(
[
("system", "You are an expert meeting assistant."),
("human", "Given these key points: {key_points}, list the action items assigned during the meeting, including the responsible person and the deadline if available."),
]
)
return action_items_template.format_prompt(key_points=key_points)
- Utilize RunnableLambda to wrap the analysis functions and RunnableParallel to execute them concurrently.
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableParallel, RunnableLambda
# Function to combine decisions and action items
def combine_summary(decisions, action_items):
return f"**Decisions Made:**\n{decisions}\n\n**Action Items:**\n{action_items}"
# Runnable chains for decisions and action items
decisions_branch_chain = (
RunnableLambda(lambda x: analyze_decisions(x)) | model | StrOutputParser()
)
action_items_branch_chain = (
RunnableLambda(lambda x: analyze_action_items(x)) | model | StrOutputParser()
)
# Combined parallel chain
chain = (
prompt_template
| model
| StrOutputParser()
| RunnableParallel(branches={
"decisions": decisions_branch_chain,
"action_items": action_items_branch_chain
})
| RunnableLambda(lambda x: combine_summary(
x["branches"]["decisions"],
x["branches"]["action_items"]
))
)
Explanation:
- RunnableLambda wraps the analyze_decisions and analyze_action_items functions, allowing them to be part of the LangChain pipeline.
- RunnableParallel runs the decisions_branch_chain and action_items_branch_chain simultaneously.
- The final RunnableLambda combines the outputs from both branches into a structured summary.
# Example meeting notes
meeting_notes = """
**Project Kickoff Meeting - April 25, 2024**
- Discussed project timeline and milestones.
- Assigned tasks to team members.
- Reviewed budget allocations.
- Identified potential risks and mitigation strategies.
- Decided to use Agile methodology for project management.
- Scheduled weekly check-in meetings.
- Agreed on communication channels and tools.
**Action Items:**
1. John to set up the project repository by April 26.
2. Sarah to draft the initial project plan by April 28.
3. Mike to research risk mitigation strategies by April 30.
"""
# Run the chain
result = chain.invoke({"meeting_notes": meeting_notes})
# Output the result
print(result)
Sample Output:
**Decisions Made:**
- Decided to use Agile methodology for project management.
**Action Items:**
1. John to set up the project repository by April 26.
2. Sarah to draft the initial project plan by April 28.
3. Mike to research risk mitigation strategies by April 30.
Benefits of Parallel Chains in LangChain
- Efficiency: Processes multiple tasks simultaneously, reducing total execution time.
- Modularity: Each task is encapsulated, making the workflow easy to manage and extend.
- Scalability: Additional analysis branches can be added without disrupting existing chains.
- Clarity: Organized outputs enhance readability and usability of the resu
Posted on October 16, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.