dify实现分析-rag-文件上传后的处理

概述

前面的文章:已经说明了文档上传的总体步骤。当上传一个或多个文档后,dify会启动索引的构建任务来处理文档内容,并构建索引。本文介绍文档索引构建的实现逻辑。

document_indexing_task函数

新建文档的索引构建在函数document_indexing_task中实现,该函数的总体逻辑如下图:

接收任务参数: 数据集和文档id列表
验证数据集是否存在
检查当前租户的额度是否超出限制
更新文档状态为: parsing
启动索引处理任务
记录索引处理时间和处理结果

上传文档的处理在document_indexing_task函数中进行。该函数主要是记录文档的状态,并启动索引的构建任务。

  1. 任务定义

使用 Celery 的 @shared_task 装饰器定义异步任务,指定任务队列为 “dataset”,并以数据集ID和文档ID列表作为参数。代码如下:

@shared_task(queue="dataset")
def document_indexing_task(dataset_id: str, document_ids: list):
  1. 数据集验证

查询并验证数据集是否存在,如果数据集不存在,记录日志并返回。

dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
    logging.info(click.style("Dataset is not found: {}".format(dataset_id), fg="yellow"))
    return
  1. 租户限额检查

检查租户的功能限制:验证批量上传限制和向量空间限制,若超出限制,将所有文档标记为错误状态。

features = FeatureService.get_features(dataset.tenant_id)
try:
    if features.billing.enabled:
        # 检查批量上传限制
        # 检查向量空间限制
except Exception as e:
    # 处理错误:更新所有相关文档的状态为错误
    for document_id in document_ids:
        document = db.session.query(Document).filter(...).first()
        if document:
            document.indexing_status = "error"
            document.error = str(e)
            document.stopped_at = datetime.datetime.now(...)
            db.session.add(document)
    db.session.commit()
    return
  1. 文档预处理

遍历所有文档ID:查询每个文档,更新文档状态为 “parsing”,和记录处理开始时间。

for document_id in document_ids:
    logging.info(click.style("Start process document: {}".format(document_id), fg="green"))
    document = db.session.query(Document).filter(...).first()
    
    if document:
        # 更新文档状态为解析中
        document.indexing_status = "parsing"
        document.processing_started_at = datetime.datetime.now(...)
        documents.append(document)
        db.session.add(document)
db.session.commit()
  1. 针对document构建索引

创建 IndexingRunner 实例,运行文档索引处理,并记录处理耗时和处理可能的暂停错误和其他异常。

try:
    indexing_runner = IndexingRunner()
    indexing_runner.run(documents)
    end_at = time.perf_counter()
    logging.info(click.style("Processed dataset: {} latency: {}".format(
        dataset_id, end_at - start_at), fg="green"))
except DocumentIsPausedError as ex:
    logging.info(click.style(str(ex), fg="yellow"))
except Exception:
    pass

索引构建:IndexingRunner.run

该函数的总体实现逻辑如下图所示:

开始批量处理
遍历文档列表
文档处理循环
从数据库中获取数据集信息,若数据集不存在抛出异常
获取处理规则:用户设置的一些参数
根据文档类型来创建对应的索引处理器
Extract: 提取文档内容:这里会按段落或整页来获取文档内容
Transform: 文档分块转换:把上一步读取的内容进一步切分成一个个的分块chunk
_load_segments: 保存文档分段: 把chunk信息保存到DocumentSegment表中
构建索引: 创建关键词索引
高质量索引?
多线程嵌入向量
基础索引构建
更新文档状态
还有下一个文档?
结束处理

IndexingRunner 实现了文档索引构建的整个流程,包括从数据库中获取数据、处理规则、提取文本、转换数据为文档对象、保存段落以及生成嵌入向量,保存到向量库中。

其中run函数的功能和实现步骤如下:

(1)遍历传入的 dataset_documents 列表,对每个文档执行索引操作。

(2)从数据库中获取相应(通过数据集id来查询)的数据集(Dataset),若不存在,停止处理抛出错误。

(3)获取当前数据集和文档的处理规则(Processing Rule)

(4)根据文档类型(doc_form),选择合适的索引处理器(IndexProcessor)。dify提供了4中索引构建器,可以在IndexType类中查看这几种索引构建器的名称。

(5)使用self._extract函数根据文档不同格式获取文档中的段落数据,并保存到Document对象中

(6)使用 self._transform 方法将提取的文本数据转换为内部使用的 Document 对象:把第(5)步获取到的文档数据进行进一步切分,切分成一个个的chunk,每个chunk是一个Document对象。

(7)将提取和转换后的文档分段(chunk)信息保存到DocumentSegment表中。

(8)使用 self._load 根据文档分段来构建索引,或文档向量。

总结

通过以上分析已经知道了整个索引构建的全部流程。索引构建每个步骤的详细实现逻辑在后面的文章中继续分析。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐