AutoGen Agent Training using AgentOptimizer
parmarjatin4911@gmail.com
Posted on January 28, 2024
import json
import os
from typing import Any, Callable, Dict, List, Optional, Union, Tuple
import autogen
from autogen.agentchat import Agent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.code_utils import extract_code, execute_code
from autogen.math_utils import get_answer
from openai import BadRequestError, OpenAI
1. AgentOptimizer
class AgentOptimizer():
    OPT_PROMPT = """You are a function optimizer. Your task is to maintain a list of functions for the assistant according to the existing function list and conversation history that happens between the assistant and the user.
You can perform one of the following four actions to manipulate the function list using the functions you have:
- Revise one existing function (using revise_function).
- Remove one existing function (using remove_function).
- Add one new function (using add_function).
- Directly return "TERMINATE" to me if no more actions are needed for the current function list.
Below are the principles that you need to follow for taking these four actions.
(1) Revise one existing function:
- Pay more attention to the failed tasks and corresponding error information, and optimize the function used in these tasks according to the conversation history if needed.
- A failed function call can occur due to incorrect input arguments (missing arguments) or an incorrect function code implementation. You should focus more on the function code implementation and make it easy to get success function call.
- Do not revise the function that you think works well and plays a critical role in solving the problems according to the conversation history. Only making revisions if needed.
- Sometimes, a NameError may occur. To fix this error, you can either revise the name of the function in the code implementation or revise the name of the function call to make these two names consistent.
(2) Remove one existing function:
- Only remove the function that you think is not needed anymore in future tasks.
(3) Add one new function:
- The added new function should solve a higher-level question that encompasses the original query and extend the code's functionality to make it more versatile and widely applicable.
- The added new function should solve queries of the same type, based on common reasoning steps without mentioning specific object names or entity terms.
- Name the function and write the description concerning both the core reasoning pattern and data organization format, without referencing specific objects. The name of the function MUST be the same as the function name in the code you generated.
- Replace specific strings or variable names with general variables to enhance the tool's applicability to various queries. All names used inside the function should be passed in as arguments.
(4) Directly return "TERMINATE":
If you think there is no need to perform any other actions for the current function list, since the current list is optimal and more actions will harm the performance in future tasks, please directly reply to me with "TERMINATE".
One function signature includes the following five elements:
- Function name
- Function description
- JSON schema of arguments encoded as a string
- A list of package names imported by the function packages
- The code implementation
Below are the signatures of the current functions:
List A: {signiture}.
The success rate (performance) with this function list is {success_rate}.
The following list are the function signatures that you have after taking {actions_num} actions in our previous conversations:
List B: {after_signiture}.
Here are {conversation_num} conversation histories of solving {conversation_num} tasks.
History:
{history}
The following table shows the statistical information for solving each task in each conversation and indicates whether each task was successfully solved.
1 represents correct. 0 represents wrong.
statistic:
{statistic}
According to the information I provide, please take one of four actions to manipulate list B using the functions you know.
"""
    ADD_FUNC = {
        "type": "function",
        "function": {
            "name": "add_function",
            "description": "Add a function in the context of the conversation. Necessary Python packages must be declared. The name of the function MUST be the same with the function name in the code you generated.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the function in the code implementation."
                    },
                    "description": {
                        "type": "string",
                        "description": "A short description of the function."
                    },
                    "arguments": {
                        "type": "string",
                        "description": "JSON schema of arguments encoded as a string. Please note that the JSON schema only supports specific types including string, integer, object, array, boolean. (do not have float type) For example: { \"url\": { \"type\": \"string\", \"description\": \"The URL\", }}. Please avoid the error 'array schema missing items' when using array type."
                    },
                    "packages": {
                        "type": "string",
                        "description": "A list of package names imported by the function, and that need to be installed with pip prior to invoking the function. This solves ModuleNotFoundError. It should be string, not list."
                    },
                    "code": {
                        "type": "string",
                        "description": "The implementation in Python. Do not include the function declaration."
                    }
                },
                "required": ["name", "description", "arguments", "packages", "code"]
            }
        }
    }
    REVISE_FUNC = {
        "type": "function",
        "function": {
            "name": "revise_function",
            "description": "Revise a function in the context of the conversation. Necessary Python packages must be declared. The name of the function MUST be the same with the function name in the code you generated.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the function in the code implementation."
                    },
                    "description": {
                        "type": "string",
                        "description": "A short description of the function."
                    },
                    "arguments": {
                        "type": "string",
                        "description": "JSON schema of arguments encoded as a string. Please note that the JSON schema only supports specific types including string, integer, object, array, boolean. (do not have float type) For example: { \"url\": { \"type\": \"string\", \"description\": \"The URL\", }}. Please avoid the error 'array schema missing items' when using array type."
                    },
                    "packages": {
                        "type": "string",
                        "description": "A list of package names imported by the function, and that need to be installed with pip prior to invoking the function. This solves ModuleNotFoundError. It should be string, not list."
                    },
                    "code": {
                        "type": "string",
                        "description": "The implementation in Python. Do not include the function declaration."
                    }
                },
                "required": ["name", "description", "arguments", "packages", "code"]
            }
        }
    }
    REMOVE_FUNC = {
        "type": "function",
        "function": {
            "name": "remove_function",
            "description": "Remove one function in the context of the conversation. Once remove one function, the assistant will not use this function in future conversation.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the function in the code implementation."
                    }
                },
                "required": ["name"]
            }
        }
    }
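    def __init__(self, OAI_config, action_num=3, each_action_max_trials=10):
        # OAI_config is accepted but not used below; OpenAI() reads the API key
        # from the OPENAI_API_KEY environment variable.
        self._action_num = action_num
        self._each_action_max_trials = each_action_max_trials
        self._client = OpenAI()
        self.model = "gpt-4-1106-preview"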
    def _val_json(self, actions):
        # Both the tool-call arguments and the nested "arguments" schema must parse as JSON.
        if actions is None:
            return True
        else:
            for action in actions:
                function_args = action.function.arguments
                try:
                    function_args = json.loads(function_args.strip('"'))
                    if 'arguments' in function_args.keys():
                        json.loads(function_args.get("arguments").strip('"'))
                except Exception as e:
                    print("JSON is invalid:", e)
                    return False
        return True

    def _val_remove(self, actions, after_signiture):
        # A remove_function call must name a function that is currently in the list.
        if actions is None:
            return True
        else:
            for action in actions:
                action_name = action.function.name
                if action_name == "remove_function":
                    function_args = json.loads(action.function.arguments.strip('"'))
                    if function_args.get("name") not in [item["name"] for item in after_signiture]:
                        print("The function you want to remove does not exist.")
                        return False
        return True

    def _val_syntax(self, actions):
        # Added or revised code must at least compile.
        if actions is None:
            return True
        else:
            for action in actions:
                if action.function.name != "remove_function":
                    function_args = json.loads(action.function.arguments.strip('"'))
                    code = function_args.get("code")
                    try:
                        compile(code, '<string>', 'exec')
                        print("successfully compiled")
                    except SyntaxError as e:
                        print("Syntax is invalid:", e)
                        return False
        return True
    def _format_actions(self, actions):
        # Convert raw tool calls into plain dicts tagged with the action name.
        ans = []
        for action in actions:
            func = json.loads(action.function.arguments.strip('"'))
            func["action_name"] = action.function.name
            if func.get("action_name") == "remove_function":
                item = {
                    "action_name": func.get("action_name"),
                    "name": func.get("name"),
                }
            else:
                item = {
                    "action_name": func.get("action_name"),
                    "name": func.get("name"),
                    "description": func.get("description"),
                    "arguments": json.loads(func.get("arguments").strip('"')),
                    "packages": func.get("packages"),
                    "code": func.get("code"),
                }
            ans.append(item)
        return ans

    def _get_success_rate(self, statistic):
        sum = 0
        # Conversations that never recorded a result count as failures.
        for key, value in statistic.items():
            if "is_correct" not in value.keys():
                statistic[key]["is_correct"] = 0
        for key, value in statistic.items():
            sum += value["is_correct"]
        if len(statistic.keys()) != 0:
            success_rate = sum / len(statistic.keys())
        else:
            success_rate = None
        return success_rate, statistic

    def _modify_function_signiture(self, cur_functions, action_json):
        for action in action_json:
            action_name = action.get("action_name")
            if action_name != "remove_function":
                # Add or revise: replace any entry with the same name.
                cur_functions = [
                    item for item in cur_functions if item["name"] != action.get("name")]
                cur_functions.append(
                    {"name": action.get("name"),
                     "description": action.get("description"),
                     "arguments": action.get("arguments"),
                     "packages": action.get("packages"),
                     "code": action.get("code")}
                )
            else:
                cur_functions = [
                    item for item in cur_functions if item["name"] != action.get("name")]
        return cur_functions
    def update_function_call(self, action, mathproxyagent, assistant):
        def execute_func(name, packages, code, **args):
            pip_install = (
                f"""print("Installing package: {packages}")\nsubprocess.run(["pip", "-qq", "install", "{packages}"])"""
                if packages
                else ""
            )
            # The generated script is kept flush-left because it is executed as top-level code.
            str = f"""
import subprocess
{pip_install}
print("Result of {name} function execution:")
{code}
args={args}
result={name}(**args)
if result is not None: print(result)
"""
            print(f"execute_code:\n{str}")
            result = execute_code(str)
            if result[0] != 0:
                raise Exception("Error in executing function:" + result[1])
            print(f"Result: {result[1]}")
            return result[1]

        name, description, arguments, packages, code, action_name = action.get("name"), action.get(
            "description"), action.get("arguments"), action.get("packages"), action.get("code"), action.get("action_name")
        # Drop any previously registered version of this function first.
        if name in mathproxyagent._function_map.keys():
            del mathproxyagent._function_map[name]
        if action_name != "remove_function":
            function_config = {
                "name": name,
                "description": description,
                "parameters": {"type": "object", "properties": arguments},
            }
            mathproxyagent.register_function(
                function_map={name: lambda **args: execute_func(name, packages, code, **args)})
            assistant.update_function_signature(function_config, is_remove=False)
        else:
            assistant.update_function_signature(name, is_remove=True)
    def step(self, history, statistic, func_signiture):
        action_return = []
        origin_signiture = func_signiture
        modified_signiture = origin_signiture
        success_rate, statistic = self._get_success_rate(statistic)  # TODO: make statistic feasible outside of the loop
        for action_index in range(self._action_num):
            prompt = self.OPT_PROMPT.format(
                conversation_num=len(history),
                statistic={"is_correct": statistic},
                signiture=origin_signiture,
                history=history,
                success_rate=success_rate,
                actions_num=action_index,
                after_signiture=modified_signiture,
            )
            messages = [{"role": "user", "content": prompt}]
            # Retry until the model returns tool calls that pass all three validators.
            for _ in range(self._each_action_max_trials):
                response = self._client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    tools=[self.ADD_FUNC, self.REVISE_FUNC, self.REMOVE_FUNC],
                    tool_choice="auto",
                )
                actions = response.choices[0].message.tool_calls
                if self._val_json(actions) and self._val_syntax(actions) and self._val_remove(actions, modified_signiture):
                    break
            if actions is not None:
                action_result = self._format_actions(actions)
                action_return = action_return + action_result
                modified_signiture = self._modify_function_signiture(modified_signiture, action_result)
        return action_return, modified_signiture
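The validation that `step` applies to each tool call can be tried without calling the OpenAI API. The following is a minimal sketch, assuming the raw argument string has the shape described by `ADD_FUNC`; the helper name `validate_tool_call` and the sample payloads are hypothetical and only mirror what `_val_json` and `_val_syntax` check.

```python
import json


def validate_tool_call(raw_arguments):
    """Return (ok, reason) for one add/revise tool call's raw argument string."""
    try:
        args = json.loads(raw_arguments)
        # "arguments" is itself a JSON schema encoded as a string.
        json.loads(args["arguments"])
        code = args["code"]
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        return False, f"invalid JSON: {exc!r}"
    try:
        # Compile only; nothing is executed at this stage.
        compile(code, "<string>", "exec")
    except (SyntaxError, TypeError, ValueError) as exc:
        return False, f"invalid code: {exc!r}"
    return True, "ok"


# Hypothetical payloads for illustration.
good = json.dumps({
    "name": "add_numbers",
    "description": "Add two integers.",
    "arguments": json.dumps({"a": {"type": "integer"}, "b": {"type": "integer"}}),
    "packages": "",
    "code": "def add_numbers(a, b):\n    return a + b",
})
bad = json.dumps({**json.loads(good), "code": "def add_numbers(a, b:\n    return a + b"})

print(validate_tool_call(good))
print(validate_tool_call(bad)[0])
```

Checking the nested schema separately matters because the outer JSON can parse cleanly while the `arguments` string inside it does not.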
2. Math User Proxy Agent
def is_termination_msg_mathchat(message):
    if isinstance(message, dict):
        message = message.get("content")
        if message is None:
            return False
    cb = extract_code(message)
    contain_code = False
    for c in cb:
        if c[0] == "python" or c[0] == "wolfram":
            contain_code = True
            break
    if message.rstrip().find("TERMINATE") >= 0:
        return True
    # Otherwise stop only when the message has a boxed answer and no code left to run.
    return (
        not contain_code
        and get_answer(message) is not None
        and get_answer(message) != ""
    )
class MathUserProxyAgent(MathUserProxyAgent):
    MAX_CONSECUTIVE_AUTO_REPLY = 15
    DEFAULT_REPLY = "Continue. Please keep solving the problem until you need to query. (If you get to the answer, put it in \\boxed{}.)"
    PROMPTS = """Let's solve a math problem.
Query requirements:
You should always use the 'print' function for the output and use fractions/radical forms instead of decimals.
You can use packages like sympy to help you.
You must follow the formats below to write your code:
# your code
If some packages are missing, you could also suggest a code to install the corresponding package.
Please follow this process:
- Solve the problem step by step (do not over-divide the steps).
- Take out any queries that can be asked through Python code (for example, any calculations or equations that can be calculated) and functions you know in the context of this conversation.
Please
(1) do not mix suggested Python codes and function calls in one step.
(2) You MUST remember that you don’t have a function named "python" available.
You must follow the formats below to write your Python code:
# your code
- Wait for me to give the results or wait for the executed results of the function call.
- Continue if you think the result is correct. If the result is invalid or unexpected, please correct your query or reasoning.
After all the queries are run and you get the answer, put the answer in \\boxed{}.
Problem:
"""
    def __init__(
        self,
        name: Optional[str] = "MathChatAgent",
        is_termination_msg: Optional[
            Callable[[Dict], bool]
        ] = is_termination_msg_mathchat,
        human_input_mode: Optional[str] = "NEVER",
        default_auto_reply: Optional[Union[str, Dict, None]] = DEFAULT_REPLY,
        max_invalid_q_per_step=3,
        **kwargs,
    ):
        super().__init__(
            name=name,
            is_termination_msg=is_termination_msg,
            human_input_mode=human_input_mode,
            default_auto_reply=default_auto_reply,
            max_invalid_q_per_step=max_invalid_q_per_step,
            **kwargs,
        )
        # Swap the inherited reply functions for the versions defined in this class.
        del self._reply_func_list[2]
        self.register_reply([Agent, None], MathUserProxyAgent._generate_math_reply, position=4)
        del self._reply_func_list[3]
        self.register_reply(trigger=autogen.ConversableAgent,
                            reply_func=MathUserProxyAgent.generate_function_call_reply, position=3)
        self.register_reply(trigger=autogen.ConversableAgent,
                            reply_func=MathUserProxyAgent._check_final_result, position=0)
        self.max_function_call_trial = 3
        self.query = None
        self.answer = None
    def generate_function_call_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[autogen.ConversableAgent] = None,
        config: Optional[Any] = None,
    ) -> Tuple[bool, Union[Dict, None]]:
        """Generate a reply using function call."""
        if messages is None:
            messages = self._oai_messages[sender]
        message = messages[-1]
        if "function_call" in message:
            is_exec_success, func_return = self.execute_function(
                message["function_call"])
            if is_exec_success:
                self.max_function_call_trial = 3
                return True, func_return
            else:
                if self.max_function_call_trial == 0:
                    error_message = func_return["content"]
                    self.logs["is_correct"] = 0
                    self.max_function_call_trial = 3
                    return True, "The func is executed failed many times. " + error_message + ". Please directly reply me with TERMINATE. We need to terminate the conversation."
                else:
                    # Count down the remaining retries; without this the branch above is never reached.
                    self.max_function_call_trial -= 1
                    revise_prompt = (
                        "You may make a wrong function call (It may due the arguments you provided doesn't fit the function arguments like missing required positional argument). "
                        "If you think this error occurs due to you make a wrong function arguments input and you could make it success, please try to call this function again using the correct arguments. "
                        "Otherwise, the error may be caused by the function itself. Please directly reply me with TERMINATE. We need to terminate the conversation. "
                    )
                    error_message = func_return["content"]
                    return True, "The func is executed failed." + error_message + revise_prompt
        return False, None
    def initiate_chat(
        self,
        recipient,
        query: None,
        answer: None,
        silent: Optional[bool] = False,
        **context,
    ):
        self.query = query
        # Store the reference answer as a string; numeric answers like 3.0 become "3".
        if not isinstance(answer, str):
            answer = str(answer)
            if answer.endswith('.0'):
                answer = answer[:-2]
            self._answer = answer
        else:
            self._answer = answer
        self.logs = {}
        self._prepare_chat(recipient, True)
        chat_history = []
        error_message = None
        try:
            prompt = self.PROMPTS + context["problem"]
            self.send(prompt, recipient, silent=silent)
        except BadRequestError as e:
            error_message = str(e)
            self.logs["is_correct"] = 0
            print("error information: {}".format(error_message))
        key = list(self.chat_messages.keys())[0]
        chat_messages = self.chat_messages[key]
        for item in chat_messages:
            chat_history.append(item)
        if error_message is not None:
            chat_history.append(error_message)
        recipient.reset()
        self.reset()
        return self.logs, chat_history
    def _check_final_result(
            self,
            messages: Optional[List[Dict]] = None,
            sender: Optional[autogen.Agent] = None,
            config: Optional[Any] = None):
        messages = messages[-1]
        if isinstance(messages, dict):
            messages = messages.get("content")
            if messages is None:
                return False, None
        cb = extract_code(messages)
        contain_code = False
        for c in cb:
            if c[0] == "python" or c[0] == "wolfram":
                contain_code = True
                break
        if (
            not contain_code
            and get_answer(messages) is not None
            and get_answer(messages) != ""
        ):
            # Compare the boxed answer with the stored reference answer.
            if get_answer(messages) == self._answer:
                self.logs["is_correct"] = 1
                return True, "The result is Correct. Please reply me with TERMINATE."
            else:
                self.logs["is_correct"] = 0
                return False, None
        else:
            return False, None

    def _reset(self):
        self._valid_q_count = 0
        self._total_q_count = 0
        self._accum_invalid_q_per_step = 0
        self._previous_code = ""
        self.last_reply = None
        self.query = None
        self.answer = None
        self.logs = {}
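How `initiate_chat` prepares the reference answer affects whether `_check_final_result` can ever report a match, so it is worth seeing on its own. A minimal sketch of that handling follows; the function name `normalize_answer` is hypothetical and the sample values are for illustration only.

```python
def normalize_answer(answer):
    """Mirror the reference-answer handling in initiate_chat."""
    if isinstance(answer, str):
        return answer  # strings are stored untouched
    text = str(answer)
    if text.endswith(".0"):
        text = text[:-2]  # 3.0 -> "3", so it can equal a boxed "3"
    return text


for raw in (3.0, 7, 2.5, "3.0"):
    print(repr(raw), "->", repr(normalize_answer(raw)))
```

Note that a reference answer that is already a string, such as `"3.0"`, is kept as it is, so it would not compare equal to a boxed `3`.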
3. Load Dataset
test_data, train_data = [], []

def load_and_format_data(directory):
    formatted_data = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
                data = json.load(file)
                if isinstance(data, dict):  # Checking if the data is a dictionary
                    # Renaming 'solution' to 'answer' and 'problem' to 'question'
                    formatted_entry = {
                        'answer': data.get('solution', ''),
                        'question': data.get('problem', '')
                    }
                    formatted_data.append(formatted_entry)
    return formatted_data
# Load and format test and train data
test_dir = "MATH/test/algebra/"
train_dir = "MATH/train/algebra/"
test_data = load_and_format_data(test_dir)
train_data = load_and_format_data(train_dir)
test_data, train_data = test_data[0:10], train_data[0:10]
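In the MATH dataset, the `solution` field holds a full worked solution whose final answer is wrapped in `\boxed{...}`, whereas `_check_final_result` compares the reference against the bare string that `get_answer` extracts from the assistant's message. A minimal sketch of pulling out the boxed answer before storing it under `'answer'` is shown below, assuming that layout; the helper `extract_boxed` and the sample solution are hypothetical, and `get_answer` from `autogen.math_utils` (imported above) may serve the same purpose.

```python
def extract_boxed(solution):
    """Return the contents of the last \\boxed{...} in `solution`, or None."""
    marker = "\\boxed{"
    start = solution.rfind(marker)
    if start == -1:
        return None
    i = start + len(marker)
    depth = 1  # we are inside the opening brace of \boxed{
    chars = []
    while i < len(solution):
        ch = solution[i]
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return "".join(chars)
        chars.append(ch)
        i += 1
    return None  # unbalanced braces


# Hypothetical solution text for illustration.
sample = r"We have $2x = 3$, so $x = \boxed{\frac{3}{2}}$."
print(extract_boxed(sample))
```

Tracking brace depth keeps nested LaTeX such as `\frac{3}{2}` intact instead of stopping at the first closing brace.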
4. Agents Creation
config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
)
mathproxyagent = MathUserProxyAgent(
    name="mathproxyagent",
    human_input_mode="NEVER",
    code_execution_config={
        "work_dir": "_output", "use_docker": False},
    is_termination_msg=is_termination_msg_mathchat,
)
assistant = autogen.AssistantAgent(
    name="assistant",
    system_message="You are a helpful assistant.",
    llm_config={
        "timeout": 600,
        "seed": 42,
        "config_list": config_list,
    }
)
5. Test without Agents Optimizer
history_list, statistic_list = {}, {}
for index, query in enumerate(test_data):
    print(query)
    single_statistic, chat_history = mathproxyagent.initiate_chat(
        recipient=assistant, answer=query["answer"], query=query["question"], problem=query["question"])
    history_list["conversation: {index}".format(index=index + 1)] = chat_history
    statistic_list["conversation: {index}".format(index=index + 1)] = single_statistic
sum = 0
# Conversations that never recorded a result count as failures.
for key, value in statistic_list.items():
    if "is_correct" not in value.keys():
        statistic_list[key]["is_correct"] = 0
for key, value in statistic_list.items():
    sum += value["is_correct"]
success_rate_without_agent_training = sum / len(statistic_list.keys())
print("success_rate_without_agent_training: ", success_rate_without_agent_training)
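The same tally is repeated for the baseline test, each training epoch, and the final test. A minimal sketch of a shared helper follows; the name `success_rate` and the sample statistics are hypothetical, and the logic simply mirrors `_get_success_rate` without mutating its input.

```python
def success_rate(statistics):
    """Fraction of conversations marked correct; None when there are none."""
    if not statistics:
        return None
    correct = 0
    for entry in statistics.values():
        # A conversation with no recorded result counts as a failure.
        correct += entry.get("is_correct", 0)
    return correct / len(statistics)


# Hypothetical statistics in the shape the loops above build.
stats = {
    "conversation: 1": {"is_correct": 1},
    "conversation: 2": {},
    "conversation: 3": {"is_correct": 0},
    "conversation: 4": {"is_correct": 1},
}
print(success_rate(stats))
```

The helper uses an explicit counter rather than the built-in `sum`, since the surrounding script reuses `sum` as a variable name.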
6. Agents Training with Agents Optimizer
EPOCH = 10
agent_optimizer = AgentOptimizer(OAI_config=config_list)
history_list, statistic_list, function_json, agent_list = [], [], [], []
for epoch in range(EPOCH):
    # From the second epoch on, let the optimizer update the function list first.
    if len(history_list) != 0:
        actions, function_json = agent_optimizer.step(history_list, statistic_list, function_json)
        for action in actions:
            agent_optimizer.update_function_call(action, mathproxyagent=mathproxyagent, assistant=assistant)
    history_list, statistic_list = {}, {}
    for index, query in enumerate(train_data):
        single_statistic, chat_history = mathproxyagent.initiate_chat(
            recipient=assistant, answer=query["answer"], query=query["question"], problem=query["question"])
        history_list["conversation: {index}".format(
            index=index + 1)] = chat_history
        statistic_list["conversation: {index}".format(
            index=index + 1)] = single_statistic
    sum = 0
    for key, value in statistic_list.items():
        if "is_correct" not in value.keys():
            statistic_list[key]["is_correct"] = 0
    for key, value in statistic_list.items():
        sum += value["is_correct"]
    # Multiply by 100 so the printed value matches the "%" sign.
    print("Train_Epoch_{epoch_num}_Success_Rate: {average}%".format(epoch_num=epoch, average=sum / len(statistic_list) * 100))
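Across epochs, `function_json` is the list that `step` returns after folding each action into the previous list. A minimal sketch of that folding is shown below, assuming actions shaped like the output of `_format_actions`; the helper name `apply_actions` and the sample functions are hypothetical.

```python
def apply_actions(functions, actions):
    """Return a new function list after applying add/revise/remove actions."""
    result = list(functions)
    for action in actions:
        # Every action first drops any existing entry with the same name.
        result = [f for f in result if f["name"] != action["name"]]
        if action["action_name"] != "remove_function":
            # add_function and revise_function both (re)insert the entry.
            keys = ("name", "description", "arguments", "packages", "code")
            result.append({key: action.get(key) for key in keys})
    return result


# Hypothetical starting list and actions for illustration.
current = [{"name": "solve_linear", "description": "old", "arguments": {},
            "packages": "sympy", "code": "def solve_linear(a, b):\n    return -b / a"}]
actions = [
    {"action_name": "revise_function", "name": "solve_linear", "description": "new",
     "arguments": {}, "packages": "sympy",
     "code": "def solve_linear(a, b):\n    return -b / a"},
    {"action_name": "add_function", "name": "gcd_of_list", "description": "GCD of a list",
     "arguments": {}, "packages": "",
     "code": "def gcd_of_list(values):\n    import math\n    return math.gcd(*values)"},
    {"action_name": "remove_function", "name": "solve_linear"},
]
updated = apply_actions(current, actions)
print([f["name"] for f in updated])
```

Because add and revise are handled identically, a revise on an unknown name behaves like an add, which matches `_modify_function_signiture`.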
7. Test with Agents Optimizer
history_list, statistic_list = {}, {}
for index, query in enumerate(test_data):
    single_statistic, chat_history = mathproxyagent.initiate_chat(
        recipient=assistant, answer=query["answer"], query=query["question"], problem=query["question"])
    history_list["conversation: {index}".format(index=index + 1)] = chat_history
    statistic_list["conversation: {index}".format(index=index + 1)] = single_statistic
sum = 0
for key, value in statistic_list.items():
    if "is_correct" not in value.keys():
        statistic_list[key]["is_correct"] = 0
for key, value in statistic_list.items():
    sum += value["is_correct"]
success_rate_with_agent_training = sum / len(statistic_list.keys())
print("------------------------------------------------Functions learned------------------------------------------------")
for function in function_json:
    print(function)
print("------------------------------------------------Summary------------------------------------------------\n")
print("success_rate_without_agent_training: {average}%\n".format(average=success_rate_without_agent_training * 100))
print("success_rate_with_agent_training: {average}%\n".format(average=success_rate_with_agent_training * 100))
[dataset](Dataset