dify实现分析-rag-文件上传后的处理
前面的文章:已经说明了文档上传的总体步骤。当上传一个或多个文档后,dify会启动索引的构建任务来处理文档内容,并构建索引。本文介绍文档索引构建的实现逻辑。
dify实现分析-rag-文件上传后的处理
概述
前面的文章:已经说明了文档上传的总体步骤。当上传一个或多个文档后,dify会启动索引的构建任务来处理文档内容,并构建索引。本文介绍文档索引构建的实现逻辑。
document_indexing_task函数
新建文档的索引构建在函数document_indexing_task
中实现,该函数的总体逻辑如下图:
上传文档的处理在document_indexing_task函数中进行。该函数主要是记录文档的状态,并启动索引的构建任务。
- 任务定义
使用 Celery 的 @shared_task 装饰器定义异步任务,指定任务队列为 “dataset”,并以数据集ID和文档ID列表作为参数。代码如下:
@shared_task(queue="dataset")
def document_indexing_task(dataset_id: str, document_ids: list):
- 数据集验证
查询并验证数据集是否存在,如果数据集不存在,记录日志并返回。
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
logging.info(click.style("Dataset is not found: {}".format(dataset_id), fg="yellow"))
return
- 租户限额检查
检查租户的功能限制:验证批量上传限制和向量空间限制,若超出限制,将所有文档标记为错误状态。
features = FeatureService.get_features(dataset.tenant_id)
try:
if features.billing.enabled:
# 检查批量上传限制
# 检查向量空间限制
except Exception as e:
# 处理错误:更新所有相关文档的状态为错误
for document_id in document_ids:
document = db.session.query(Document).filter(...).first()
if document:
document.indexing_status = "error"
document.error = str(e)
document.stopped_at = datetime.datetime.now(...)
db.session.add(document)
db.session.commit()
return
- 文档预处理
遍历所有文档ID:查询每个文档,更新文档状态为 “parsing”,和记录处理开始时间。
for document_id in document_ids:
logging.info(click.style("Start process document: {}".format(document_id), fg="green"))
document = db.session.query(Document).filter(...).first()
if document:
# 更新文档状态为解析中
document.indexing_status = "parsing"
document.processing_started_at = datetime.datetime.now(...)
documents.append(document)
db.session.add(document)
db.session.commit()
- 针对document构建索引
创建 IndexingRunner 实例,运行文档索引处理,并记录处理耗时和处理可能的暂停错误和其他异常。
try:
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
logging.info(click.style("Processed dataset: {} latency: {}".format(
dataset_id, end_at - start_at), fg="green"))
except DocumentIsPausedError as ex:
logging.info(click.style(str(ex), fg="yellow"))
except Exception:
pass
索引构建:IndexingRunner.run
该函数的总体实现逻辑如下图所示:
IndexingRunner
实现了文档索引构建的整个流程,包括从数据库中获取数据、处理规则、提取文本、转换数据为文档对象、保存段落以及生成嵌入向量,保存到向量库中。
其中run函数的功能和实现步骤如下:
(1)遍历传入的 dataset_documents
列表,对每个文档执行索引操作。
(2)从数据库中获取相应(通过数据集id来查询)的数据集(Dataset),若不存在,停止处理抛出错误。
(3)获取当前数据集和文档的处理规则(Processing Rule)
(4)根据文档类型(doc_form),选择合适的索引处理器(IndexProcessor)。dify提供了4中索引构建器,可以在IndexType类中查看这几种索引构建器的名称。
(5)使用self._extract函数根据文档不同格式获取文档中的段落数据,并保存到Document对象中
(6)使用 self._transform
方法将提取的文本数据转换为内部使用的 Document
对象:把第(5)步获取到的文档数据进行进一步切分,切分成一个个的chunk,每个chunk是一个Document对象。
(7)将提取和转换后的文档分段(chunk)信息保存到DocumentSegment表中。
(8)使用 self._load
根据文档分段来构建索引,或文档向量。
总结
通过以上分析已经知道了整个索引构建的全部流程。索引构建每个步骤的详细实现逻辑在后面的文章中继续分析。
更多推荐
所有评论(0)