[Agent][Feat] Ensure coroutine safety #282
base: develop
Conversation
@@ -110,6 +108,7 @@ def __init__(
         if plugins is not None:
             raise NotImplementedError("The use of plugins is not supported yet.")
         self._init_file_needs_url()
+        self._is_running = False
Consider the case where the agent is invoked concurrently:

```python
def create_agent_run_task(prompt):
    return asyncio.create_task(agent.run(prompt))

create_agent_run_task(prompt1)
create_agent_run_task(prompt2)
```

In the code above, the user presumably wants to dispatch tasks to the agent and have them executed concurrently. There is a race condition here: since the agent is stateful (it carries memory), the agent's state after both tasks complete, and even while they are running, depends on the actual order and timing of the two tasks.

To address this, we introduce an `_is_running` attribute on the `Agent` class and use it to ensure that the agent executes only one task at a time.
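A minimal sketch of how the guard could work, assuming a stripped-down `Agent` where `_run` stands in for the real agent logic (both the body and the decision to raise on a concurrent call are illustrative assumptions, not the merged implementation):

```python
import asyncio


class Agent:
    def __init__(self) -> None:
        self._is_running = False  # True while a task is being executed.

    async def run(self, prompt: str) -> str:
        if self._is_running:
            # Reject the second task instead of letting it interleave with
            # the one already in flight and corrupt the shared memory/state.
            raise RuntimeError("The agent is already running a task.")
        self._is_running = True
        try:
            return await self._run(prompt)
        finally:
            self._is_running = False

    async def _run(self, prompt: str) -> str:
        await asyncio.sleep(0)  # Placeholder for the actual agent logic.
        return f"response to {prompt!r}"
```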
@@ -251,10 +253,10 @@ async def _run_tool(self, tool: BaseTool, tool_args: str) -> ToolResponse:
         # XXX: Sniffing is less efficient and probably unnecessary.
         # Can we make a protocol to statically recognize file inputs and outputs
         # or can we have the tools introspect about this?
-        input_files = file_manager.sniff_and_extract_files_from_list(list(parsed_tool_args.values()))
+        input_files = await file_manager.sniff_and_extract_files_from_obj(parsed_tool_args)
There is a bug here: `sniff_and_extract_files_from_list` cannot recursively find all the files that may be present in `parsed_tool_args`; it only detects files at the top level. Switching from `sniff_and_extract_files_from_list` to `sniff_and_extract_files_from_obj` fixes this.
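To illustrate the difference, here is a standalone sketch with a hypothetical file-ID detector (the helper names and the `file-` prefix are assumptions for illustration, not the library's actual sniffing logic):

```python
from typing import Any, List


def looks_like_file_id(value: Any) -> bool:
    # Hypothetical detector; the real file manager has its own sniffing rules.
    return isinstance(value, str) and value.startswith("file-")


def extract_from_list(values: List[Any]) -> List[str]:
    # Top-level only: misses IDs nested inside dicts or lists.
    return [v for v in values if looks_like_file_id(v)]


def extract_from_obj(obj: Any) -> List[str]:
    # Recursive: walks nested containers so deeply nested file IDs are found.
    if looks_like_file_id(obj):
        return [obj]
    if isinstance(obj, dict):
        return [fid for v in obj.values() for fid in extract_from_obj(v)]
    if isinstance(obj, (list, tuple)):
        return [fid for v in obj for fid in extract_from_obj(v)]
    return []


args = {"path": "file-123", "options": {"attachments": ["file-456"]}}
assert extract_from_list(list(args.values())) == ["file-123"]
assert extract_from_obj(args) == ["file-123", "file-456"]
```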
I've already fixed this in #292; once that's merged, you just need to update this part. Both input_files and output_files will probably need some adjustment.
Got it.
         ), # TODO: make sure this is correct.
-        output_files=file_manager.sniff_and_extract_files_from_text(output_message.content),
+        output_files=[],
Plugins are not able to handle file IDs, so we should not try to extract file IDs from `output_message`.
For the file manager, concurrency safety deserves even more attention, because we provide one global `FileManager` object per event loop as the default file manager, and global objects tend to be used concurrently more often.

In our design, each file managed by a given `FileManager` object is assigned a unique file ID, so concurrent calls to the `FileManager`'s file-creation methods (such as `create_file_from_path`) are usually concurrency-safe and need no locking. However, `FileManager` does not only have creation methods: it also has cleanup methods such as `prune` and lookup methods such as `look_up_file_by_id`. These methods all access shared resources, with both reads and writes, and not every interleaving of their concurrent executions yields correct, meaningful results. In addition, the `retrieve_remote_file_by_id` method should fail with an error message when the ID already exists locally, which means this method needs caching.

In summary, for simplicity and to leave room for future extension, this PR puts a lock around all public `FileManager` methods that access shared resources and may conflict with one another, guaranteeing that these methods run serially. This is clearly a blunt approach and costs some efficiency. However, the developer-facing documentation that describes how to use `FileManager` already assumes its methods are called serially, so introducing the lock does not change the expected, correct behavior of `FileManager`.
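A minimal sketch of the locking approach, assuming a trimmed-down `FileManager` whose registry is just a dict (the method bodies are placeholders, not the library's implementation, and `register_file` is an invented stand-in for the creation methods):

```python
import asyncio
from typing import Dict, Optional


class FileManager:
    def __init__(self) -> None:
        self._files: Dict[str, str] = {}  # file_id -> path (placeholder registry)
        self._lock = asyncio.Lock()  # Serializes the conflicting public methods.

    async def register_file(self, file_id: str, path: str) -> None:
        async with self._lock:
            self._files[file_id] = path

    async def look_up_file_by_id(self, file_id: str) -> Optional[str]:
        async with self._lock:
            return self._files.get(file_id)

    async def prune(self) -> None:
        async with self._lock:
            # Cleanup touches the shared registry, so it must not interleave
            # with look-ups or registrations.
            self._files.clear()
```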
@@ -111,12 +113,22 @@ def __init__(
         # This can be done lazily, but we need to be careful about race conditions.
         self._temp_dir = self._create_temp_dir()
         self._save_dir = pathlib.Path(self._temp_dir.name)
+        if not prune_on_close:
When `save_dir` is `None` but `prune_on_close` is `False`, `self._save_dir` is created as a temporary directory that will be deleted anyway, so from the user's point of view pruning-on-close still happens to some degree (the local temporary files are removed). An explicit warning about this behavior is emitted here.
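A rough sketch of what such a warning could look like, using a hypothetical helper that mirrors the constructor logic described above (the wording and the exact condition are assumptions based on this comment, not the actual diff):

```python
import tempfile
import warnings
from typing import Optional


def resolve_save_dir(save_dir: Optional[str], prune_on_close: bool) -> str:
    # Hypothetical helper, not part of the library.
    if save_dir is not None:
        return save_dir
    temp_dir = tempfile.mkdtemp()
    if not prune_on_close:
        # The user did not ask for pruning on close, but because the files
        # live in a temporary directory, they will still be removed.
        warnings.warn(
            "`save_dir` is not set; files are stored in a temporary directory "
            "and will be removed when the file manager is closed."
        )
    return temp_dir
```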
     async def list_remote_files(self) -> List[RemoteFile]:
         self.ensure_not_closed()
         files = await self._get_remote_file_client().list_files()
         return files

-    def look_up_file_by_id(self, file_id: str) -> File:
+    async def look_up_file_by_id(self, file_id: str) -> File:
Because of the lock, this method changes from sync to async.
Note: what about also adding a synchronous method? If the synchronous method looks up a file that is currently being pruned, could it return None and emit a warning with the relevant information?
A synchronous method has been added. That said, the warning part does not seem technically easy to implement. Since the method name already contains `unsafe`, users should understand when this method is appropriate and what the consequences of calling it may be (it is not safe), so perhaps we could also do without the warning?
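For illustration, a sketch of what the synchronous variant might look like; the `look_up_file_by_id_unsafe` name and the dict-based registry are assumptions based on this thread, not the merged code:

```python
from typing import Dict, Optional


class FileManager:
    def __init__(self) -> None:
        self._files: Dict[str, object] = {}  # Placeholder registry.

    def look_up_file_by_id_unsafe(self, file_id: str) -> Optional[object]:
        # No lock is taken, so the registry may be mutated concurrently
        # (e.g. by `prune`); hence "unsafe" in the name.
        return self._files.get(file_id)
```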
             assert_never()
         self._file_registry.unregister_file(file)

+    async def _sniff_and_extract_files_from_obj(self, obj: object, *, recursive: bool = True) -> List[File]:
The Python `asyncio` standard library does not yet support reentrant locks. To avoid deadlocks, lock-free internal implementations are added for methods that may be executed recursively.
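A sketch of the pattern, assuming a simplified file manager: the public method acquires the (non-reentrant) lock exactly once, and all recursion goes through the lock-free internal method (the bodies and the `file-` prefix check are illustrative assumptions):

```python
import asyncio
from typing import Any, List


class FileManager:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()

    async def sniff_and_extract_files_from_obj(self, obj: Any) -> List[str]:
        # Public entry point: takes the lock exactly once.
        async with self._lock:
            return await self._sniff_and_extract_files_from_obj(obj)

    async def _sniff_and_extract_files_from_obj(self, obj: Any) -> List[str]:
        # Internal, lock-free implementation: safe to call recursively because
        # it never tries to re-acquire the non-reentrant asyncio.Lock.
        if isinstance(obj, dict):
            obj = list(obj.values())
        if isinstance(obj, (list, tuple)):
            found: List[str] = []
            for item in obj:
                found.extend(await self._sniff_and_extract_files_from_obj(item))
            return found
        if isinstance(obj, str) and obj.startswith("file-"):
            return [obj]
        return []
```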
     def _get_default_file_type(self) -> Literal["local", "remote"]:
         if self._remote_file_client is not None:
             return "remote"
         else:
             return "local"

     def _get_unique_file_path(
Use the safer and more efficient standard-library function `tempfile.mkstemp` in place of the hand-rolled way of creating a uniquely named file.
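For reference, a minimal example of `tempfile.mkstemp`; the suffix and the content written are arbitrary for illustration:

```python
import os
import tempfile

# mkstemp atomically creates and opens a uniquely named file, which avoids the
# race between checking that a name is free and actually creating the file.
fd, path = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as f:
    f.write("example content")
print(path)  # The caller is responsible for removing the file when done.
```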
`erniebot-agent` provides asynchronous APIs, but some key components do not yet fully account for concurrency safety and may fail when used concurrently. This PR aims to add concurrency control to `erniebot-agent`. Specifically, it considers the problems the agent and file manager components may run into in concurrent scenarios, and mitigates race conditions and related issues by means such as locking. See the review comments for the specific discussion points.