AutoGen Agent Training using AgentOptimizer

jhparmar

parmarjatin4911@gmail.com

Posted on January 28, 2024

import json
import os
from typing import Any, Callable, Dict, List, Optional, Union, Tuple
import autogen
from autogen.agentchat import Agent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.code_utils import extract_code, execute_code
from autogen.math_utils import get_answer
from openai import OpenAI
from openai import BadRequestError

1. AgentOptimizer

class AgentOptimizer:
    OPT_PROMPT = """You are a function optimizer. Your task is to maintain a list of functions for the assistant according to the existing function list and conversation history that happens between the assistant and the user.
You can perform one of the following four actions to manipulate the function list using the functions you have:

  1. Revise one existing function (using revise_function).
  2. Remove one existing function (using remove_function).
  3. Add one new function (using add_function).
  4. Directly return "TERMINATE" to me if no more actions are needed for the current function list.

Below are the principles that you need to follow for taking these four actions.
(1) Revise one existing function:

  1. Pay more attention to the failed tasks and corresponding error information, and optimize the function used in these tasks according to the conversation history if needed.
  2. A failed function call can occur due to incorrect input arguments (missing arguments) or an incorrect function code implementation. You should focus more on the function code implementation and make it easy to get success function call.
  3. Do not revise the function that you think works well and plays a critical role in solving the problems according to the conversation history. Only making revisions if needed.
  4. Sometimes, a NameError may occur. To fix this error, you can either revise the name of the function in the code implementation or revise the name of the function call to make these two names consistent.

(2) Remove one existing function:

  1. Only remove the function that you think is not needed anymore in future tasks.

(3) Add one new function:

  1. The added new function should solve a higher-level question that encompasses the original query and extend the code's functionality to make it more versatile and widely applicable.
  2. The added new function should solve queries of the same type, based on common reasoning steps without mentioning specific object names or entity terms.
  3. Name the function and write the description concerning both the core reasoning pattern and data organization format, without referencing specific objects. The name of the function MUST be the same with the function name in the code you generated.
  4. Replace specific strings or variable names with general variables to enhance the tool's applicability to various queries. All names used inside the function should be passed in as arguments.

(4) Directly return "TERMINATE":
If you think there is no need to perform any other actions for the current function list since the current list is optimal more actions will harm the performance in future tasks. Please directly reply to me with "TERMINATE".

One function signature includes the following five elements:

  1. Function name
  2. Function description
  3. JSON schema of arguments encoded as a string
  4. A list of package names imported by the function packages
  5. The code implementation

Below are the signatures of the current functions:
List A: {signiture}.
The success rate (performance) with this function list is {success_rate}.
The following list are the function signatures that you have after taking {actions_num} actions in our previous conversations:
List B: {after_signiture}.
Here are {conversation_num} conversation histories of solving {conversation_num} tasks.
History:
{history}
The following table shows the statistical information for solving each task in each conversation and indicates whether each task was successfully solved.
1 represents correct. 0 represents wrong.
statistic:
{statistic}

According to the information I provide, please take one of four actions to manipulate list B using the functions you know.
"""

    ADD_FUNC = {
        "type": "function",
        "function": {
            "name": "add_function",
            "description": "Add a function in the context of the conversation. Necessary Python packages must be declared. The name of the function MUST be the same with the function name in the code you generated.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the function in the code implementation."
                    },
                    "description": {
                        "type": "string",
                        "description": "A short description of the function."
                    },
                    "arguments": {
                        "type": "string",
                        "description": "JSON schema of arguments encoded as a string. Please note that the JSON schema only supports specific types including string, integer, object, array, boolean. (do not have float type) For example: { \"url\": { \"type\": \"string\", \"description\": \"The URL\", }}. Please avoid the error 'array schema missing items' when using array type."
                    },
                    "packages": {
                        "type": "string",
                        "description": "A list of package names imported by the function, and that need to be installed with pip prior to invoking the function. This solves ModuleNotFoundError. It should be string, not list."
                    },
                    "code": {
                        "type": "string",
                        "description": "The implementation in Python. Do not include the function declaration."
                    }
                },
                "required": ["name", "description", "arguments", "packages", "code"]
            }
        }
    }

    REVISE_FUNC = {
        "type": "function",
        "function": {
            "name": "revise_function",
            "description": "Revise a function in the context of the conversation. Necessary Python packages must be declared. The name of the function MUST be the same with the function name in the code you generated.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the function in the code implementation."
                    },
                    "description": {
                        "type": "string",
                        "description": "A short description of the function."
                    },
                    "arguments": {
                        "type": "string",
                        "description": "JSON schema of arguments encoded as a string. Please note that the JSON schema only supports specific types including string, integer, object, array, boolean. (do not have float type) For example: { \"url\": { \"type\": \"string\", \"description\": \"The URL\", }}. Please avoid the error 'array schema missing items' when using array type."
                    },
                    "packages": {
                        "type": "string",
                        "description": "A list of package names imported by the function, and that need to be installed with pip prior to invoking the function. This solves ModuleNotFoundError. It should be string, not list."
                    },
                    "code": {
                        "type": "string",
                        "description": "The implementation in Python. Do not include the function declaration."
                    }
                },
                "required": ["name", "description", "arguments", "packages", "code"]
            }
        }
    }

    REMOVE_FUNC = {
        "type": "function",
        "function": {
            "name": "remove_function",
            "description": "Remove one function in the context of the conversation. Once remove one function, the assistant will not use this function in future conversation.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "The name of the function in the code implementation."
                    }
                },
                "required": ["name"]
            }
        }
    }

    def __init__(self, OAI_config, action_num=3, each_action_max_trials=10):
        # OAI_config is accepted but not used below; OpenAI() reads the key
        # from the OPENAI_API_KEY environment variable.
        self._action_num = action_num
        self._each_action_max_trials = each_action_max_trials
        self._client = OpenAI()
        self.model = "gpt-4-1106-preview"

    def _val_json(self, actions):
        # Check that each tool call's arguments (and the nested "arguments" schema) parse as JSON.
        if actions is None:
            return True
        else:
            for action in actions:
                function_args = action.function.arguments
                try:
                    function_args = json.loads(function_args.strip('"'))
                    if 'arguments' in function_args.keys():
                        json.loads(function_args.get("arguments").strip('"'))
                except Exception as e:
                    print("JSON is invalid:", e)
                    return False
        return True

    def _val_remove(self, actions, after_signiture):
        # A remove_function call is only valid if the named function is in the current list.
        if actions is None:
            return True
        else:
            for action in actions:
                action_name = action.function.name
                if action_name == "remove_function":
                    function_args = json.loads(action.function.arguments.strip('"'))
                    if function_args.get("name") not in [item["name"] for item in after_signiture]:
                        print("The function you want to remove does not exist.")
                        return False
            return True

    def _val_syntax(self, actions):
        # Compile the proposed code (without running it) to catch syntax errors early.
        if actions is None:
            return True
        else:
            for action in actions:
                if action.function.name != "remove_function":
                    function_args = json.loads(action.function.arguments.strip('"'))
                    code = function_args.get("code")
                    try:
                        compile(code, '<string>', 'exec')
                        print("successfully compiled")
                    except SyntaxError as e:
                        print("Syntax is invalid:", e)
                        return False
            return True

    def _format_actions(self, actions):
        # Convert raw tool calls into plain dicts tagged with their action name.
        ans = []
        for action in actions:
            func = json.loads(action.function.arguments.strip('"'))
            func["action_name"] = action.function.name

            if func.get("action_name") == "remove_function":
                item = {
                    "action_name": func.get("action_name"),
                    "name": func.get("name"),
                }
            else:
                item = {
                    "action_name": func.get("action_name"),
                    "name": func.get("name"),
                    "description": func.get("description"),
                    "arguments": json.loads(func.get("arguments").strip('"')),
                    "packages": func.get("packages"),
                    "code": func.get("code"),
                }
            ans.append(item)
        return ans

    def _get_success_rate(self, statistic):
        # Conversations that never recorded a result count as failures.
        total = 0
        for key, value in statistic.items():
            if "is_correct" not in value.keys():
                statistic[key]["is_correct"] = 0
        for key, value in statistic.items():
            total += value["is_correct"]
        if len(statistic.keys()) != 0:
            success_rate = total / len(statistic.keys())
        else:
            success_rate = None
        return success_rate, statistic

    def _modify_function_signiture(self, cur_functions, action_json):
        # Add and revise both replace any same-named entry; remove just drops it.
        for action in action_json:
            action_name = action.get("action_name")
            if action_name != "remove_function":
                cur_functions = [
                    item for item in cur_functions if item["name"] != action.get("name")]
                cur_functions.append(
                    {"name": action.get("name"),
                     "description": action.get("description"),
                     "arguments": action.get("arguments"),
                     "packages": action.get("packages"),
                     "code": action.get("code")}
                )
            else:
                cur_functions = [
                    item for item in cur_functions if item["name"] != action.get("name")]
        return cur_functions

    def update_function_call(self, action, mathproxyagent, assistant):
        def execute_func(name, packages, code, **args):
            pip_install = (
                f"""print("Installing package: {packages}")\nsubprocess.run(["pip", "-qq", "install", "{packages}"])"""
                if packages
                else ""
            )
            # Build a standalone script that defines the function and calls it with the given arguments.
            script = f"""
import subprocess
{pip_install}
print("Result of {name} function execution:")
{code}
args={args}
result={name}(**args)
if result is not None: print(result)
"""
            print(f"execute_code:\n{script}")
            result = execute_code(script)
            if result[0] != 0:
                raise Exception("Error in executing function:" + result[1])
            print(f"Result: {result[1]}")
            return result[1]

        name, description, arguments, packages, code, action_name = action.get("name"), action.get(
            "description"), action.get("arguments"), action.get("packages"), action.get("code"), action.get("action_name")

        if name in mathproxyagent._function_map.keys():
            del mathproxyagent._function_map[name]
        if action_name != "remove_function":
            function_config = {
                "name": name,
                "description": description,
                "parameters": {"type": "object", "properties": arguments},
            }
            mathproxyagent.register_function(
                function_map={name: lambda **args: execute_func(name, packages, code, **args)})
            assistant.update_function_signature(function_config, is_remove=False)
        else:
            assistant.update_function_signature(name, is_remove=True)

    def step(self, history, statistic, func_signiture):
        action_return = []
        origin_signiture = func_signiture
        modified_signiture = origin_signiture

        success_rate, statistic = self._get_success_rate(statistic)  # TODO: make statistic feasible outside of the loop
        for action_index in range(self._action_num):
            prompt = self.OPT_PROMPT.format(
                conversation_num=len(history),
                statistic={"is_correct": statistic},
                signiture=origin_signiture,
                history=history,
                success_rate=success_rate,
                actions_num=action_index,
                after_signiture=modified_signiture,
            )
            messages = [{"role": "user", "content": prompt}]
            # Ask again until the proposed tool calls pass every validator or the trials run out.
            for _ in range(self._each_action_max_trials):
                response = self._client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    tools=[self.ADD_FUNC, self.REVISE_FUNC, self.REMOVE_FUNC],
                    tool_choice="auto",
                )
                actions = response.choices[0].message.tool_calls
                if self._val_json(actions) and self._val_syntax(actions) and self._val_remove(actions, modified_signiture):
                    break
            if actions is not None:
                action_result = self._format_actions(actions)
                action_return = action_return + action_result
                modified_signiture = self._modify_function_signiture(modified_signiture, action_result)
        return action_return, modified_signiture
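The way `step` folds each accepted action into the working function list can be tried in isolation. The following is a minimal sketch that mirrors the logic of `_modify_function_signiture` as a standalone function; the name `apply_actions` and the sample functions (`solve_linear`, `gcd_of_list`) are made up for illustration and are not part of AutoGen.

```python
from typing import Dict, List

FIELDS = ("name", "description", "arguments", "packages", "code")


def apply_actions(functions: List[Dict], actions: List[Dict]) -> List[Dict]:
    """Apply add/revise/remove actions to a function list, in order."""
    for action in actions:
        # Every action first drops any existing entry with the same name.
        functions = [f for f in functions if f["name"] != action["name"]]
        if action["action_name"] != "remove_function":
            # add_function and revise_function both append the new definition.
            functions.append({key: action.get(key) for key in FIELDS})
    return functions


# Hypothetical starting list and actions, for illustration only.
current = [{"name": "solve_linear", "description": "old", "arguments": {},
            "packages": "", "code": "return 0"}]
actions = [
    {"action_name": "revise_function", "name": "solve_linear", "description": "new",
     "arguments": {}, "packages": "sympy", "code": "return 1"},
    {"action_name": "add_function", "name": "gcd_of_list", "description": "gcd",
     "arguments": {}, "packages": "", "code": "return 1"},
    {"action_name": "remove_function", "name": "solve_linear"},
]
updated = apply_actions(current, actions)
print([f["name"] for f in updated])  # ['gcd_of_list']
```

Because a revision is implemented as "drop, then append", a revised function moves to the end of the list; the order of the list carries no meaning here.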

2. Math User Proxy Agent

def is_termination_msg_mathchat(message):
    if isinstance(message, dict):
        message = message.get("content")
        if message is None:
            return False
    cb = extract_code(message)
    contain_code = False
    for c in cb:
        if c[0] == "python" or c[0] == "wolfram":
            contain_code = True
            break
    if message.rstrip().find("TERMINATE") >= 0:
        return True

    # Otherwise stop only when the message has no code block and contains a boxed answer.
    return (
        not contain_code
        and get_answer(message) is not None
        and get_answer(message) != ""
    )

class MathUserProxyAgent(MathUserProxyAgent):
    MAX_CONSECUTIVE_AUTO_REPLY = 15
    # The backslash is doubled so that "\b" is not read as a backspace escape.
    DEFAULT_REPLY = "Continue. Please keep solving the problem until you need to query. (If you get to the answer, put it in \\boxed{}.)"
    PROMPTS = """Let's solve a math problem.
Query requirements:
You should always use the 'print' function for the output and use fractions/radical forms instead of decimals.
You can use packages like sympy to help you.
You must follow the formats below to write your code:

# your code

If some packages are missing, you could also suggest a code to install the corresponding package.

Please follow this process:

  1. Solve the problem step by step (do not over-divide the steps).
  2. Take out any queries that can be asked through Python code (for example, any calculations or equations that can be calculated) and functions you know in the context of this conversation.

Please
(1) do not mix suggested Python codes and function calls in one step.
(2) You MUST remember that you don’t have a function named "python" available.

You must follow the formats below to write your Python code:

# your code

  3. Wait for me to give the results or wait for the executed results of the function call.
  4. Continue if you think the result is correct. If the result is invalid or unexpected, please correct your query or reasoning.

After all the queries are run and you get the answer, put the answer in \\boxed{}.

Problem:
"""

    def __init__(
        self,
        name: Optional[str] = "MathChatAgent",
        is_termination_msg: Optional[
            Callable[[Dict], bool]
        ] = is_termination_msg_mathchat,
        human_input_mode: Optional[str] = "NEVER",
        default_auto_reply: Optional[Union[str, Dict, None]] = DEFAULT_REPLY,
        max_invalid_q_per_step=3,
        **kwargs,
    ):
        super().__init__(
            name=name,
            is_termination_msg=is_termination_msg,
            human_input_mode=human_input_mode,
            default_auto_reply=default_auto_reply,
            max_invalid_q_per_step=max_invalid_q_per_step,
            **kwargs,
        )
        # Swap the parent's reply functions for the versions defined in this class.
        del self._reply_func_list[2]
        self.register_reply([Agent, None], MathUserProxyAgent._generate_math_reply, position=4)
        del self._reply_func_list[3]
        self.register_reply(trigger=autogen.ConversableAgent,
                            reply_func=MathUserProxyAgent.generate_function_call_reply, position=3)
        self.register_reply(trigger=autogen.ConversableAgent,
                            reply_func=MathUserProxyAgent._check_final_result, position=0)
        self.max_function_call_trial = 3
        self.query = None
        self.answer = None

    def generate_function_call_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[autogen.ConversableAgent] = None,
        config: Optional[Any] = None,
    ) -> Tuple[bool, Union[Dict, None]]:
        """Generate a reply using function call."""
        if messages is None:
            messages = self._oai_messages[sender]
        message = messages[-1]
        if "function_call" in message:
            is_exec_success, func_return = self.execute_function(
                message["function_call"])
            if is_exec_success:
                self.max_function_call_trial = 3
                return True, func_return
            else:
                if self.max_function_call_trial == 0:
                    error_message = func_return["content"]
                    self.logs["is_correct"] = 0
                    self.max_function_call_trial = 3
                    return True, "The func is executed failed many times. " + error_message + ". Please directly reply me with TERMINATE. We need to terminate the conversation."
                else:
                    revise_prompt = (
                        "You may have made a wrong function call (the arguments you provided may not fit the function arguments, e.g. a missing required positional argument). "
                        "If you think this error occurred because you passed wrong function arguments and you could make the call succeed, please try to call this function again using the correct arguments. "
                        "Otherwise, the error may be caused by the function itself. Please directly reply me with TERMINATE. We need to terminate the conversation. "
                    )
                    error_message = func_return["content"]
                    # Count the failed attempt; without this the retry limit above is never reached.
                    self.max_function_call_trial -= 1
                    return True, "The func is executed failed." + error_message + revise_prompt
        return False, None

    def initiate_chat(
        self,
        recipient,
        query: None,
        answer: None,
        silent: Optional[bool] = False,
        **context,
    ):
        self.query = query
        # Normalise the reference answer to a string, e.g. 3.0 -> "3".
        if not isinstance(answer, str):
            answer = str(answer)
            if answer.endswith('.0'):
                answer = answer[:-2]
            self._answer = answer
        else:
            self._answer = answer
        self.logs = {}
        self._prepare_chat(recipient, True)

        chat_history = []
        error_message = None

        try:
            prompt = self.PROMPTS + context["problem"]
            self.send(prompt, recipient, silent=silent)
        except BadRequestError as e:
            error_message = str(e)
            self.logs["is_correct"] = 0
            print("error information: {}".format(error_message))

        key = list(self.chat_messages.keys())[0]
        chat_messages = self.chat_messages[key]
        for item in chat_messages:
            chat_history.append(item)
        if error_message is not None:
            chat_history.append(error_message)
        recipient.reset()
        self.reset()
        return self.logs, chat_history

    def _check_final_result(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[autogen.Agent] = None,
        config: Optional[Any] = None,
    ):
        messages = messages[-1]

        if isinstance(messages, dict):
            messages = messages.get("content")
            if messages is None:
                return False, None

        cb = extract_code(messages)
        contain_code = False
        for c in cb:
            if c[0] == "python" or c[0] == "wolfram":
                contain_code = True
                break
        if (
            not contain_code
            and get_answer(messages) is not None
            and get_answer(messages) != ""
        ):
            # Compare the boxed answer in the reply with the reference answer.
            if get_answer(messages) == self._answer:
                self.logs["is_correct"] = 1
                return True, "The result is Correct. Please reply me with TERMINATE."
            else:
                self.logs["is_correct"] = 0
                return False, None
        else:
            return False, None

    def _reset(self):
        self._valid_q_count = 0
        self._total_q_count = 0
        self._accum_invalid_q_per_step = 0
        self._previous_code = ""
        self.last_reply = None

        self.query = None
        self.answer = None
        self.logs = {}



3. Load Dataset

test_data, train_data = [], []


def load_and_format_data(directory):
    formatted_data = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
                data = json.load(file)
                if isinstance(data, dict):  # Checking if the data is a dictionary
                    # Renaming 'problem' to 'question'. The MATH 'solution' field holds the full
                    # worked solution, so only its final \boxed{} answer is kept as 'answer';
                    # the proxy agent compares that value with the boxed answer in the reply.
                    formatted_entry = {
                        'answer': get_answer(data.get('solution', '')),
                        'question': data.get('problem', '')
                    }
                    formatted_data.append(formatted_entry)
    return formatted_data

# Load and format test and train data

test_dir = "MATH/test/algebra/"
train_dir = "MATH/train/algebra/"

test_data = load_and_format_data(test_dir)
train_data = load_and_format_data(train_dir)

test_data, train_data = test_data[0:10], train_data[0:10]
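Each MATH file stores a `problem` and a full worked `solution` whose final answer is wrapped in `\boxed{...}`, while the proxy agent's correctness check compares boxed answers by exact string equality. The sketch below shows one way to pull the last boxed answer out of a solution string. It is a simplified, dependency-free stand-in for what `autogen.math_utils.get_answer` is used for in this post; the helper name `last_boxed_answer` and the sample solution text are invented for illustration.

```python
from typing import Optional


def last_boxed_answer(text: str) -> Optional[str]:
    """Return the contents of the last \\boxed{...} in text, or None if absent."""
    marker = "\\boxed{"
    start = text.rfind(marker)
    if start == -1:
        return None
    depth = 1  # we are inside the opening brace of \boxed{
    collected = []
    for ch in text[start + len(marker):]:
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return "".join(collected)
        collected.append(ch)
    return None  # braces never balanced


# Hypothetical solution text in the style of the dataset.
solution = "Dividing both sides by 4 gives $x = \\boxed{\\frac{3}{4}}$."
print(last_boxed_answer(solution))
```

Nested braces such as those in `\frac{3}{4}` are why the helper counts brace depth instead of stopping at the first `}`.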

4. Agents Creation

config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
)
mathproxyagent = MathUserProxyAgent(
    name="mathproxyagent",
    human_input_mode="NEVER",
    code_execution_config={
        "work_dir": "_output", "use_docker": False},
    is_termination_msg=is_termination_msg_mathchat,
)
assistant = autogen.AssistantAgent(
    name="assistant",
    system_message="You are a helpful assistant.",
    llm_config={
        "timeout": 600,
        "seed": 42,
        "config_list": config_list,
    }
)
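`config_list_from_json("OAI_CONFIG_LIST")` expects an environment variable or file of that name holding a JSON list of model configurations. As a rough sketch of what such a file might contain, the snippet below writes one to a temporary directory and reads it back; the model name is the one used earlier in this post, the API key is a placeholder you would replace, and the temporary path is only there to keep the example self-contained.

```python
import json
import os
import tempfile

# Placeholder values: substitute your own model name and API key.
sample_config = [
    {"model": "gpt-4-1106-preview", "api_key": "<your OpenAI API key>"},
]

# Written to a temporary directory here; in practice the file would sit
# next to the script under the name OAI_CONFIG_LIST.
path = os.path.join(tempfile.mkdtemp(), "OAI_CONFIG_LIST")
with open(path, "w", encoding="utf-8") as f:
    json.dump(sample_config, f, indent=2)

with open(path, encoding="utf-8") as f:
    loaded = json.load(f)
print(loaded[0]["model"])
```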

5. Test without Agents Optimizer

history_list, statistic_list = {}, {}
for index, query in enumerate(test_data):
    print(query)
    single_statistic, chat_history = mathproxyagent.initiate_chat(
        recipient=assistant, answer=query["answer"], query=query["question"], problem=query["question"])
    history_list["conversation: {index}".format(index=index + 1)] = chat_history
    statistic_list["conversation: {index}".format(index=index + 1)] = single_statistic

# Conversations that never recorded a result count as failures.
correct_count = 0
for key, value in statistic_list.items():
    if "is_correct" not in value.keys():
        statistic_list[key]["is_correct"] = 0
for key, value in statistic_list.items():
    correct_count += value["is_correct"]

success_rate_without_agent_training = correct_count / len(statistic_list.keys())
print("success_rate_without_agent_training: ", success_rate_without_agent_training)
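The same success-rate bookkeeping recurs in the test and training loops, so it can be factored into a small helper. This is only a sketch under the assumption that each per-conversation log is a dict that may or may not contain `is_correct`; the function name `success_rate` and the sample statistics are illustrative.

```python
from typing import Dict, Optional


def success_rate(statistics: Dict[str, Dict]) -> Optional[float]:
    """Fraction of conversations marked correct; a missing flag counts as 0."""
    if not statistics:
        return None
    solved = sum(entry.get("is_correct", 0) for entry in statistics.values())
    return solved / len(statistics)


# Hypothetical logs for four conversations.
stats = {
    "conversation: 1": {"is_correct": 1},
    "conversation: 2": {},  # the chat ended without a checked answer
    "conversation: 3": {"is_correct": 0},
    "conversation: 4": {"is_correct": 1},
}
print(success_rate(stats))  # 0.5
```

Unlike the inline loops, this version does not write the default `is_correct = 0` back into the statistics dict, which matters if those dicts are later passed to the optimizer.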

6. Agents Training with Agents Optimizer

EPOCH = 10
agent_optimizer = AgentOptimizer(OAI_config=config_list)

history_list, statistic_list, function_json, agent_list = [], [], [], []
for epoch in range(EPOCH):
    # From the second epoch on, let the optimizer update the function list first.
    if len(history_list) != 0:
        actions, function_json = agent_optimizer.step(history_list, statistic_list, function_json)
        for action in actions:
            agent_optimizer.update_function_call(action, mathproxyagent=mathproxyagent, assistant=assistant)
    history_list, statistic_list = {}, {}
    for index, query in enumerate(train_data):
        single_statistic, chat_history = mathproxyagent.initiate_chat(
            recipient=assistant, answer=query["answer"], query=query["question"], problem=query["question"])
        history_list["conversation: {index}".format(
            index=index + 1)] = chat_history
        statistic_list["conversation: {index}".format(
            index=index + 1)] = single_statistic

    correct_count = 0
    for key, value in statistic_list.items():
        if "is_correct" not in value.keys():
            statistic_list[key]["is_correct"] = 0
    for key, value in statistic_list.items():
        correct_count += value["is_correct"]
    # Multiply by 100 so the printed value matches the "%" sign.
    print("Train_Epoch_{epoch_num}_Success_Rate: {average}%".format(
        epoch_num=epoch, average=correct_count / len(statistic_list) * 100))



7. Test with Agents Optimizer

history_list, statistic_list = {}, {}
for index, query in enumerate(test_data):
    single_statistic, chat_history = mathproxyagent.initiate_chat(
        recipient=assistant, answer=query["answer"], query=query["question"], problem=query["question"])
    history_list["conversation: {index}".format(index=index + 1)] = chat_history
    statistic_list["conversation: {index}".format(index=index + 1)] = single_statistic

correct_count = 0
for key, value in statistic_list.items():
    if "is_correct" not in value.keys():
        statistic_list[key]["is_correct"] = 0
for key, value in statistic_list.items():
    correct_count += value["is_correct"]

success_rate_with_agent_training = correct_count / len(statistic_list.keys())

print("------------------------------------------------Functions learned------------------------------------------------")
for function in function_json:
    print(function)
print("------------------------------------------------Summary------------------------------------------------\n")
print("success_rate_without_agent_training: {average}%\n".format(average=success_rate_without_agent_training * 100))
print("success_rate_with_agent_training: {average}%\n".format(average=success_rate_with_agent_training * 100))

[Dataset](https://github.com/hendrycks/math)
