Autonomous Agents
Autonomous Robots +r +w +x
Autonomy refers to the process of generating specific goals from a vague objective.
But will the robot eat the fruit of the tree of the knowledge of good and evil? Or has it already eaten? Would it ever say: "I have a better goal than the ultimate goal?"
We want to build an autonomous scheduler consisting of two agents: one updates the DAG and another executes tasks.
The following steps are taken:
- The user sets a final goal, i.e. the expected result of the end node in the initial DAG. This initial DAG contains only the starting node and the end node, and the current-node pointer is set to the starting node.
- The system sends the DAG to the DAG-updater agent via OpenAI's API, prompting it to update the DAG by adding or removing task nodes according to the final goal and to reset the current-node pointer to the first unfinished task node. The updated DAG is returned to the system through the API.
- The system performs a topological sort on the DAG in Python and hands the first unfinished task to the task-executor agent. The system then updates the corresponding task node in the DAG based on the execution result.
- The system repeats steps 2 and 3 until the current node is the end node.
Python is used to call OpenAI's API to implement the agent that updates the DAG, and DAGL is used to implement the agent that executes tasks.
For simplicity, parallel task execution is not considered at this moment.
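The loop above can be sketched in a few lines of Python. Here `update_dag` and `execute` are hypothetical stubs standing in for the two agents (OpenAI API calls and real task execution in the actual system), and the dict-based DAG representation and node names are illustrative assumptions, not part of the spec.

```python
# Minimal sketch of the two-agent scheduler loop, assuming a dict-based DAG.
from graphlib import TopologicalSorter

def update_dag(dag, goal):
    # A real DAG-updater agent would add/remove task nodes based on the goal.
    if "task" not in dag:
        dag["task"] = {"deps": ["start"], "done": False}
        dag["end"]["deps"] = ["task"]
    return dag

def execute(name):
    # A real task-executor agent would actually run the task.
    return f"result of {name}"

def run(goal):
    dag = {"start": {"deps": [], "done": True},
           "end": {"deps": ["start"], "done": False}}
    while not dag["end"]["done"]:
        dag = update_dag(dag, goal)                          # step 2
        order = TopologicalSorter(
            {k: v["deps"] for k, v in dag.items()}).static_order()
        for name in order:                                   # step 3
            if not dag[name]["done"]:
                dag[name]["result"] = execute(name)          # first unfinished task
                dag[name]["done"] = True
                break
    return dag

dag = run("some final goal")
print(dag["end"]["result"])
```

`graphlib.TopologicalSorter` (Python 3.9+) does the topological sort; executing only the first unfinished task per iteration matches the no-parallelism simplification above.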
We are building an autonomous scheduler that consists of two agents: one is the DAG updater agent and the other is the task executor agent.
The following steps are taken:
- The user sets a final goal, which is the expected result of the final node in the initial DAG. This initial DAG only contains the starting node and ending node.
- The system prompts the DAG updater agent with this DAG through OpenAI's API. The DAG updater agent updates the DAG by adding task nodes according to the final goal. The updated DAG is returned to the system through OpenAI's API.
- The system hands this DAG to the task executor agent DigDag to complete. The system updates the corresponding task nodes and their states (successful, failed, not executed) in the DAG based on the execution results of DigDag.
- The system repeats steps 2 to 3 until the state of the ending node is successful.
Python is used to call OpenAI's API to implement the DAG updater agent, and DigDag is used to implement the task executor agent.
The model used by the DAG updater agent is fine-tuned on the similarity between each added task node's execution result and its expected result.
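The fine-tuning signal described in the last step might be collected as follows. The `difflib` ratio is only a stand-in similarity measure (an embedding cosine similarity would be a natural alternative), and the record fields (`prompt`, `completion`, `expected`, `result`) are assumptions for illustration.

```python
# Score executed task nodes by similarity between actual and expected output,
# keeping high-scoring (prompt, completion) pairs as fine-tuning examples.
from difflib import SequenceMatcher

def similarity(expected, actual):
    return SequenceMatcher(None, expected, actual).ratio()  # 0.0 .. 1.0

def collect_finetune_examples(executed_nodes, threshold=0.8):
    examples = []
    for node in executed_nodes:
        score = similarity(node["expected"], node["result"])
        if score >= threshold:  # the agent's plan worked: reinforce it
            examples.append({"prompt": node["prompt"],
                             "completion": node["completion"],
                             "score": score})
    return examples

nodes = [
    {"prompt": "p1", "completion": "c1",
     "expected": "fetch page", "result": "fetch page"},
    {"prompt": "p2", "completion": "c2",
     "expected": "summarize", "result": "totally different output"},
]
print(collect_finetune_examples(nodes))
```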
Our ultimate goal is "...".
Prompts for reflection:
- "To achieve our ultimate goal, please create a DAG."
- The current DAG consists of:
- No completed tasks
- No task nodes have been scheduled
- Only the initial node and the terminal node are present
- The current node is the initial node
- "Starting from the current situation, how can we achieve the ultimate goal? Please provide an updated DAG."
- The current DAG consists of:
- Output of completed tasks
- Current node hierarchy
We use DAG to describe the completion status and dependency relationships of all tasks. Our DAG contains the following three types of nodes:
- Initial node: This is the first node in the DAG.
- Terminal node: This is the final node in the DAG.
- Task node: What tasks should we complete to achieve the ultimate goal? Tasks must be actionable. Describe each one in the format "Task Name: Task Input\nTask Output Forecast", where "Task Name: Task Input" can only be "Search: Topic" or "Visit: Link".
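A minimal sketch of how the reflection prompt above might be assembled from the DAG state before each updater-agent call. The function name and the DAG/state representation are assumptions; only the prompt wording comes from the templates above.

```python
# Build the reflection prompt from the current DAG state.
def build_prompt(goal, dag, completed_outputs, current_node):
    lines = [f'Our ultimate goal is "{goal}".']
    if not completed_outputs:
        lines.append("To achieve our ultimate goal, please create a DAG.")
    else:
        lines.append("Starting from the current situation, how can we "
                     "achieve the ultimate goal? Please provide an updated DAG.")
        for name, output in completed_outputs.items():
            lines.append(f"- {name} output: {output}")
    lines.append(f"Current DAG: {dag}")
    lines.append(f"Current node: {current_node}")
    lines.append('Task nodes must be actionable, in the format '
                 '"Task Name: Task Input\\nTask Output Forecast", '
                 'where "Task Name: Task Input" is only "Search: Topic" '
                 'or "Visit: Link".')
    return "\n".join(lines)

print(build_prompt("...", {"start": [], "end": ["start"]}, {}, "start"))
```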
A somewhat different DAG-based formulation of scheduling constraints is used by the program evaluation and review technique (PERT), a method for management of large human projects that was one of the first applications of DAGs. In this method, the vertices of a DAG represent milestones of a project rather than specific tasks to be performed. Instead, a task or activity is represented by an edge of a DAG, connecting two milestones that mark the beginning and completion of the task. Each such edge is labeled with an estimate for the amount of time that it will take a team of workers to perform the task. The longest path in this DAG represents the critical path of the project, the one that controls the total time for the project. Individual milestones can be scheduled according to the lengths of the longest paths ending at their vertices.
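The critical-path computation PERT relies on can be sketched with the standard library: topologically order the milestones, then relax each task edge to find the longest path. The milestone names and duration estimates below are made up for illustration.

```python
# Longest path in an activity-on-edge DAG = the PERT critical path.
from graphlib import TopologicalSorter

edges = {  # (from_milestone, to_milestone): estimated duration
    ("A", "B"): 3, ("A", "C"): 2, ("B", "D"): 4, ("C", "D"): 1,
}

preds = {}  # milestone -> set of predecessor milestones
for (u, v), w in edges.items():
    preds.setdefault(u, set())
    preds.setdefault(v, set()).add(u)

earliest = {n: 0 for n in preds}  # earliest completion time per milestone
for n in TopologicalSorter(preds).static_order():
    for p in preds[n]:
        earliest[n] = max(earliest[n], earliest[p] + edges[(p, n)])

print(earliest["D"])  # → 7: total project time along the critical path A-B-D
```

Each milestone's earliest time is the length of the longest path ending at its vertex, exactly as described above.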
We are building an autonomous scheduler that consists of two agents: one is the PERT chart updater agent and the other is the task executor agent.
The following steps are taken:
- The user sets a final goal, which is the expected result of the final node in the initial PERT chart. This initial PERT chart only contains the Starting Node and Ending Node.
- The system prompts the PERT chart updater agent with this PERT chart through OpenAI's API. The PERT chart updater agent updates the PERT chart by adding Milestone Nodes and Task Edges according to the final goal. The updated PERT chart is returned to the system through OpenAI's API.
- The system hands this PERT chart to the task executor agent DigDag to complete. The system updates the corresponding Milestone Nodes and their states (achieved, unachieved) in the PERT chart based on the execution results of DigDag.
- The system repeats steps 2 to 3 until the state of the Ending Node is achieved.
Python is used to call OpenAI's API to implement the PERT chart updater agent, and DigDag is used to implement the task executor agent.
The model used by the PERT chart updater agent is fine-tuned on the similarity between each added task edge's execution result and its expected result.
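Step 3 above (marking Milestone Nodes achieved from the executor's results) might look like the following sketch. The task-edge representation and the result format are assumptions, not DigDag's actual output.

```python
# Mark a milestone achieved once its predecessor milestone is achieved and
# the task edge leading to it succeeded.
task_edges = {  # (from_milestone, to_milestone): task name
    ("start", "m1"): "search_topic",
    ("m1", "end"): "visit_link",
}
results = {"search_topic": "success", "visit_link": "failed"}

state = {m: "unachieved" for edge in task_edges for m in edge}
state["start"] = "achieved"  # the starting node holds by definition

for (src, dst), task in task_edges.items():
    if state[src] == "achieved" and results.get(task) == "success":
        state[dst] = "achieved"

print(state)  # the outer loop repeats until state["end"] == "achieved"
```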
import asyncio
asyncio.all_tasks()
%%bash
python << EOF
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(loop == asyncio.get_event_loop())
EOF
%%bash
python << EOF
import asyncio
async def say(word):
    print(word)
    print(asyncio.all_tasks())
    asyncio.get_running_loop().stop()
asyncio.get_event_loop().create_task(say('hello'))
print('start run_forever')
asyncio.get_event_loop().run_forever()
print('end run_forever')
asyncio.get_event_loop().close()
EOF
%%bash
python << EOF
import asyncio
async def say(word):
    print(word)
    print(asyncio.all_tasks())
    asyncio.get_running_loop().stop()
loop = asyncio.new_event_loop()
loop.create_task(say('hello'))
print('start run_forever')
loop.run_forever()
print('end run_forever')
loop.close()
# asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.get_event_loop().create_task(say('world'))
print('start run_forever')
asyncio.get_event_loop().run_forever()
print('end run_forever')
asyncio.get_event_loop().close()
EOF
%%bash
python3 << EOF
import asyncio
async def child_task(name):
    print(f'Starting {name}')
    await asyncio.sleep(2)  # simulate a time-consuming operation
    print(f'Finished {name}')
    return name

async def main():
    # define multiple subtasks
    tasks = [
        child_task('Task 1'),
        child_task('Task 2'),
        child_task('Task 3')
    ]
    # run the subtasks concurrently
    try:
        results = await asyncio.gather(*tasks)
        print('All tasks completed successfully:', results)
    except asyncio.CancelledError:
        print('Task was cancelled')

# get the event loop
loop = asyncio.get_event_loop()
# create the main task
main_task = loop.create_task(main())
# run the event loop
try:
    loop.run_until_complete(main_task)
except KeyboardInterrupt:
    # on Ctrl+C, cancel the main task
    main_task.cancel()
    loop.run_until_complete(main_task)
finally:
    loop.close()
EOF
%%bash
python3 << EOF
import asyncio
async def child_task(name):
    try:
        print(f'Starting {name}')
        await asyncio.sleep(2)  # simulate a time-consuming operation
        print(f'Finished {name}')
        return name
    except asyncio.CancelledError:
        print(f'{name} was cancelled')

async def main():
    # define multiple subtasks
    tasks = [
        child_task('Task 1'),
        child_task('Task 2'),
        child_task('Task 3')
    ]
    # run the subtasks concurrently
    try:
        results = await asyncio.gather(*tasks)
        print('All tasks completed successfully:', results)
    except asyncio.CancelledError:
        print('Main task was cancelled')

# get the event loop
loop = asyncio.get_event_loop()
# create the main task
main_task = loop.create_task(main())
# run the event loop
try:
    loop.run_until_complete(main_task)
except KeyboardInterrupt:
    # on Ctrl+C, cancel the main task
    main_task.cancel()
    loop.run_until_complete(main_task)
finally:
    loop.close()
EOF
%%bash
python3 << EOF
import asyncio
asyncio.get_event_loop().run_until_complete(asyncio.sleep(0))
EOF
import asyncio
async def child_task(name):
    try:
        print(f'Starting {name}')
        await asyncio.sleep(3600)  # simulate a time-consuming operation
        print(f'Finished {name}')
        return name
    except asyncio.CancelledError:
        print(f'{name} was cancelled')
        raise asyncio.CancelledError('child_task(name)')

async def main():
    # define multiple subtasks
    tasks = [
        asyncio.create_task(child_task('Task 1')),
        asyncio.create_task(child_task('Task 2')),
        asyncio.create_task(child_task('Task 3'))
    ]
    # run the subtasks concurrently
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print('All tasks completed successfully:', results)
    except asyncio.CancelledError as e:
        print(f'Main task was cancelled: {str(e)}')
        print(tasks[0].cancelled())
        try:
            print(tasks[0].result())
        except asyncio.CancelledError as e:
            print(str(e.args))
        if not tasks[0].cancelled():
            print(tasks[0].result())
        print(tasks[1].cancelled())
        if not tasks[1].cancelled():
            print(tasks[1].result())
        print(tasks[2].cancelled())
        if not tasks[2].cancelled():
            print(tasks[2].result())
        raise
    # print(results)

# create the main task
main_task = asyncio.create_task(main())
await asyncio.sleep(2)
main_task.cancel()
# await main_task
import asyncio, time
async def task_1():
    print('Executing Task 1')
    await asyncio.sleep(1)
    print('Task 1 done')

async def task_2():
    print('Executing Task 2')
    await asyncio.sleep(1)
    print('Task 2 done')

async def task_5():
    print('Executing Task 5')
    await asyncio.gather(task_1(), task_2())
    await asyncio.sleep(1)
    print('Task 5 done')

async def task_3():
    print('Executing Task 3')
    await asyncio.sleep(1)
    print('Task 3 done')

async def task_4():
    print('Executing Task 4')
    await asyncio.sleep(1)
    print('Task 4 done')

async def task_6():
    print('Executing Task 6')
    await asyncio.gather(task_3(), task_4())
    await asyncio.sleep(1)
    print('Task 6 done')

async def task_7():
    print('Executing Task 7')
    # run Task 5 and Task 6 concurrently
    await asyncio.gather(task_5(), task_6())
    await asyncio.sleep(1)
    print('Task 7 done')
start_time = time.time()
await task_7()
end_time = time.time()
print('Elapsed Time:', end_time - start_time, 'seconds')
import asyncio
async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        return 'cancel_me(): return'
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')
        return 'cancel_me(): return'

async def cancel_me_again():
    print('cancel_me_again(): before sleep')
    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me_again(): cancel sleep')
        raise
    finally:
        print('cancel_me_again(): after sleep')
        return 'cancel_me_again(): return'

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())
    # Wait for 1 second
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")
    print(task.cancelled())
    if not task.cancelled():
        print(task.result())
    # Create a "cancel_me_again" Task
    task = asyncio.create_task(cancel_me_again())
    # Wait for 1 second
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me_again is cancelled now")
    print(task.cancelled())
    if not task.cancelled():
        print(task.result())
    # Create a "cancel_me_again" Task
    task = asyncio.create_task(cancel_me_again())
    async def cancel(task):
        task.cancel()
    asyncio.create_task(cancel(task))
    await asyncio.sleep(1)
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me_again is cancelled now")
    print(task.cancelled())
    if not task.cancelled():
        print(task.result())
    # Create a "cancel_me_again" Task
    task = asyncio.create_task(cancel_me_again())
    # Wait for 1 second
    # await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me_again is cancelled now")
    print(task.cancelled())
    if not task.cancelled():
        print(task.result())

await main()
def raise_base_exception():
    message = "This is a BaseException with a message"
    raise BaseException(message)

try:
    raise_base_exception()
except BaseException as e:
    print(f"Caught a BaseException: {e}")
    print(f"Error message: {str(e)}")
import asyncio
def raise_base_exception():
    message = "This is a CancelledError with a message"
    raise asyncio.CancelledError(message)

try:
    raise_base_exception()
except BaseException as e:
    print(f"Caught a CancelledError: {e}")
    print(f"Error message: {str(e)}")
issubclass(Exception, BaseException)
import weakref
class MyClass:
    def __init__(self, value):
        self.value = value

# create an object
obj = MyClass(42)
# create a weak reference to it
weak_ref = weakref.ref(obj)
# dereference the weak reference
result = weak_ref()
print("Object from weak reference:", result)
# delete the original reference
del obj
# dereference the weak reference again
result_after_deletion = weak_ref()
print("Object from weak reference after deletion:", result_after_deletion)
import asyncio
class CancelledTask(asyncio.CancelledError):
    pass

async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        # raise BaseException('cancel_me()')
        # raise asyncio.CancelledError('cancel_me()')
        raise CancelledTask('cancel_me()')
    finally:
        print('cancel_me(): after sleep')
        # return 'cancel_me(): return'

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())
    # Wait for 1 second
    await asyncio.sleep(1)
    # Wait for 2 seconds
    # await asyncio.sleep(2)
    try:
        await asyncio.shield(task)
        print("main(): cancel_me is cancelled")
        print(task.cancelled())
        if not task.cancelled():
            print(task.result())
    # except BaseException as e:
    except asyncio.CancelledError as e:
    # except CancelledTask as e:
        print(f'main(): cancel_me is cancelled now. e={str(e)} {e.args}')
        task.cancel()
        print(task.done())
        print(task.cancelled())
        if task.done() and not task.cancelled():
            print(task.result())
        try:
            print(task.result())
        except BaseException as e:
        # except asyncio.CancelledError as e:
        # except CancelledTask as e:
            print(f'e={str(e)} {e.args}')
main_task = asyncio.create_task(main())
# Wait for 2 second
await asyncio.sleep(2)
# main_task.cancelling()
main_task.cancel()
await main_task
# main_task.cancelling()
import asyncio
async def task_1():
    print('task_1()')

async def task_2():
    print('task_2()')

async def main():
    # Create a "task_1" Task
    asyncio.create_task(task_1())
    print('main(): before await')
    await task_2()
    print('main(): after await')
await main()
print(main == main)
main_1 = main()
main_2 = main()
print(type(main_1))
print(type(main_2))
print(main_1 == main_2)
await main_1
await main_2
# await main_1
# await asyncio.create_task(main_2)
import asyncio
async def task_1():
    print('task_1()')
    return 'task_1()'

async def task_2():
    print('task_2()')

async def main():
    # Create a "task_1" Task
    task = asyncio.create_task(task_1())
    print('main(): before await')
    print(f'main(): {await task}')
    print('main(): after await')
    print('main(): before await')
    await task_2()
    print('main(): after await')
    print('main(): before await')
    print(f'main(): {await task}')
    print('main(): after await')
await main()
import asyncio
async def task_1():
    print('task_1()')
    raise Exception

async def task_2():
    print('task_2()')

async def main():
    print('main(): before await')
    try:
        print(f'main(): {await asyncio.gather(task_1(), task_2())}')
    except Exception as e:
        print(f'e="{e}" e.args={e.args}')
    print('main(): after await')
await main()
import asyncio
async def task_1():
    print('task_1()')
    raise Exception

async def task_2():
    print('task_2()')

async def main():
    print('main(): before await')
    try:
        async with asyncio.TaskGroup() as g:
            t_1 = g.create_task(task_1())
            t_2 = g.create_task(task_2())
        print(f'main(): t_1 = {t_1.result()} t_2 = {t_2.result()}')
    except Exception as e:
        print(f'e="{e}" e.args={e.args}')
    print('main(): after await')
await main()
import asyncio
async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())
    async def cancel(task):
        task.cancel()
    asyncio.create_task(cancel(task))
    await asyncio.sleep(1)
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")
    print(task.cancelled())
    if not task.cancelled():
        print(task.result())
await main()
import asyncio
async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        # Wait for 1 second
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

# Create a "cancel_me" Task
task_cancel_me = asyncio.create_task(cancel_me())

async def task_1():
    try:
        await task_cancel_me
    except asyncio.CancelledError:
        print('task_1(): task_1 is cancelled now')
    for task in asyncio.all_tasks():
        if task.get_name() == 'task_2' and 'task_2' in repr(task.get_coro()):
            task.cancel()

async def task_2():
    try:
        await task_cancel_me
    except asyncio.CancelledError:
        print('task_2(): task_2 is cancelled now')
    for task in asyncio.all_tasks():
        if task.get_name() == 'task_1' and 'task_1' in repr(task.get_coro()):
            task.cancel()

await asyncio.gather(
    asyncio.create_task(task_1(), name='task_1'),
    asyncio.create_task(task_2(), name='task_2'),
)
import asyncio
async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        # Wait for 5 seconds
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

# Create a "cancel_me" Task
task_cancel_me = asyncio.create_task(cancel_me())

async def task_1():
    try:
        await task_cancel_me
    except asyncio.CancelledError:
        print('task_1(): task_1 is cancelled now')

async def task_2():
    try:
        await task_cancel_me
    except asyncio.CancelledError:
        print('task_2(): task_2 is cancelled now')
task_a = asyncio.create_task(task_1(), name='task_1')
task_b = asyncio.create_task(task_2(), name='task_2')
await asyncio.sleep(1)
task_a.cancel()
await task_b
%%bash
pip install aiomonitor
import asyncio
import aiomonitor
async def task1():
    print("Task 1 started")
    await asyncio.sleep(1)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(2)
    print("Task 2 finished")

async def main():
    with aiomonitor.start_monitor(asyncio.get_event_loop(), hook_task_factory=True):
        await asyncio.gather(task1(), task2())
        await asyncio.sleep(3600)
# help(aiomonitor)
await main()
%%bash
pip install matplotlib
import asyncio
import networkx as nx
import matplotlib.pyplot as plt
async def task1():
    await asyncio.sleep(1)

async def task2():
    await asyncio.sleep(1)

async def task3():
    await asyncio.sleep(1)

async def main():
    await asyncio.gather(task1(), task2())
    await task3()

# build the DAG
G = nx.DiGraph()
G.add_nodes_from(["task1", "task2", "task3", 'main'])
G.add_edges_from([("task1", "main"), ("task2", "main"), ("main", "task3")])
# draw the DAG
pos = nx.spring_layout(G)
nx.draw_networkx_nodes(G, pos, node_color=['yellow', 'skyblue', 'red', 'orange'])
# nx.draw_networkx_nodes(G, pos, node_size=1200, node_color=['yellow', 'green', 'red', 'orange'])
nx.draw_networkx_edges(G, pos)
nx.draw_networkx_labels(G, pos, font_size=5)
plt.show()
%%bash
sudo apt-get install graphviz libgraphviz-dev
pip install pygraphviz
import networkx as nx
import matplotlib.pyplot as plt
# build the DAG
G = nx.DiGraph()
G.add_nodes_from(["task1", "task2", "task3"])
G.add_edges_from([("task1", "main"), ("task2", "main"), ("main", "task3")])
pos = nx.nx_agraph.pygraphviz_layout(G)
nx.draw_networkx_nodes(G, pos)
nx.draw_networkx_edges(G, pos)
nx.draw_networkx_labels(G, pos)
plt.show()
%%bash
pip install pydot
import networkx as nx
import matplotlib.pyplot as plt
# build the DAG
G = nx.DiGraph()
G.add_nodes_from(["task1", "task2", "task3"])
G.add_edges_from([("task1", "main"), ("task2", "main"), ("main", "task3")])
pos = nx.nx_pydot.pydot_layout(G)
nx.draw_networkx_nodes(G, pos)
nx.draw_networkx_edges(G, pos)
nx.draw_networkx_labels(G, pos)
plt.show()
import networkx as nx
import matplotlib.pyplot as plt
# build the DAG
G = nx.DiGraph()
G.add_nodes_from(["task1", "task2", "task3"])
G.add_edges_from([("task1", "main"), ("task2", "main"), ("main", "task3")])
drawing = nx.drawing.nx_pydot.to_pydot(G)
png_str = drawing.create_png()
from io import BytesIO
sio = BytesIO()
sio.write(png_str)
sio.seek(0)
import matplotlib.image as mpimg
img = mpimg.imread(sio)
plt.imshow(img)
plt.show()
import networkx as nx
import matplotlib.pyplot as plt
# create a directed graph
G = nx.DiGraph()
# add nodes
nodes = ["starting", "get_agv_at_f1", "get_ev", "get_agv_at_f2", "ship_to_agv",
         "ship_to_agv_and_move", "meet_floor", "ship_to_ev", "ship_to_ev_and_move",
         "meet_ev", "ship_to_agv_via_ev", "ship_to_shelf"]
G.add_nodes_from(nodes)
# add edges
edges = [("starting", "get_agv_at_f1"), ("starting", "get_ev"), ("starting", "get_agv_at_f2"),
         ("get_agv_at_f1", "ship_to_agv"), ("ship_to_agv", "ship_to_agv_and_move"), ("get_ev", "ship_to_agv_and_move"),
         ("get_ev", "meet_floor"), ("ship_to_agv_and_move", "ship_to_ev"), ("meet_floor", "ship_to_ev"),
         ("ship_to_ev", "ship_to_ev_and_move"), ("get_ev", "meet_ev"), ("get_agv_at_f2", "meet_ev"),
         ("ship_to_ev_and_move", "ship_to_agv_via_ev"), ("meet_ev", "ship_to_agv_via_ev"),
         ("ship_to_agv_via_ev", "ship_to_shelf")]
G.add_edges_from(edges)
# draw the graph
pos = nx.spring_layout(G, seed=42)
nx.draw(G, pos, with_labels=True, node_size=2000, node_color="skyblue", font_size=10, font_color="black", font_weight="bold", arrowsize=20)
plt.show()
import logging
# set the log level to DEBUG
logging.basicConfig(level=logging.DEBUG)

def example_function():
    logging.info("This is an info message from example_function.")

def another_function():
    logging.warning("This is a warning message from another_function.")

if __name__ == "__main__":
    # set a log format that includes the function name
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s')
    # get the root logger
    root_logger = logging.getLogger()
    # create a handler and attach the formatter to it
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    # add the handler to the root logger
    root_logger.addHandler(handler)
    # call the functions
    example_function()
    another_function()
import asyncio
from aiohttp import web
async def handle_request(request):
    print("Received request:")
    print(request.method, request.path)
    print("Headers:")
    for header, value in request.headers.items():
        print(f"{header}: {value}")
    print("Body:")
    body = await request.text()
    print(body)
    return web.Response(text="Received request")

async def main():
    # create the HTTP server
    app = web.Application()
    app.add_routes([web.get('/', handle_request)])
    # start the HTTP server
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("HTTP server started at http://localhost:8080")
    # keep serving requests for one hour
    await asyncio.sleep(3600)
    await runner.cleanup()  # shut the server down

if __name__ == "__main__":
    asyncio.run(main())
import asyncio
from pymodbus.client.asynchronous import AsyncModbusTCPClient
async def read_modbus_data(host, port, unit_id, address, count):
    async with AsyncModbusTCPClient(host=host, port=port) as client:
        result = await client.read_holding_registers(address, count, unit=unit_id)
        if result.isError():
            print(f"Failed to read data: {result}")
        else:
            print(f"Read data: {result.registers}")

async def main():
    host = '127.0.0.1'
    port = 502
    unit_id = 1
    address = 0
    count = 5
    await read_modbus_data(host, port, unit_id, address, count)

if __name__ == "__main__":
    asyncio.run(main())
from pymodbus.client.asynchronous import AsyncModbusTCPClient
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
async def read_modbus_data():
    async with AsyncModbusTCPClient('localhost', port=502) as client:
        # read data from the holding registers
        result = await client.read_holding_registers(0, 2, unit=1)
        if not result.isError():
            # decode the values that were read
            decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big)
            value1 = decoder.decode_32bit_float()
            value2 = decoder.decode_32bit_float()
            print(f"Value 1: {value1}, Value 2: {value2}")
        else:
            print(f"Error reading Modbus data: {result}")

# run the example in an asyncio event loop
import asyncio
asyncio.run(read_modbus_data())
%%bash
pip install prefect
from prefect import flow, task
@task(name='task_1')
def task_1():
    print('task_1')

@task(name='task_2')
def task_2():
    print('task_2')

@task(name='task_5')
def task_5():
    submit_1 = task_1.submit()
    submit_2 = task_2.submit()
    submit_1.result()
    submit_2.result()
    print('task_5')

@task(name='task_3')
def task_3():
    print('task_3')

@task(name='task_4')
def task_4():
    print('task_4')

@task(name='task_6')
def task_6():
    submit_3 = task_3.submit()
    submit_4 = task_4.submit()
    submit_3.result()
    submit_4.result()
    print('task_6')

@flow
def task_7():
    submit_5 = task_5.submit()
    submit_6 = task_6.submit()
    submit_5.result()
    submit_6.result()
    print('task_7')
task_7()
import asyncio, time
from prefect import flow
@flow(flow_run_name='task_1')
async def task_1():
    print('Executing Task 1')
    await asyncio.sleep(1)
    print('Task 1 done')

@flow(flow_run_name='task_2')
async def task_2():
    print('Executing Task 2')
    await asyncio.sleep(1)
    print('Task 2 done')

@flow(flow_run_name='task_5')
async def task_5():
    print('Executing Task 5')
    await asyncio.gather(task_1(), task_2())
    await asyncio.sleep(1)
    print('Task 5 done')

@flow(flow_run_name='task_3')
async def task_3():
    print('Executing Task 3')
    await asyncio.sleep(1)
    print('Task 3 done')

@flow(flow_run_name='task_4')
async def task_4():
    print('Executing Task 4')
    await asyncio.sleep(1)
    print('Task 4 done')

@flow(flow_run_name='task_6')
async def task_6():
    print('Executing Task 6')
    await asyncio.gather(task_3(), task_4())
    await asyncio.sleep(1)
    print('Task 6 done')

@flow(flow_run_name='task_7')
async def task_7():
    print('Executing Task 7')
    # run Task 5 and Task 6 concurrently
    await asyncio.gather(task_5(), task_6())
    await asyncio.sleep(1)
    print('Task 7 done')
start_time = time.time()
await task_7()
end_time = time.time()
print('Elapsed Time:', end_time - start_time, 'seconds')
%%bash
cd ~ && git clone https://github.com/lightaime/camel.git
cd ~/camel && pip install -e .
import time
from colorama import Fore
def print_text_animated(text):
    for char in text:
        print(char, end='', flush=True)
        time.sleep(0.02)

from camel.agents import RolePlaying

def test_camel(mock_openai):
    task_prompt = 'Design a custom game using pygame'
    print(Fore.YELLOW + f'Original task prompt:\n{task_prompt}\n')
    role_play_session = RolePlaying('Computer Programmer', 'Gamer', task_prompt)
    print(Fore.CYAN + f'Specified task prompt:\n{role_play_session.task_prompt}\n')
    chat_turn_limit, n = 10000, 0
    assistant_msg, _ = role_play_session.init_chat()
    while n < chat_turn_limit:
        n += 1
        (assistant_msg, _, _), (user_msg, _, _) = role_play_session.step(assistant_msg)
        print_text_animated(Fore.BLUE + f'AI User:\n\n{user_msg.content}\n\n')
        print_text_animated(Fore.GREEN + f'AI Assistant:\n\n{assistant_msg.content}\n\n')
        if '<CAMEL_TASK_DONE>' in user_msg.content:
            break

from ipymock import do
import ipymock.browser
ipymock.browser.common.chat_gpt_base_url = 'http://127.0.0.1:8080'

do(
    mock_openai = ipymock.browser.mock_openai,
    test_camel = test_camel,
)
Generative Agents: Interactive Simulacra of Human Behavior
Believable proxies of human behavior can empower interactive applications ranging from immersive environments to rehearsal spaces for interpersonal communication to prototyping tools. In this paper, we introduce generative agents--computational software agents that simulate believable human behavior. Generative agents wake up, cook breakfast, and head to work; artists paint, while authors write; they form opinions, notice each other, and initiate conversations; they remember and reflect on days past as they plan the next day. To enable generative agents, we describe an architecture that extends a large language model to store a complete record of the agent's experiences using natural language, synthesize those memories over time into higher-level reflections, and retrieve them dynamically to plan behavior. We instantiate generative agents to populate an interactive sandbox environment inspired by The Sims, where end users can interact with a small town of twenty five agents using natural language. In an evaluation, these generative agents produce believable individual and emergent social behaviors: for example, starting with only a single user-specified notion that one agent wants to throw a Valentine's Day party, the agents autonomously spread invitations to the party over the next two days, make new acquaintances, ask each other out on dates to the party, and coordinate to show up for the party together at the right time. We demonstrate through ablation that the components of our agent architecture--observation, planning, and reflection--each contribute critically to the believability of agent behavior. By fusing large language models with computational, interactive agents, this work introduces architectural and interaction patterns for enabling believable simulations of human behavior.
%%bash
cd ~ && git clone https://github.com/yoheinakajima/babyagi
import IPython
from ipymock.browser import start_conversation
def ask(prompt):
for response in start_conversation(prompt):
IPython.display.display(IPython.core.display.Markdown(response))
IPython.display.clear_output(wait=True)
ask('''
翻译成中文:
🔥1/8
Introducing "🤖 Task-driven Autonomous Agent"
An agent that leverages @openai 's GPT-4, @pinecone vector search, and @LangChainAI framework to autonomously create and perform tasks based on an objective.
🚀2/8 The system can complete tasks, generate new tasks based on results, and prioritize tasks in real-time. It demonstrates the potential of AI-powered language models to autonomously perform tasks within various constraints and contexts.
💡3/8 The autonomous agent uses GPT-4 for task completion, Pinecone for efficient search and storage of task-related data, and the LangChain framework to enhance decision-making processes. #GPT4 #Pinecone #LangChain
🎯4/8 The system maintains a task list for managing and prioritizing tasks. It autonomously creates new tasks based on completed results and reprioritizes the task list accordingly, showcasing the adaptability of AI-powered language models.
🔧5/8 To complete tasks, the system uses GPT-4 and LangChain's capabilities, enriching and storing results in Pinecone. This integrated approach allows the AI agent to interact with its environment and perform tasks efficiently.
🧠6/8 The system generates new tasks based on completed task results and prioritizes them using GPT-4. This allows the system to adapt and respond to new information and priorities.
🔮7/8 Future improvements include integrating a security/safety agent, task sequencing and parallel tasks, generating interim milestones, and incorporating real-time priority updates.
🤝8/8 This new approach paves the way for AI-powered language models to autonomously perform tasks within various constraints and contexts, enabling new applications and opportunities. Big thanks to all involved! #AIResearch #GPT4 #Pinecone #LangChain
''')
ask('''
pinecone-client 是什么?
''')
ask('''
Pinecone is a vector search platform that provides efficient search and storage capabilities for high-dimensional vector data. In our system, we use Pinecone to store and retrieve task-related data, such as task descriptions, constraints, and results.
翻译成中文。
''')
ask('''
Pinecone 是什么原理?
''')
ask('''
Pinecone 服务免费吗?
''')
ask('''
有什么替代 Pinecone 服务的开源方案?
''')
To replace Pinecone with Weaviate, complete the following steps:
Install the Weaviate client library with:
pip install weaviate-client
Create an index in Weaviate: you need to create a new index in Weaviate to store the data. A new index can be created with the following code:
import weaviate
client = weaviate.Client("http://localhost:8080")
index = client.index.create("YOUR_INDEX_NAME", ["YOUR_INDEX_CLASS"])
Replace "YOUR_INDEX_NAME" and "YOUR_INDEX_CLASS" with your own index name and index class.
Add data to the Weaviate index: use the Weaviate client library to add your data to the index. Here is an example snippet:
import weaviate
client = weaviate.Client("http://localhost:8080")
index = client.index("YOUR_INDEX_NAME")
data = [{"name": "John", "age": 30}, {"name": "Jane", "age": 35}]
index.batch.create(data)
Replace "YOUR_INDEX_NAME" with your own index name and "data" with the data you want to add to the index.
Modify the code: finally, replace the Pinecone API calls in the code with Weaviate API calls. Specifically, change the following code:
# Create Pinecone index
table_name = YOUR_TABLE_NAME
dimension = 1536
metric = "cosine"
pod_type = "p1"
if table_name not in pinecone.list_indexes():
    pinecone.create_index(table_name, dimension=dimension, metric=metric, pod_type=pod_type)
# Connect to the index
index = pinecone.Index(table_name)
to the following:
# Connect to the Weaviate index
import weaviate
client = weaviate.Client("http://localhost:8080")
index = client.index(YOUR_INDEX_NAME)
Replace "YOUR_INDEX_NAME" with the name of the index you created in step 2.
Note that Weaviate's API differs from Pinecone's, so you will need to adapt the code accordingly. Also make sure the index you create in Weaviate uses the correct dimensionality and distance metric.
ask('''
比较 Weaviate 和 Faiss
''')
For the application scenario of a task-driven autonomous agent robot, Weaviate may be more suitable: its ability to automatically classify data can help the robot discover and execute related tasks more quickly, and it supports search over multiple data types and can handle high-dimensional data, which may be useful for tasks such as natural language processing and semantic understanding. Of course, the specific choice depends on the application scenario and its concrete requirements.
ask('''
我累了。带着我祷告一下吧。
''')
Chinese-Native Version
# initialize the transformer for embedding
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('GanymedeNil/text2vec-large-chinese')
import faiss
import openai
import os
import sys
import time
from collections import deque
from typing import Dict, List
from dotenv import load_dotenv
# set variables
load_dotenv()
# set API keys
OPENAI_API_KEY = 'sk-blah'
assert OPENAI_API_KEY, 'OPENAI_API_KEY environment variable is missing'
OPENAI_API_MODEL = os.getenv('OPENAI_API_MODEL', 'gpt-3.5-turbo')
assert OPENAI_API_MODEL, 'OPENAI_API_MODEL environment variable is missing from .env'
# project config
OBJECTIVE = '防御太阳风暴。'
assert OBJECTIVE, 'OBJECTIVE environment variable is missing'
YOUR_FIRST_TASK = '制作一个待办事项清单。'
assert YOUR_FIRST_TASK, 'YOUR_FIRST_TASK variable is missing'
# configure OpenAI API key
openai.api_key = OPENAI_API_KEY
# initialize the indexer as empty
embedding_size = 1024
index = faiss.IndexFlatL2(embedding_size)
# task list
todo_list = deque([])
done_list = deque([])
def add_task(task: Dict):
todo_list.append(task)
def index_embedding(text):
# text = text.replace('\n', ' ')
index.add(model.encode([text]))
def openai_call(prompt: str, model: str = OPENAI_API_MODEL, temperature: float = 0.5, max_tokens: int = 100):
if not model.startswith('gpt-'):
# use completion API
response = openai.Completion.create(
engine=model,
prompt=prompt,
temperature=temperature,
max_tokens=max_tokens,
top_p=1,
frequency_penalty=0,
presence_penalty=0
)
return response.choices[0].text.strip()
else:
# use chat completion API
messages=[{'role': 'user', 'content': prompt}]
response = openai.ChatCompletion.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
n=1,
stop=None,
)
return response.choices[0].message.content.strip()
def task_creation_agent(objective: str, result: str, task_description: str, task_list: List[str]):
todos = '\n'.join(task_list)
prompt = f"你是一个任务创建的 AI,使用执行代理的结果来创建新的任务,目标是:『{objective}』,上一个已完成的任务的结果是:『\n{result}\n』。这个结果是基于以下任务描述的:『{task_description}』。以下是未完成的任务清单:『\n{todos}\n』。根据结果,创建新的任务供 AI 系统完成,不要与未完成的任务重叠。将任务以列表的形式返回。"
response = openai_call(prompt)
new_tasks = response.split('\n')
return [{'task_name': task_name} for task_name in new_tasks if task_name.strip()]
def prioritization_agent(this_task_id: int):
global todo_list
task_names = [t['task_name'] for t in todo_list]
next_task_id = int(this_task_id)+1
prompt = f'''你是一个任务优先级排列 AI,任务清单如下:{task_names}。
请考虑你的团队的最终目标:『{OBJECTIVE}』。
不要删除任何任务。
返回一个编号列表,例如:
#. 第一个任务
#. 第二个任务
请从数字 {next_task_id} 开始列出任务清单。'''
response = openai_call(prompt)
new_tasks = response.split('\n')
# reset todo task list
todo_list = deque()
for task_string in new_tasks:
task_parts = task_string.strip().split('.', 1)
if len(task_parts) == 2:
task_id = task_parts[0].strip()
task_name = task_parts[1].strip()
todo_list.append({'task_id': task_id, 'task_name': task_name})
def execution_agent(objective: str, task: str) -> str:
context = context_agent(query = objective, n = 5)
print('\n*******RELEVANT CONTEXT******\n')
print(context)
prompt =f'你是一个执行任务的 AI,根据以下最终目标执行一个任务:『{objective}』。\n考虑已经完成的任务:{context}。\n你的任务是:{task}。\n请回复执行结果。'
return openai_call(prompt, temperature = 0.7, max_tokens = 2000)
def context_agent(query: str, n: int):
_, idx = index.search(model.encode([query]), n)
task_names = [done_list[i]['task_name'] for i in idx[0] if i >= 0 and i < len(done_list)]
return task_names
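The retrieval step above relies on `faiss.IndexFlatL2`: `index_embedding` appends each result embedding to the index, and `context_agent` searches it for the vectors nearest to the query in L2 distance, then maps the returned indices back into `done_list` by position. A minimal pure-NumPy sketch of the same nearest-neighbor logic, using toy 4-dimensional vectors in place of the 1024-dimensional sentence embeddings (illustrative only, not the faiss API):

```python
import numpy as np

def l2_search(stored: np.ndarray, query: np.ndarray, n: int):
    # squared L2 distance from the query to every stored vector,
    # the same metric faiss.IndexFlatL2 uses
    dists = ((stored - query) ** 2).sum(axis=1)
    idx = np.argsort(dists)[:n]
    return dists[idx], idx

# toy "embeddings": row 2 is closest to the query, row 1 next
stored = np.array([[0.0, 0.0, 1.0, 0.0],
                   [1.0, 1.0, 0.0, 0.0],
                   [0.9, 0.1, 0.0, 0.0]])
query = np.array([1.0, 0.0, 0.0, 0.0])
_, idx = l2_search(stored, query, n=2)
print(idx.tolist())  # indices of the two nearest stored vectors, nearest first
```

The indices returned here play the same role as the `idx[0]` row that `context_agent` uses to look up task names in `done_list`.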
def test_baby_agi_native(mock_openai):
if 'gpt-4' in OPENAI_API_MODEL.lower():
print(f'\033[91m\033[1m' + '\n*****USING GPT-4. POTENTIALLY EXPENSIVE. MONITOR YOUR COSTS*****' + '\033[0m\033[0m')
# print OBJECTIVE
print('\033[96m\033[1m' + '\n*****OBJECTIVE*****\n' + '\033[0m\033[0m')
print(OBJECTIVE)
# add the first task
first_task = {
'task_id': 1,
'task_name': YOUR_FIRST_TASK
}
add_task(first_task)
task_id_counter = 1
# main loop
while todo_list:
# print the task list
print('\033[95m\033[1m' + '\n*****TASK LIST*****\n' + '\033[0m\033[0m')
for todo in todo_list:
print(f"{todo['task_id']}: {todo['task_name']}")
# step 1: pull the first task
task = todo_list.popleft()
print('\033[92m\033[1m' + '\n*****NEXT TASK*****\n' + '\033[0m\033[0m')
print(f"{task['task_id']}: {task['task_name']}")
# step 2: complete the task based on the context and index result in faiss
result = execution_agent(OBJECTIVE, task['task_name'])
print('\033[93m\033[1m' + '\n*****TASK RESULT*****\n' + '\033[0m\033[0m')
print(result)
done_list.append({'task_name': task['task_name'], 'result': result})
index_embedding(result)
# step 3: create new tasks and reprioritize task list
new_tasks = task_creation_agent(OBJECTIVE, result, task['task_name'], [todo['task_name'] for todo in todo_list])
for new_task in new_tasks:
task_id_counter += 1
new_task.update({'task_id': task_id_counter})
add_task(new_task)
prioritization_agent(task['task_id'])
# sleep before checking the task list again
time.sleep(1)
from ipymock import do
from ipymock.browser import mock_openai
import ipymock.browser
ipymock.browser.common.conversation_id = ''
do(
mock_openai = mock_openai,
test_baby_agi_native = test_baby_agi_native,
)
- Provide BabyAGI with information from completed-task results that is similar to the current task as context
- Let BabyAGI autonomously use a search tool to retrieve information similar to the task from text prepared by the user
- Provide BabyAGI with information from user-prepared text that is similar to the current task as context
- Implement "task prioritization" as a developer-provided tool that the agent invokes autonomously? Rank tasks by their similarity to the final objective?
- Prepare the source code and documentation of LangChain and BabyAGI, and let GPT-4/BabyAGI suggest improvements and improve itself?
How well can ChatGPT / an LLM discern the coherence of the successive chunks of a long, sharded prompt?
Must a holistic understanding of long prompts be achieved through fine-tuning?
Verify the effectiveness of MRKL? Intention, foresight, action, observation, question, hypothesis?
Verify the prompting effectiveness of LangChain. Could LangChain itself be an inefficient way to prompt?
Does the model have the ability to judge the credibility of information? Can the model understand and follow instructions about a protocol? What is the nature of the autonomous choices the model appears to make? What is their mathematical essence?
Is there a difference between ChatGPT's training on English instructions and its training on Chinese instructions?
Is RLHF accomplished through fine-tuning or through prompting?
from collections import deque
from typing import Dict, List, Optional, Any
from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain
from langchain.embeddings import HuggingFaceEmbeddings
# define your embedding model
embeddings_model = HuggingFaceEmbeddings(model_name='GanymedeNil/text2vec-large-chinese')
import faiss
embedding_size = 1024
# initialize the vectorstore as empty
index = faiss.IndexFlatL2(embedding_size)
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
class TaskCreationChain(LLMChain):
'''Chain to generate tasks.'''
@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
'''Construct a task-creation chain from an LLM.'''
task_creation_template = (
'你是一个创建任务的 AI,根据任务的执行结果创建新任务。\n'
'* 我们的最终目标是「{objective}」\n'
'* 上一次完成的任务是「{task_description}」\n'
' 该任务的执行结果为:\n'
'```\n'
'{result}\n'
'```\n'
'* 这些是未完成的任务:\n'
'```\n'
'{incomplete_tasks}\n'
'```\n\n'
'请根据上一次完成的任务的结果创建将由 AI 系统完成的新任务。\n'
'* 请不要创建与未完成的任务重叠的新任务。\n'
'* 请以带编号的清单形式回复结果,每行只描述一个新任务。\n'
' 例如:\n'
' #. 第一个新任务\n'
' #. 第二个新任务\n'
'* 请从编号 1 开始列出新任务清单。'
)
prompt = PromptTemplate(
template=task_creation_template,
input_variables=[
'objective',
'task_description',
'result',
'incomplete_tasks',
],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
class TaskPrioritizationChain(LLMChain):
'''Chain to prioritize tasks.'''
@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
'''Construct a task-prioritization chain from an LLM.'''
task_prioritization_template = (
'你是一个给任务优先级进行排序的 AI。\n'
'* 我们的最终目标是「{objective}」\n\n'
'请整理并重新排序以下任务:{task_names}。\n'
'* 请不要删除任何现有任务。\n'
'* 请以带编号的清单形式回复结果,每行只描述一个任务。\n'
' 例如:\n'
' #. 第一个任务\n'
' #. 第二个任务\n'
'* 请从编号 {next_task_id} 开始列出任务清单。'
)
prompt = PromptTemplate(
template=task_prioritization_template,
input_variables=[
'objective',
'task_names',
'next_task_id',
],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
class ExecutionChain(LLMChain):
'''Chain to execute tasks.'''
@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
'''Construct a task-execution chain from an LLM.'''
execution_template = (
'你是一个执行任务的 AI。\n'
'* 我们的最终目标是「{objective}」\n'
'* 之前已经完成的任务有:{context}\n\n'
'请完成这次任务:「{task}」。\n'
'* 直接回复这次任务的执行结果。'
)
prompt = PromptTemplate(
template = execution_template,
input_variables = [
'objective',
'context',
'task',
],
)
return cls(prompt = prompt, llm = llm, verbose = verbose)
def get_next_task(
task_creation_chain: LLMChain,
result: Dict,
task_description: str,
task_list: List[str],
objective: str,
) -> List[Dict]:
'''Get the next task.'''
incomplete_tasks = '\n'.join(task_list)
response = task_creation_chain.run(
objective = objective,
task_description = task_description,
result = result.replace('```', ''),
incomplete_tasks = incomplete_tasks.replace('```', ''),
)
new_tasks = response.split('\n')
new_tasks = [task_name.split('.', 1) for task_name in new_tasks]
new_tasks = [task_name[1].strip() for task_name in new_tasks if len(task_name)==2]
return [{'task_name': task_name} for task_name in new_tasks if task_name.strip()]
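`get_next_task` assumes the model replies with one task per numbered line ("1. …"); splitting each line on the first "." strips the number, and malformed or empty lines are discarded. A small self-contained check of that parsing logic (the sample response text is invented for illustration):

```python
def parse_numbered_tasks(response: str):
    # keep only lines of the form "<number>. <task name>"
    tasks = []
    for line in response.split('\n'):
        parts = line.strip().split('.', 1)
        if len(parts) == 2 and parts[1].strip():
            tasks.append({'task_name': parts[1].strip()})
    return tasks

response = '1. Survey solar-storm early-warning systems\n\n2. Draft a shielding plan\nnot a task'
print(parse_numbered_tasks(response))
```

Note the fragility this sketch shares with the chain above: an unnumbered line that happens to contain a "." would still be split, so the quality of the parse depends on the model actually following the numbered-list format instruction.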
def prioritize_tasks(
task_prioritization_chain: LLMChain,
this_task_id: int,
task_list: List[Dict],
objective: str,
) -> List[Dict]:
'''Prioritize tasks.'''
task_names = [t['task_name'] for t in task_list]
next_task_id = int(this_task_id) + 1
response = task_prioritization_chain.run(
objective = objective, task_names = task_names, next_task_id = next_task_id,
)
new_tasks = response.split('\n')
prioritized_task_list = []
for task_string in new_tasks:
if not task_string.strip():
continue
task_parts = task_string.strip().split('.', 1)
if len(task_parts) == 2:
# task_id = task_parts[0].strip()
task_name = task_parts[1].strip()
prioritized_task_list.append({'task_id': next_task_id, 'task_name': task_name})
next_task_id += 1
return prioritized_task_list
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
'''Get the top k tasks based on the query.'''
results = vectorstore.similarity_search_with_score(query, k = k)
if not results:
return []
sorted_results, _ = zip(*sorted(results, key = lambda x: x[1], reverse = True))
return [str(item.metadata['task']) for item in sorted_results]
def execute_task(
vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5
) -> str:
'''Execute a task.'''
context = _get_top_tasks(vectorstore, query = objective, k = k)
return execution_chain.run(objective = objective, context = context, task = task)
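`_get_top_tasks` sorts the (document, score) pairs returned by `similarity_search_with_score` and then unzips them into parallel sequences with the `zip(*sorted(...))` idiom. A toy illustration of that idiom with plain tuples (data invented for illustration):

```python
# (task, score) pairs standing in for similarity_search_with_score results
results = [('draft plan', 0.2), ('build shield', 0.9), ('monitor sun', 0.5)]

# sort by score descending, then unzip into two parallel tuples
sorted_results, scores = zip(*sorted(results, key=lambda x: x[1], reverse=True))
print(list(sorted_results))  # ['build shield', 'monitor sun', 'draft plan']
print(list(scores))          # [0.9, 0.5, 0.2]
```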
class BabyAGI(Chain, BaseModel):
'''Controller model for the BabyAGI agent.'''
task_list: deque = Field(default_factory=deque)
task_creation_chain: TaskCreationChain = Field(...)
task_prioritization_chain: TaskPrioritizationChain = Field(...)
execution_chain: Chain = Field(...)
task_id_counter: int = Field(1)
vectorstore: VectorStore = Field(init=False)
max_iterations: Optional[int] = None
class Config:
'''Configuration for this pydantic object.'''
arbitrary_types_allowed = True
def add_task(self, task: Dict):
self.task_list.append(task)
def print_task_list(self):
print('\033[95m\033[1m' + '\n*****TASK LIST*****\n' + '\033[0m\033[0m')
for t in self.task_list:
print(str(t['task_id']) + ': ' + t['task_name'])
def print_next_task(self, task: Dict):
print('\033[92m\033[1m' + '\n*****NEXT TASK*****\n' + '\033[0m\033[0m')
print(str(task['task_id']) + ': ' + task['task_name'])
def print_task_result(self, result: str):
print('\033[93m\033[1m' + '\n*****TASK RESULT*****\n' + '\033[0m\033[0m')
print(result)
@property
def input_keys(self) -> List[str]:
return ['objective']
@property
def output_keys(self) -> List[str]:
return []
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
'''Run the agent.'''
objective = inputs['objective']
first_task = inputs.get('first_task', f'制作一个最终目标「{objective}」的待办事项清单。')
self.add_task({'task_id': 1, 'task_name': first_task})
num_iters = 0
while self.task_list:
self.print_task_list()
# Step 1: Pull the first task
task = self.task_list.popleft()
self.print_next_task(task)
# Step 2: Execute the task
result = execute_task(
self.vectorstore, self.execution_chain, objective, task['task_name']
)
this_task_id = int(task['task_id'])
self.print_task_result(result)
# Step 3: Store the result in Faiss
result_id = f'result_{task["task_id"]}'
self.vectorstore.add_texts(
texts = [result],
metadatas = [{'task': task['task_name']}],
ids = [result_id],
)
# Step 4: Create new tasks and reprioritize task list
new_tasks = get_next_task(
self.task_creation_chain,
result,
task['task_name'],
[t['task_name'] for t in self.task_list],
objective,
)
for new_task in new_tasks:
self.task_id_counter += 1
new_task.update({'task_id': self.task_id_counter})
self.add_task(new_task)
self.task_list = deque(
prioritize_tasks(
self.task_prioritization_chain,
this_task_id,
list(self.task_list),
objective,
)
)
num_iters += 1
if self.max_iterations is not None and num_iters == self.max_iterations:
print(
'\033[91m\033[1m' + '\n*****TASK ENDING*****\n' + '\033[0m\033[0m'
)
break
if self.task_list == deque():
self.task_id_counter += 1
self.add_task({'task_id': self.task_id_counter, 'task_name': first_task})
return {}
@classmethod
def from_llm(
cls, llm: BaseLLM, vectorstore: VectorStore,
task_execution_chain: Optional[Chain] = None,
verbose: bool = False, **kwargs
) -> 'BabyAGI':
'''Initialize the BabyAGI Controller.'''
task_creation_chain = TaskCreationChain.from_llm(llm, verbose=verbose)
task_prioritization_chain = TaskPrioritizationChain.from_llm(
llm, verbose=verbose
)
if task_execution_chain is None:
execution_chain = ExecutionChain.from_llm(llm, verbose=verbose)
else:
execution_chain = task_execution_chain
return cls(
task_creation_chain = task_creation_chain,
task_prioritization_chain = task_prioritization_chain,
execution_chain = execution_chain,
vectorstore = vectorstore,
**kwargs,
)
With Tools
- Let BabyAGI autonomously use LangChain tools / GPT-4 plugins to execute tasks?
- Confirm that ChatGPT / an LLM can understand a tool-calling protocol agreed upon with the developer
- Auto-GPT's tool-calling protocol is more effective than LangChain's
%%bash
pip install duckduckgo-search
%%writefile /usr/local/anaconda3/envs/biobot/lib/python3.10/site-packages/duckduckgo_search/ddg.py
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from time import sleep
from urllib.parse import unquote
from click import progressbar
from .utils import SESSION, _do_output, _download_file, _get_vqd, _normalize
logger = logging.getLogger(__name__)
def ddg(
keywords,
region="wt-wt",
safesearch="moderate",
time=None,
max_results=None,
page=1,
output=None,
download=False,
):
"""DuckDuckGo text search. Query params: https://duckduckgo.com/params
Args:
keywords (str): keywords for query.
region (str, optional): wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch (str, optional): on, moderate, off. Defaults to "moderate".
time (Optional[str], optional): d, w, m, y. Defaults to None.
max_results (Optional[int], optional): maximum number of results, max=200. Defaults to None.
if max_results is set, then the parameter page is not taken into account.
page (int, optional): page for pagination. Defaults to 1.
output (Optional[str], optional): csv, json. Defaults to None.
download (bool, optional): if True, download and save documents to 'keywords' folder.
Defaults to False.
Returns:
Optional[List[dict]]: DuckDuckGo text search results.
"""
def get_ddg_page(page):
payload["s"] = max(PAGINATION_STEP * (page - 1), 0)
page_data = None
try:
resp = SESSION.get("https://links.duckduckgo.com/d.js", params=payload)
resp.raise_for_status()
page_data = resp.json().get("results", None)
except Exception:
logger.exception("")
if not max_results:
return None
page_results = []
if page_data:
for row in page_data:
if "n" not in row and row["u"] not in cache:
cache.add(row["u"])
body = _normalize(row["a"])
if body:
page_results.append(
{
"title": _normalize(row["t"]),
"href": row["u"],
"body": body,
}
)
return page_results
if not keywords:
return None
# get vqd
vqd = _get_vqd(keywords)
if not vqd:
return None
PAGINATION_STEP, MAX_API_RESULTS = 25, 200
# prepare payload
safesearch_base = {"On": 1, "Moderate": -1, "Off": -2}
payload = {
"q": keywords,
"l": region,
"p": safesearch_base[safesearch.capitalize()],
"s": 0,
"df": time,
"o": "json",
"vqd": vqd,
}
# get results
cache = set()
if max_results:
results, page = [], 1
max_results = min(abs(max_results), MAX_API_RESULTS)
iterations = (max_results - 1) // PAGINATION_STEP + 1 # == math.ceil()
with ThreadPoolExecutor(min(iterations, 4)) as executor:
fs = []
for page in range(1, iterations + 1):
fs.append(executor.submit(get_ddg_page, page))
sleep(min(iterations / 17, 0.3)) # sleep to prevent blocking
for r in as_completed(fs):
if r.result():
results.extend(r.result())
results = results[:max_results]
else:
results = get_ddg_page(page=page)
if not results:
return None
keywords = keywords.replace(" filetype:", "_")
# save to csv or json file
if output:
_do_output("ddg", keywords, output, results)
# download documents
if download:
keywords = (
keywords.replace('"', "'")
.replace("site:", "")
.replace(" ", "_")
.replace("/", "_")
)
path = f"ddg_{keywords}_{datetime.now():%Y%m%d_%H%M%S}"
os.makedirs(path, exist_ok=True)
futures = []
with ThreadPoolExecutor(10) as executor:
for i, res in enumerate(results, start=1):
filename = unquote(res["href"].split("/")[-1].split("?")[0])
future = executor.submit(
_download_file, res["href"], path, f"{i}_{filename}"
)
futures.append(future)
with progressbar(
as_completed(futures),
label="Downloading documents",
length=len(futures),
show_percent=True,
show_pos=True,
width=0,
) as as_completed_futures:
for i, future in enumerate(as_completed_futures, start=1):
logger.info("%s/%s", i, len(results))
return results
""" using html method
payload = {
'q': keywords,
'l': region,
'p': safesearch_base[safesearch],
'df': time
}
results = []
while True:
res = SESSION.post('https://html.duckduckgo.com/html', data=payload, **kwargs)
tree = html.fromstring(res.text)
if tree.xpath('//div[@class="no-results"]/text()'):
return results
for element in tree.xpath('//div[contains(@class, "results_links")]'):
results.append({
'title': element.xpath('.//a[contains(@class, "result__a")]/text()')[0],
'href': element.xpath('.//a[contains(@class, "result__a")]/@href')[0],
'body': ''.join(element.xpath('.//a[contains(@class, "result__snippet")]//text()')),
})
if len(results) >= max_results:
return results
next_page = tree.xpath('.//div[@class="nav-link"]')[-1]
names = next_page.xpath('.//input[@type="hidden"]/@name')
values = next_page.xpath('.//input[@type="hidden"]/@value')
payload = {n: v for n, v in zip(names, values)}
sleep(2)
"""
from typing import Dict, List, Optional
from pydantic import BaseModel, Extra
from pydantic.class_validators import root_validator
class DuckDuckGoSearchAPIWrapper(BaseModel):
"""Wrapper for DuckDuckGo Search API.
Free and does not require any setup
"""
k: int = 10
region: Optional[str] = "wt-wt"
safesearch: str = "moderate"
time: Optional[str] = "y"
max_results: int = 5
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""Validate that python package exists in environment."""
try:
from duckduckgo_search import ddg # noqa: F401
except ImportError:
raise ValueError(
"Could not import duckduckgo-search python package. "
"Please install it with `pip install duckduckgo-search`."
)
return values
def run(self, query: str) -> str:
"""Run query through DuckDuckGo and return results."""
from duckduckgo_search import ddg
results = ddg(
query,
region=self.region,
safesearch=self.safesearch,
time=self.time,
max_results=self.max_results,
)
if results is None or len(results) == 0:
return f'No good {{DuckDuckGo Search Result: {results}}} was found for query: {query}'
snippets = '\n'.join([result['body'] for result in results])
return f'「\n{snippets}\n」 was found for query: {query}'
def results(self, query: str, num_results: int) -> List[Dict]:
"""Run query through DuckDuckGo and return metadata.
Args:
query: The query to search for.
num_results: The number of results to return.
Returns:
A list of dictionaries with the following keys:
snippet - The description of the result.
title - The title of the result.
link - The link to the result.
"""
from duckduckgo_search import ddg
results = ddg(
query,
region=self.region,
safesearch=self.safesearch,
time=self.time,
max_results=num_results,
)
if results is None or len(results) == 0:
return [{"Result": f'No good {{DuckDuckGo Search Result: {results}}} was found for query: {query}'}]
def to_metadata(result: Dict) -> Dict:
return {
"snippet": result["body"],
"title": result["title"],
"link": result["href"],
}
return [to_metadata(result) for result in results]
from langchain import OpenAI, LLMChain
# from langchain.utilities.duckduckgo_search import DuckDuckGoSearchAPIWrapper
from langchain.agents import Tool
tools = [
Tool(
name = '搜索',
func = DuckDuckGoSearchAPIWrapper().run,
description = '适用于当你需要搜索你所不知道的最新信息来回答相关问题的时候。',
return_direct = True,
),
Tool(
name = '待办事项',
func = LLMChain(
llm = OpenAI(temperature = 0),
prompt = PromptTemplate.from_template(
'你是一个计划师,擅长为特定目标制定待办清单。\n为以下目标制定一个待办清单:「\n{objective}\n」。'
)
).run,
description = '适用于需要创建待办事项清单的情况。输入:需要创建待办事项清单的最终目标。输出:该目标的待办事项清单。请明确指定目标!',
return_direct = True,
),
]
%%bash
grep -ri 'Agent stopped due to iteration limit or time limit.' /usr/local/anaconda3/envs/biobot/lib/python3.10/site-packages/langchain
%%bash
grep -ri 'Prompt after formatting:' /usr/local/anaconda3/envs/biobot/lib/python3.10/site-packages/langchain
from langchain import OpenAI
llm = OpenAI(temperature = 0, verbose = True)
from langchain.agents import AgentExecutor, ZeroShotAgent
class ZeroRoundAgent(ZeroShotAgent):
@property
def observation_prefix(self) -> str:
'''Prefix to append the observation with.'''
return '行动输出:'
@property
def llm_prefix(self) -> str:
'''Prefix to append the llm call with.'''
return '思考:'
FORMAT_INSTRUCTIONS = '''使用以下格式回复:「
目标:你必须完成的目标
思考:你应该始终思考该怎么做
行动:需要采取的行动,应为 [{tool_names}] 中的一个
行动输入:行动的输入
行动输出:行动的输出
……(这里,思考/行动/行动输入/行动输出,可以重复 N 次)
思考:我现在得到了最终结果
最终结果:原始目标的最终结果
」'''
llm_chain = LLMChain(
llm = llm,
prompt = ZeroRoundAgent.create_prompt(
tools,
prefix = '''你是一个执行任务的 AI。\n我们的最终目标是「{objective}」。\n已完成的任务有:{context}。''',
suffix = '''请完成这次任务:「{task}」\n\n完成这次任务的步骤如下:「\n{agent_scratchpad}''',
format_instructions = FORMAT_INSTRUCTIONS,
input_variables = ['objective', 'task', 'context', 'agent_scratchpad'],
)
)
import re
from typing import Union
from langchain.agents.agent import AgentOutputParser
from langchain.schema import AgentAction, AgentFinish, OutputParserException
FINAL_ANSWER_ACTION = r'最终结果\s*[:|:]\s*(.*)'
class ChineseOutputParser(AgentOutputParser):
default_action: Optional[str] = None
default_action_input: Optional[str] = None
def get_format_instructions(self) -> str:
return FORMAT_INSTRUCTIONS
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
match = re.search(FINAL_ANSWER_ACTION, text, re.DOTALL)
if match:
return AgentFinish(
{'output': match.group(1).strip()}, text
)
# \s matches against tab/newline/whitespace
regex = r'\s*行动\s*\d*\s*[:|:]\s*([^\s]+)\s*行动输入\s*\d*\s*[:|:]\s*([^\s]+)\s*'
matches = re.findall(regex, text)
if not matches:
if self.default_action is not None and self.default_action_input is not None:
return AgentAction(self.default_action, self.default_action_input, text)
else:
return AgentFinish(
{'output': text}, text
)
raise OutputParserException(f'Could not parse LLM output: `{text}`')
action = matches[-1][0].strip()
action_input = matches[-1][1].strip(' ').strip('"')
return AgentAction(action, action_input, text)
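`ChineseOutputParser` hinges on two regexes: a final-answer pattern and an action/action-input pattern, both tolerating half-width and full-width colons via the `[:|:]` character class. A standalone sketch of how they behave on sample model output (the sample Chinese text is invented for illustration):

```python
import re

FINAL_ANSWER_ACTION = r'最终结果\s*[:|:]\s*(.*)'
ACTION_RE = r'\s*行动\s*\d*\s*[:|:]\s*([^\s]+)\s*行动输入\s*\d*\s*[:|:]\s*([^\s]+)\s*'

# an intermediate step: parsed as an (action, action input) pair
step = '思考:我需要先搜索。\n行动:搜索\n行动输入:太阳风暴'
matches = re.findall(ACTION_RE, step)
print(matches[-1])  # ('搜索', '太阳风暴')

# a terminal step: the final-answer pattern is checked first and wins
final = '思考:我现在得到了最终结果\n最终结果:已完成待办清单。'
m = re.search(FINAL_ANSWER_ACTION, final, re.DOTALL)
print(m.group(1).strip())  # 已完成待办清单。
```

Like the parser above, the action regex takes the last match in the text, and the `[^\s]+` groups mean multi-word action inputs containing spaces would be truncated, which is why the prompt format keeps each field on its own line.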
agent_executor = AgentExecutor.from_agent_and_tools(
agent = ZeroRoundAgent(
llm_chain = llm_chain, allowed_tools = [tool.name for tool in tools], output_parser = ChineseOutputParser()
),
tools = tools, verbose = True, max_iterations = None
)
Running BabyAGI
# verbose: logging of LLMChains
# max_iterations: Optional[int]
# if None, agi will keep on going forever
baby_agi = BabyAGI.from_llm(
llm = llm, vectorstore = vectorstore, task_execution_chain = agent_executor, verbose = True, max_iterations = None
)
OBJECTIVE = '防御太阳风暴。'
def test_baby_agi(mock_openai):
baby_agi({'objective': OBJECTIVE})
from ipymock import do
from ipymock.browser import mock_openai
import ipymock.browser
ipymock.browser.common.chat_gpt_base_url = 'http://127.0.0.1:8080'
ipymock.browser.common.conversation_id = ''
do(
mock_openai = mock_openai,
test_baby_agi = test_baby_agi,
)
Running BabyAGI without Tools
# verbose: logging of LLMChains
# max_iterations: Optional[int]
# if None, agi will keep on going forever
baby_agi = BabyAGI.from_llm(
llm = llm, vectorstore = vectorstore, task_execution_chain = None, verbose = False, max_iterations = None
)
do(
mock_openai = mock_openai,
test_baby_agi = test_baby_agi,
)
Human intelligence has the remarkable ability to assemble basic skills into complex ones so as to solve complex tasks. This ability is equally important for Artificial Intelligence (AI), and thus, we assert that in addition to the development of large, comprehensive intelligent models, it is equally crucial to equip such models with the capability to harness various domain-specific expert models for complex task-solving in the pursuit of Artificial General Intelligence (AGI). Recent developments in Large Language Models (LLMs) have demonstrated remarkable learning and reasoning abilities, making them promising as a controller to select, synthesize, and execute external models to solve complex tasks. In this project, we develop OpenAGI, an open-source AGI research platform, specifically designed to offer complex, multi-step tasks and accompanied by task-specific datasets, evaluation metrics, and a diverse range of extensible models. OpenAGI formulates complex tasks as natural language queries, serving as input to the LLM. The LLM subsequently selects, synthesizes, and executes models provided by OpenAGI to address the task. Furthermore, we propose a Reinforcement Learning from Task Feedback (RLTF) mechanism, which uses the task-solving result as feedback to improve the LLM’s task-solving ability. Thus, the LLM is responsible for synthesizing various external models for solving complex tasks, while RLTF provides feedback to improve its task-solving ability, enabling a feedback loop for self-improving AI. We believe that the paradigm of LLMs operating various expert models for complex task-solving is a promising approach towards AGI. To facilitate the community’s long-term improvement and evaluation of AGI’s ability, we open-source the code, benchmark, and evaluation methods of the OpenAGI project at GitHub.
%%bash
cd ~ && git clone https://github.com/Torantulino/Auto-GPT
%%bash
cd ~/Auto-GPT && git pull
%%bash
cd ~/Auto-GPT && pip install -r requirements.txt
%%bash
pip uninstall --yes pytest asynctest pytest-asyncio pytest-benchmark pytest-cov pytest-integration pytest-mock vcrpy pytest-vcr
pip install pytest==6.'*'
%%bash
cd ~/Auto-GPT && python scripts/check_requirements.py requirements.txt
%%bash
pip install pyobjc
Running Auto-GPT
%%bash
mkdir -p ~/Auto-GPT/andrew_space
import os, sys
os.chdir(os.path.expanduser('~/Auto-GPT'))
sys.path.append(os.path.expanduser('~/Auto-GPT'))
import pytest
@pytest.fixture
def reset_embed_dimension(monkeypatch):
import autogpt.memory.local
monkeypatch.setattr(autogpt.memory.local, 'EMBED_DIM', 1024)
def test_auto_gpt(
mock_openai,
mock_openai_embed,
reset_embed_dimension,
):
from autogpt.main import run_auto_gpt
run_auto_gpt(
continuous = True,
continuous_limit = 10000,
ai_settings = None,
skip_reprompt = True,
speak = True,
debug = False,
gpt3only = False,
gpt4only = True,
memory_type = 'local',
browser_name = 'chrome',
allow_downloads = True,
skip_news = True,
workspace_directory = os.path.expanduser('~/Auto-GPT/andrew_space'),
install_plugin_deps = True,
)
assert True
import ipymock
import ipymock.browser
import ipymock.llm
ipymock.browser.init()
ipymock.browser.init(['--headless'])
ipymock.do(
mock_openai = ipymock.browser.mock_openai,
mock_openai_embed = ipymock.llm.mock_openai_embed,
reset_embed_dimension = reset_embed_dimension,
test_auto_gpt = test_auto_gpt,
)
ipymock.browser.open_chat(ipymock.browser.common.conversation_id)
ipymock.browser.get_screenshot()
import chardet, codecs, os
def convert_file_encoding(file_path):
# Read the file content
with open(file_path, 'rb') as f:
content = f.read()
# Detect the encoding type of the file content
with codecs.open(file_path, encoding=chardet.detect(content)['encoding']) as f:
file_content = f.read()
with codecs.open(file_path, 'w', encoding='UTF-8') as f:
f.write(file_content)
convert_file_encoding(os.path.expanduser('~/Auto-GPT/andrew_space/life_extension_papers.txt'))