We have now seen the full RAG pipeline and gained some first-hand experience with it, but you may still have questions about how and why some of its components are used, such as RunnablePassthrough. In this section we take a closer look at these key components.
The core idea behind the RAG pipeline is to decompose a complex task into a combination of small, simple tasks; this divide-and-conquer strategy is very useful when tackling large, complex problems. Let's start with a very simple pipeline:

```python
from langchain_core.runnables import Runnable, RunnableSequence

# Define the first task, which appends " a"
class TaskA(Runnable):
    def invoke(self, input, context=None):
        return input + " a"

# Define the second task, which appends " beautiful"
class TaskBeautiful(Runnable):
    def invoke(self, input, context=None):
        return input + " beautiful"

# Define the third task, which appends " day!"
class TaskDay(Runnable):
    def invoke(self, input, context=None):
        return input + " day!"

# Create task instances
task_a = TaskA()
task_beautiful = TaskBeautiful()
task_day = TaskDay()

# Runnable overloads the | operator, so instances can be chained
rag_chain = task_a | task_beautiful | task_day
output = rag_chain.invoke("what")
print(output)
```

The output of the code above is `what a beautiful day!`. As you can see, the Runnable class overloads the `|` operator, which lets us compose Runnable instances together, with the output of one Runnable becoming the input of the next. We also need to make sure every Runnable instance provides an `invoke` method so that data can flow between the connected instances.
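Subclassing Runnable is not the only way to get this behaviour. As a rough sketch (using RunnableLambda from langchain_core, with lambdas that simply mirror the three tasks above), the same chain can be written as:

```python
from langchain_core.runnables import RunnableLambda

# Each plain function is wrapped in a RunnableLambda, which supplies the
# invoke interface and the | operator for us.
chain = (
    RunnableLambda(lambda s: s + " a")
    | RunnableLambda(lambda s: s + " beautiful")
    | RunnableLambda(lambda s: s + " day!")
)

print(chain.invoke("what"))  # what a beautiful day!
```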
Coming back to the components used in the RAG pipeline, RunnablePassthrough is essentially equivalent to the following:

```python
class DoNothing(Runnable):
    def invoke(self, input, context=None):
        return input
```

Let's look at an example:

```python
passthrough = RunnablePassthrough()
input_data = {"msg": "this is a test"}
output_data = passthrough.invoke(input_data)
print(output_data)
```

Running the code above prints `{'msg': 'this is a test'}`.
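To make the "do nothing" behaviour concrete, here is a minimal sketch (the add_a and add_day helpers are made up for illustration): inserting a RunnablePassthrough into the middle of a chain should not change the result.

```python
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

add_a = RunnableLambda(lambda s: s + " a")
add_day = RunnableLambda(lambda s: s + " day!")

# The passthrough simply forwards its input, so both chains behave the same.
with_passthrough = add_a | RunnablePassthrough() | add_day
without_passthrough = add_a | add_day

assert with_passthrough.invoke("what") == without_passthrough.invoke("what")
print(with_passthrough.invoke("what"))  # what a day!
```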
RunnablePassthrough is usually used in a pipeline as a placeholder or connector between two functional stages, but it can also apply some simple changes to its input via `assign`:

```python
# Original data
input_data = {"message": "Hello, World!", "status": "initial"}

passthrough = RunnablePassthrough.assign(result=lambda x: "processed")

# Invoke with the input data
output_data = passthrough.invoke(input_data)
print(output_data)
```

Here we need to make sure the input data is a dictionary and that the argument to `assign` is a callable such as a lambda expression. Running the code above gives the following result:

```
{'message': 'Hello, World!', 'status': 'initial', 'result': 'processed'}
```

Runnable objects can also chain several plain functions together, like this:

```python
def task1(input1):
    return f"task1:{input1}"

def task2(input1):
    return f"task2:{input1}"

def context(input):
    return f"context: {input}"

rag_chain = RunnablePassthrough().assign(context=context) | task1 | task2
res = rag_chain.invoke({"invoke": "input for invoke"})
print(res)
```

The `invoke` call sends the data to the RunnablePassthrough object, which passes it through unchanged (apart from the `context` key added by `assign`); the result is then sent to `task1`, and the output of `task1` becomes the input of `task2`. Running the code above produces:

```
task2:task1:{'invoke': 'input for invoke', 'context': "context: {'invoke': 'input for invoke'}"}
```
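As an aside, the lambda given to `assign` receives the whole input dictionary, so new keys can also be derived from existing ones. A small sketch (the key names here are invented for illustration):

```python
from langchain_core.runnables import RunnablePassthrough

# Each new key is computed from the full input dictionary.
enrich = RunnablePassthrough.assign(
    length=lambda d: len(d["message"]),
    upper=lambda d: d["message"].upper(),
)

print(enrich.invoke({"message": "Hello, World!"}))
# {'message': 'Hello, World!', 'length': 13, 'upper': 'HELLO, WORLD!'}
```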
Another component worth understanding is RunnableMap, which lets us combine several Runnable objects so that they run on the same input, either in parallel or in sequence. A RunnableMap behaves like a dictionary: each key is a branch, or step, that runs independently on the same input. This is very useful when several steps or functions need to process the same input independently. Let's look at an example:

```python
from langchain_core.runnables import Runnable, RunnableMap

# Define the first task
class Task1(Runnable):
    def invoke(self, input, context=None):
        return f"task1 for {input}"

# Define the second task
class Task2(Runnable):
    def invoke(self, input, context=None):
        return f"task2 for {input}"

pipeline = RunnableMap({
    "task1": Task1(),
    "task2": Task2(),
})

input_data = "input data"
output_data = pipeline.invoke(input_data)
print(output_data)
```

The value for each key in the mapping passed to RunnableMap should implement the `invoke` interface, and calling `pipeline.invoke` triggers the `invoke` of every value in the mapping in parallel. The output of the code above is:

```
{'task1': 'task1 for input data', 'task2': 'task2 for input data'}
```
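Strictly speaking, the values in the mapping do not have to subclass Runnable: plain callables are coerced into runnables as well. A rough sketch (with invented key names):

```python
from langchain_core.runnables import RunnableMap

# Plain callables are coerced into runnables automatically.
pipeline = RunnableMap({
    "length": lambda s: len(s),
    "upper": lambda s: s.upper(),
})

print(pipeline.invoke("input data"))
# {'length': 10, 'upper': 'INPUT DATA'}
```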
The last component we need to understand is RunnableParallel. It is similar to RunnableMap; there may be subtle differences between them, but they don't matter here, so we can treat RunnableParallel as RunnableMap. One thing worth noting is RunnableParallel's `assign`. Let's look at a code example:

```python
import time

from langchain_core.runnables import Runnable, RunnableParallel, RunnableMap

# Define the first task, which appends " a"
class TaskA(Runnable):
    def invoke(self, input, context=None):
        time.sleep(1)
        return input + " a"

# Define the second task, which appends " beautiful"
class TaskBeautiful(Runnable):
    def invoke(self, input, context=None):
        time.sleep(2)
        return input + " beautiful"

# Define the third task, which appends " day"
class TaskDay(Runnable):
    def invoke(self, input, context=None):
        time.sleep(3)
        return input + " day"

# Task used with assign: prints its input and returns a copy of it
class TaskAssign(Runnable):
    def invoke(self, input, context=None):
        time.sleep(3)
        print(f"input: {input}")
        return {**input}

start = time.time()
parallel_tasks = RunnableParallel({
    "taskA": TaskA(),
    "taskBeautiful": TaskBeautiful(),
    "TaskDay": TaskDay(),
}).assign(taskAssign=TaskAssign())
output = parallel_tasks.invoke("what")
print(f"output for rag run parallelly: {output}")
end = time.time()
print(f"time for rag run parallel: {end - start}")
```

The output of the code above is:

```
input: {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day'}
output for rag run parallelly: {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day', 'taskAssign': {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day'}}
time for rag run parallel: 6.011621713638306
```

As the output shows, when `assign` is used with RunnableParallel, the tasks in the mapping passed to RunnableParallel are executed first, and the resulting dictionary is then passed as the argument to TaskAssign. The final output is the RunnableParallel dictionary plus one extra entry whose key is `taskAssign` and whose value is TaskAssign's return value.
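The roughly 6 seconds above break down as about 3 seconds for the parallel map (its slowest branch) plus about 3 seconds for TaskAssign, which only runs once the map has finished. The following sketch (with a made-up `slow` helper) illustrates the first half of that claim: branches inside a RunnableParallel run concurrently, so the map costs roughly as much as its slowest branch rather than the sum.

```python
import time

from langchain_core.runnables import RunnableLambda, RunnableParallel

def slow(seconds):
    def f(x):
        time.sleep(seconds)
        return f"slept {seconds}s for {x}"
    return f

parallel = RunnableParallel({
    "one": RunnableLambda(slow(1)),
    "two": RunnableLambda(slow(2)),
    "three": RunnableLambda(slow(3)),
})

start = time.time()
parallel.invoke("input")
print(f"parallel: {time.time() - start:.1f}s")    # roughly 3s, the slowest branch

start = time.time()
for seconds in (1, 2, 3):
    slow(seconds)("input")
print(f"sequential: {time.time() - start:.1f}s")  # roughly 6s, the sum
```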
Finally, let's rebuild the RAG pipeline from the previous section as follows:

```python
def task_from_docs(input):
    print(f"task from docs: {input}")
    print(f"input['context']: {input['context']}")
    return lambda x: format_docs(input["context"])

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(task_from_docs))
)

rag_chain_with_source = RunnableParallel({
    "context": retriver,
    "question": RunnablePassthrough(),
}).assign(answer=rag_chain_from_docs)
```

Invoking rag_chain_with_source triggers the `invoke` of retriver and of RunnablePassthrough in parallel; the resulting dictionary is then sent to the rag_chain_from_docs pipeline, which passes that dictionary to task_from_docs and uses the output as the value of the `context` key.

```python
res = rag_chain_with_source.invoke("How does RAG compare with fine-tuning")
print(f"context of res {res['context']}")
print(f"question of res {res['question']}")

# res is used as the argument when calling rag_chain_from_docs and is passed on
# to task_from_docs.
# task_from_docs extracts the value of "context" from the dictionary and passes
# it to format_docs.
rag_chain = rag_chain_with_source | prompt | llm | StrOutputParser()
response = rag_chain.invoke("How does RAG compare with fine-tuning")
print(f"final rag response: {response}")
```

Running the code above produces the following result:
Retrieval-Augmented Generation (RAG) and fine-tuning are two different approaches to enhancing the capabilities of Large Language Models (LLMs). RAG combines the strengths of LLMs with internal data, allowing organizations to access and utilize their own data effectively. It enhances the accuracy and relevance of responses by fetching specific information from databases in real time, thus expanding the model's knowledge beyond its initial training data. RAG is particularly useful for organizations that need to leverage their proprietary data or the latest information that was not included in the model's training set.

On the other hand, fine-tuning involves adjusting the weights and biases of a pre-trained model based on new training data. This process permanently alters the model's behavior and is suitable for teaching the model specialized tasks or adapting it to specific domains. However, fine-tuning can be complex and costly, especially as data sources evolve, and it may lead to issues like overfitting.

In summary, RAG is more about integrating real-time data retrieval to enhance model responses, while fine-tuning is about permanently modifying the model based on new data. Each approach has its advantages and limitations, and the choice between them depends on the specific needs and resources of the organization.
With the background above, the code is easy to understand. Invoking rag_chain_with_source produces a dictionary with `context` and `question` keys: the value of `context` is the result of `retriver.invoke("How does RAG compare with fine-tuning")`, and the value of `question` is the result of `RunnablePassthrough().invoke("How does RAG compare with fine-tuning")`, i.e. the original string itself. The dictionary with `context` and `question` is then used as the input of task_from_docs, which takes the value of the `context` key and passes it to format_docs. The output of task_from_docs is sent to the prompt, which builds a prompt containing the question and the context, and that prompt is sent to ChatGPT to obtain the final answer.
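For reference, format_docs, prompt, llm, and retriver are all defined in the previous section. A rough sketch of what format_docs and prompt might look like there (the actual definitions in that section may differ):

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

def format_docs(docs):
    # Join the retrieved Document objects into a single context string.
    return "\n\n".join(doc.page_content for doc in docs)

# A simple question-answering prompt with {context} and {question} placeholders.
prompt = ChatPromptTemplate.from_template(
    "Answer the question based only on the following context:\n\n"
    "{context}\n\nQuestion: {question}"
)
```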