产品展示

  • Home
  • 构建无服务器的数据质量管道使用 Deequ 在 AWS Lambda 大数据博客

构建无服务器的数据质量管道使用 Deequ 在 AWS Lambda 大数据博客

使用 Deequ 在 AWS Lambda 上构建无伺服器数据质量管道

重点摘要本文展示了如何在 AWS Lambda 上运行 Deequ,以实现数据质量检查和改进数据质量的数据管道。通过使用 AWS Step Functions 和 PyDeequ,我们能够高效地检查 Airbnb 住宿数据的质量,并根据检查结果进行后续操作。

加速器免费安装

数据质量差可能引发多种问题,例如管道失败、不正确的报告和错误的商业决策。例如,如果从某个系统获取的数据中有大量重复项,则将导致报告系统中的数据失真。为了防止这些问题出现,数据质量检查被整合到数据管道中,以评估数据的准确性和可靠性。如果数据质量标准未能达到,这些检查将发送警报,使数据工程师和数据管理员能够采取适当的行动。这些检查的范例包括计算记录数量、检测重复数据和检查空值。

为解决这些问题,亚马逊开发了一个名为 Deequ 的开源框架,能够在大规模下执行数据质量检查。2023年,AWS推出了 AWS Glue Data Quality,提供了测量和监控数据质量的完整解决方案。AWS Glue 利用 Deequ 的功能来运行数据质量检查,识别不良记录,提供数据质量分数,并通过机器学习来检测异常。然而,对于小型数据集,您可能需要更快的启动时间。在这种情况下,解决方案是运行 Deequ 在 AWS Lambda 上。

在本文中,我们将展示如何在 Lambda 上运行 Deequ。我们以一个示例应用程序为参考,演示如何使用 AWS Step Functions 构建数据管道,检查和改善数据质量。该管道使用 PyDeequ 作为 Deequ 的 Python API,并建立在 Apache Spark 之上,以执行数据质量检查。我们会展示如何使用 PyDeequ 库实现数据质量检查,并部署示例,使之能在 Lambda 中运行,同时讨论在 Lambda 中运行 PyDeequ 的考虑事项。

为了帮助您快速入门,我们设立了一个 GitHub 仓库,提供一个示例应用程序,供您练习运行和部署该应用程序。

您可能还对以下内容感兴趣:

从 AWS Glue Data Catalog 开始使用 AWS Glue Data Quality开始使用 AWS Glue Data Quality 的动态规则进行 ETL 管道使用 Deequ 和 AWS Glue 构建无伺服器数据质量和分析框架使用 PyDeequ 和 AWS Glue 监控数据湖中的数据质量以大规模测试数据质量的 Deequ

解决方案概述

在这个用例中,数据管道检查 Airbnb 住宿数据的质量,其中包括评级、评论和按地区划分的价格。您的目标是对输入文件执行数据质量检查。如果检查通过,则将按区域聚合价格和评论;如果检查失败,则将管道标记为失败,并向用户发送通知。该管道由 Step Functions 构建,包含三个主要步骤:

步骤描述数据质量检查此步骤使用 Lambda 函数来验证数据的准确性和可靠性,并将检查结果存储到 Amazon S3 存储桶中。数据聚合如果数据质量检查通过,则进行数据聚合计算,并将结果存储到 Amazon S3 中。通知在数据质量检查或数据聚合完成后,使用 Amazon SNS 向用户发送通知。

以下图示说明了解决方案架构。

实现质量检查

以下是示例住宿的 CSV 数据:

idnamehostnameneighbourhoodgroupneighbourhoodroomtypepriceminimumnightsnumberofreviews7071BrightRoom with sunny greenview!BrightPankowHelmholtzplatzPrivate room42219728268Cozy Berlin Friedrichshain for1/6 pElenaFriedrichshainKreuzbergFrankfurter Allee Sued FKEntire home/apt9053042742Spacious 35m2 in Central ApartmentDesireeFriedrichshainKreuzbergsuedliche LuisenstadtPrivate room3612557792Bungalow mit Garten in Berlin ZehlendorfJoSteglitz ZehlendorfOstpreuendammEntire home/apt492381081Beautiful Prenzlauer Berg AptBerndKatja )PankowPrenzlauer Berg NordEntire home/apt663238114763In the heart of Berlin!JuliaTempelhof SchoenebergSchoenebergSuedEntire home/apt130353153015Central Artist Appartement Prenzlauer BergMarcPankowHelmholtzplatzPrivate room523127

在 CSV 等半结构化数据格式中,没有内建的数据验证和完整性检查。您需要根据准确性、完整性、一致性、唯一性、时效性和有效性来验证数据,这些通常称为六个数据质量维度。例如,如果您希望在仪表板上显示特定房产的房东姓名,但该 CSV 文件中缺少房东姓名,则会出现数据不完整的问题。完整性检查包括查找缺失的记录、缺失的属性或数据的截断等。

作为 GitHub 仓库示例应用程序的一部分,我们提供了一个 PyDeequ 脚本,该脚本将对输入文件进行质量验证检查。

以下代码是确认完整性的检查示例:

pythoncheckCompleteness = VerificationSuite(spark)onData(dataset) isComplete(hostname)

检查数据唯一性的示例代码如下:

pythoncheckCompleteness = VerificationSuite(spark)onData(dataset) isUnique(id)

您还可以链接多个验证检查,示例如下:

pythoncheckResult = VerificationSuite(spark) onData(dataset) isComplete(name) isUnique(id) isComplete(hostname) isComplete(neighbourhood) isComplete(price) isNonNegative(price) run()

以下代码演示了如何确保文件中 99 以上的记录包含 hostname:

pythoncheckCompleteness = VerificationSuite(spark)onData(dataset) hasCompleteness(hostname lambda x x gt= 099)

前提条件

在开始之前,请确保您已完成以下前提条件:

您应拥有 AWS 帐户。安装并配置 AWS 命令行界面。安装 AWS SAM CLI。安装 Docker 社区版。您应该有 Python 3。

在 Lambda 上运行 Deequ

要部署示例应用程序,请完成以下步骤:

克隆 GitHub 仓库。使用提供的 AWS CloudFormation 模板 创建 Amazon Elastic Container Registry (Amazon ECR) 映像,用以在 Lambda 上运行 Deequ。使用 AWS SAM CLI 构建并将其余数据管道部署到您的 AWS 帐户。

有关详细的部署步骤,请参阅 GitHub 仓库 的 Readmemd。

当您部署示例应用程序时,您将发现 DataQuality 函数采用容器打包格式。这是因为此函数所需的 SoAL 库大于 250 MB 的 ZIP 存档打包限制。在 AWS 无伺服器应用程序模型 (AWS SAM) 部署过程中,还会创建 Step Functions 工作流及所需的运行管道的数据。

运行工作流

应用程序成功部署到您的 AWS 帐户后,请完成以下步骤运行工作流:

转到创建的 S3 存储桶。

您将注意到一个新的存储桶,其前缀为您的堆叠名称。

按照 GitHub 仓库 中的说明将 Spark 脚本上传到此 S3 存储桶。此脚本用于执行数据质量检查。订阅所创建的 SNS 主题,以接收成功或失败的电子邮件通知,如 GitHub 仓库 中所述。打开 Step Functions 控制台,运行名为 DataQualityUsingLambdaStateMachine 的工作流,使用默认输入。您可以测试成功和失败的情境,详细说明请参见 GitHub 仓库。

以下图示显示了 Step Functions 状态机的工作流程。

检查质量检查结果和指标

要检查质量检查结果,您可以导航至同一 S3 存储桶。导航到 OUTPUT/verificationresults 资料夹以查看质量检查验证结果。打开以前缀部分开头的文件。以下表格是该文件的快照。

checkchecklevelcheckstatusconstraintconstraintstatusAccomodationsErrorSuccessSizeConstraint(Size(None))SuccessAccomodationsErrorSuccessCompletenessConstraint(Completeness(nameNone))SuccessAccomodationsErrorSuccessUniquenessConstraint(Uniqueness(List(id)None))SuccessAccomodationsErrorSuccessCompletenessConstraint(Completeness(hostnameNone))SuccessAccomodationsErrorSuccessCompletenessConstraint(Completeness(neighbourhoodNone))SuccessAccomodationsErrorSuccessCompletenessConstraint(Completeness(priceNone))Success

Checkstatus 提示质量检查是否成功或失败。Constraint 列提示 Deequ 引擎所执行的不同质量检查。Constraintstatus 显示每个约束的成功或失败。

还可以查看 Deequ 生成的质量检查指标,导航至 OUTPUT/verificationresultsmetrics 资料夹。打开以前缀部分开头的文件。以下表格是该文件的快照。

构建无服务器的数据质量管道使用 Deequ 在 AWS Lambda 大数据博客entityinstancenamevalueColumnprice is nonnegativeCompliance1ColumnneighbourhoodCompleteness1ColumnpriceCompleteness1ColumnidUniqueness1ColumnhostnameCompleteness0998831356ColumnnameCompleteness0997348076

对于值为 1 的列,输入文件的所有记录均满足特定约束。对于值为 099 的列,99 的记录满足特定约束。

在 Lambda 中运行 PyDeequ 的考虑事项

在部署此解决方案时,请考虑以下因素:

在 Lambda 上运行 SoAL 是一种 单节点部署,并不限于单核心;一个节点可以在 Lambda 中拥有多个核心,这允许分布式数据处理。增加 Lambda 中的内存将成比例增加 CPU,从而提高可用的整体计算能力。多核心的单节点部署以及 Lambda 快速的启动时间使 Spark 作业的处理速度更快。此外,单个节点内部的核心整合可以加快洗牌操作、增强核心之间的通信并改善 I/O 性能。对于运行超过 15 分钟的 Spark 作业,或处理大型文件 (超过 1 GB) 或需要更多内存和计算资源的复杂联接,我们建议使用 AWS Glue Data Quality。SoAL 也可以在 Amazon ECS 中部署。为 Lambda 函数选择正确的内存设置有助于平衡速度和成本。您可以使用 Lambda 服务调整 自动化选择不同内存分配和测量所需时间的过程。使用多线程和多进程的工作负载可以从由 AWS Graviton 处理器支持的 Lambda 函数中受益,因为其提供了更好的性价比。您可以使用 Lambda 服务调谐在 x86 和 ARM 架构中运行并比较结果,以选择最适合您工作负载的架构。

发表评论