-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Agent][Feat] Ensure coroutine safety #282
base: develop
Are you sure you want to change the base?
Changes from 1 commit
5b4bcd3
66a92a2
ede72e0
c0c4e70
14dc576
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ | |
Final, | ||
Iterable, | ||
List, | ||
NoReturn, | ||
Optional, | ||
Sequence, | ||
Tuple, | ||
|
@@ -32,7 +31,6 @@ | |
from erniebot_agent.memory.messages import Message, SystemMessage | ||
from erniebot_agent.tools.base import BaseTool | ||
from erniebot_agent.tools.tool_manager import ToolManager | ||
from erniebot_agent.utils.exceptions import FileError | ||
|
||
_PLUGINS_WO_FILE_IO: Final[Tuple[str]] = ("eChart",) | ||
|
||
|
@@ -110,6 +108,7 @@ def __init__( | |
if plugins is not None: | ||
raise NotImplementedError("The use of plugins is not supported yet.") | ||
self._init_file_needs_url() | ||
self._is_running = False | ||
|
||
@final | ||
async def run(self, prompt: str, files: Optional[Sequence[File]] = None) -> AgentResponse: | ||
|
@@ -123,8 +122,9 @@ async def run(self, prompt: str, files: Optional[Sequence[File]] = None) -> Agen | |
Returns: | ||
Response from the agent. | ||
""" | ||
if files: | ||
await self._ensure_managed_files(files) | ||
if self._is_running: | ||
raise RuntimeError("The agent is already running.") | ||
self._is_running = True | ||
await self._callback_manager.on_run_start(agent=self, prompt=prompt) | ||
try: | ||
agent_resp = await self._run(prompt, files) | ||
|
@@ -133,6 +133,8 @@ async def run(self, prompt: str, files: Optional[Sequence[File]] = None) -> Agen | |
raise e | ||
else: | ||
await self._callback_manager.on_run_end(agent=self, response=agent_resp) | ||
finally: | ||
self._is_running = False | ||
return agent_resp | ||
|
||
@final | ||
|
@@ -251,10 +253,10 @@ async def _run_tool(self, tool: BaseTool, tool_args: str) -> ToolResponse: | |
# XXX: Sniffing is less efficient and probably unnecessary. | ||
# Can we make a protocol to statically recognize file inputs and outputs | ||
# or can we have the tools introspect about this? | ||
input_files = file_manager.sniff_and_extract_files_from_list(list(parsed_tool_args.values())) | ||
input_files = await file_manager.sniff_and_extract_files_from_obj(parsed_tool_args) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 此处存在bug: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这块我在 #292 中已经解决了,合入之后你这块 update 一下就行了。 input_files 和 output_files 这块都可能需要调整一下。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 嗯嗯 |
||
tool_ret = await tool(**parsed_tool_args) | ||
if isinstance(tool_ret, dict): | ||
output_files = file_manager.sniff_and_extract_files_from_list(list(tool_ret.values())) | ||
output_files = await file_manager.sniff_and_extract_files_from_obj(tool_ret.values()) | ||
else: | ||
output_files = [] | ||
tool_ret_json = json.dumps(tool_ret, ensure_ascii=False) | ||
|
@@ -279,16 +281,3 @@ def _parse_tool_args(self, tool_args: str) -> Dict[str, Any]: | |
if not isinstance(args_dict, dict): | ||
raise ValueError(f"`tool_args` cannot be interpreted as a dict. `tool_args`: {tool_args}") | ||
return args_dict | ||
|
||
async def _ensure_managed_files(self, files: Sequence[File]) -> None: | ||
def _raise_exception(file: File) -> NoReturn: | ||
raise FileError(f"{repr(file)} is not managed by the file manager of the agent.") | ||
|
||
file_manager = self.get_file_manager() | ||
for file in files: | ||
try: | ||
managed_file = file_manager.look_up_file_by_id(file.id) | ||
except FileError: | ||
_raise_exception(file) | ||
if file is not managed_file: | ||
_raise_exception(file) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -196,10 +196,10 @@ async def _step( | |
PluginStep( | ||
info=output_message.plugin_info, | ||
result=output_message.content, | ||
input_files=file_manager.sniff_and_extract_files_from_text( | ||
chat_history[-1].content | ||
input_files=await file_manager.sniff_and_extract_files_from_text( | ||
input_messages[-1].content | ||
), # TODO: make sure this is correct. | ||
output_files=file_manager.sniff_and_extract_files_from_text(output_message.content), | ||
output_files=[], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plugin不具备处理file ID的功能,所以不应该试图从 |
||
), | ||
new_messages, | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
考虑agent被并发调用的情况:
在以上代码中,用户可能希望向agent派发任务,而这些任务将被并发执行。此处存在race condition:由于agent是有状态的(带有memory),两个任务都执行完成后、乃至执行过程中agent的状态将与两个任务的实际执行顺序与时机有关。
为了解决这个问题,我们不妨为
Agent
类引入一个属性_is_running
,用这个属性来控制同一时刻agent只能执行一个任务。