原文towardsdatascience.com/optimizing-the-data-processing-performance-in-pyspark-4b895857c8aa?sourcecollection_archive---------3-----------------------#2024-11-07PySpark 技术与策略解决常见的性能挑战一个实用的操作指南https://medium.com/johnleungTJ?sourcepost_page---byline--4b895857c8aa--------------------------------https://towardsdatascience.com/?sourcepost_page---byline--4b895857c8aa-------------------------------- John Leung·发表于Towards Data Science ·阅读时间9 分钟·发布日期2024 年 11 月 7 日–Apache Spark由于其强大的分布式数据处理能力近年来已成为领先的数据分析引擎之一。PySpark 是 Spark 的 Python API常用于个人和企业项目中以解决数据挑战。例如我们可以使用 PySpark 高效地实现时间序列数据的特征工程包括数据摄取、提取和可视化。然而尽管 PySpark 能够处理大规模数据集但在一些特定场景下如极端数据分布和复杂的数据转换流程性能瓶颈仍然可能出现。本文将探讨在Databricks上使用 PySpark 进行数据处理时常见的性能问题并介绍各种优化策略以实现更快的执行速度。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/7a51dfd6e0bb834e68f2dbd4ac63cace.png图片来源Veri Ivanova 来自Unsplash假设你开设了一家在线零售店提供多种产品主要面向美国客户。你计划通过分析当前交易的购买习惯来满足现有客户的更多需求并吸引更多新客户。这促使你投入大量精力处理交易记录作为准备步骤。#0 模拟数据我们首先模拟了 100 万条交易记录在实际的大数据场景中预计会处理更大的数据集这些记录包含了客户 ID、购买的产品和交易细节如支付方式和总金额。值得一提的是客户 ID #100 的产品代理商有着庞大的客户群因此在你的店铺中占据了大部分代发货的购买。以下是演示此场景的代码importcsvimportdatetimeimportnumpyasnpimportrandom# Remove existing ‘retail_transactions.csv’ file, if any! rm-f/p/a/t/h retail_transactions.csv# Set the no of transactions and othet configsno_of_iterations1000000data[]csvFileretail_transactions.csv# Open a file in write modewithopen(csvFile,w,newline)asf:fieldnames[orderID,customerID,productID,state,paymentMthd,totalAmt,invoiceTime]writercsv.DictWriter(f,fieldnamesfieldnames)writer.writeheader()fornuminrange(no_of_iterations):# Create a transaction record with random valuesnew_txn{orderID:num,customerID:random.choice([100,random.randint(1,100000)]),productID:np.random.randint(10000,sizerandom.randint(1,5)).tolist(),state:random.choice([CA,TX,FL,NY,PA,OTHERS]),paymentMthd:random.choice([Credit card,Debit card,Digital wallet,Cash on delivery,Cryptocurrency]),totalAmt:round(random.random()*5000,2),invoiceTime:datetime.datetime.now().isoformat()}data.append(new_txn)writer.writerows(data)在模拟数据之后我们使用 Databrick 的 Jupyter Notebook 将 CSV 文件加载到 PySpark DataFrame 中。# Set file location and typefile_location/FileStore/tables/retail_transactions.csvfile_typecsv# Define CSV optionsschemaorderID INTEGER, customerID INTEGER, productID INTEGER, state STRING, paymentMthd STRING, totalAmt DOUBLE, invoiceTime TIMESTAMPfirst_row_is_headertruedelimiter,# Read CSV files into DataFramedfspark.read.format(file_type)\.schema(schema)\.option(header,first_row_is_header)\.option(delimiter,delimiter)\.load(file_location)我们还创建了一个可重用的装饰器工具用于衡量和比较每个函数内不同方法的执行时间。importtime# Measure the excution time of a given functiondeftime_decorator(func):defwrapper(*args,**kwargs):begin_timetime.time()outputfunc(*args,**kwargs)end_timetime.time()print(fExecution time of function{func.__name__}:{round(end_time-begin_time,2)}seconds.)returnoutputreturnwrapper好的所有准备工作已经完成。接下来我们将探讨以下几个章节中执行性能的不同潜在挑战。#1 存储Spark 使用弹性分布式数据集RDD作为其核心构建块数据默认通常保存在内存中。无论是执行计算如连接和聚合还是在集群中存储数据所有操作都会在统一区域中贡献内存使用。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/f7f20c42fa3519fee671c569416280dc.png一个包含执行内存和存储内存的统一区域图源作者如果设计不当可能导致可用内存不足。这会导致过多的分区溢出到磁盘从而导致性能下降。缓存和持久化中间结果或频繁访问的数据集是常见的做法。虽然缓存和持久化具有相同的目的但它们的存储级别可能有所不同。应当合理利用资源以确保高效的读写操作。例如如果转换后的数据会在不同的后续阶段中重复用于计算和算法建议对这些数据进行缓存。代码示例假设我们想要调查使用数字钱包作为支付方式的不同交易记录子集。低效 — 没有缓存frompyspark.sql.functionsimportcoltime_decoratordefwithout_cache(data):# 1st filteringdf2data.where(col(paymentMthd)Digital wallet)countdf2.count()# 2nd filteringdf3df2.where(col(totalAmt)2000)countdf3.count()returncount display(without_cache(df))高效 — 缓存关键数据集frompyspark.sql.functionsimportcoltime_decoratordefafter_cache(data):# 1st filtering with cachedf2data.where(col(paymentMthd)Digital wallet).cache()countdf2.count()# 2nd filteringdf3df2.where(col(totalAmt)2000)countdf3.count()returncount display(after_cache(df))缓存之后即使我们想要根据不同的交易金额阈值或其他数据维度来过滤转换后的数据集执行时间也会更易于控制。#2 洗牌当我们执行如连接 DataFrame 或按数据字段分组的操作时会发生洗牌。这是必要的目的是将所有记录重新分布到集群中并确保具有相同键的记录位于同一个节点。这有助于同时处理并合并结果。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/ab84ce7db5a3682b102879ebcf4c8646.png洗牌连接图源作者然而这种洗牌操作是代价高昂的——由于数据在节点间的移动执行时间长且额外的网络开销。为了减少洗牌操作有几种策略(1) 对于小数据集使用广播变量将只读副本发送到每个工作节点进行本地处理虽然“较小”数据集通常定义为每个执行器最大内存阈值为 8GB但广播的理想大小应通过针对特定案例的实验来确定。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/23bfddffdcd8abbc41eace540e8e045e.png广播连接作者图片(2) 提前过滤尽早尽可能减少处理的数据量(3) 控制分区数量以确保最佳性能代码示例假设我们想返回与我们的州列表匹配的交易记录及其全名低效——大数据集与小数据集之间的 shuffle 连接frompyspark.sql.functionsimportcoltime_decoratordefno_broadcast_var(data):# Create small dataframesmall_data[(CA,California),(TX,Texas),(FL,Florida)]small_dfspark.createDataFrame(small_data,[state,stateLF])# Perform joiningresult_no_broadcastdata.join(small_df,state)returnresult_no_broadcast.count()display(no_broadcast_var(df))高效——使用广播变量将大数据集与小数据集合并frompyspark.sql.functionsimportcol,broadcasttime_decoratordefhave_broadcast_var(data):small_data[(CA,California),(TX,Texas),(FL,Florida)]small_dfspark.createDataFrame(small_data,[state,stateFullName])# Create broadcast variable and perform joiningresult_have_broadcastdata.join(broadcast(small_df),state)returnresult_have_broadcast.count()display(have_broadcast_var(df))#3 倾斜性数据有时会分布不均尤其是用于处理的键字段。这会导致分区大小不平衡其中某些分区比平均值大或小得多。由于执行性能受到最长运行任务的限制因此需要解决过载节点的问题。一种常见的方法是加盐。其原理是通过向倾斜键添加随机数使得数据在分区中更加均匀分布。假设在基于倾斜键进行聚合时我们将使用加盐后的键进行聚合然后再使用原始键进行聚合。另一种方法是重新分区它通过增加分区的数量来帮助数据更均匀地分布。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/119db606882ab6d12a63942cc007751e.png数据分布——加盐前后的情况作者图片代码示例我们想聚合一个不对称的数据集主要由客户 ID #100 引起的倾斜。低效——直接使用倾斜键frompyspark.sql.functionsimportcol,desctime_decoratordefno_salting(data):# Perform aggregationagg_datadata.groupBy(customerID).agg({totalAmt:sum}).sort(desc(sum(totalAmt)))returnagg_data display(no_salting(df))高效——使用加盐的倾斜键进行聚合frompyspark.sql.functionsimportcol,lit,concat,rand,split,desctime_decoratordefhave_salting(data):# Salt the customerID by adding the suffixsalted_datadata.withColumn(salt,(rand()*8).cast(int))\.withColumn(saltedCustomerID,concat(col(customerID),lit(_),col(salt)))# Perform aggregationagg_datasalted_data.groupBy(saltedCustomerID).agg({totalAmt:sum})# Remove salt for further aggregationfinal_resultagg_data.withColumn(customerID,split(col(saltedCustomerID),_)[0]).groupBy(customerID).agg({sum(totalAmt):sum}).sort(desc(sum(sum(totalAmt))))returnfinal_result display(have_salting(df))向倾斜键添加一个随机的前缀或后缀都可以有效。通常5 到 10 个随机值是一个很好的起点可以在扩展数据和保持高复杂性之间取得平衡。#4 序列化人们通常更倾向于使用用户定义函数UDFs因为它在定制数据处理逻辑方面更灵活。然而UDFs 是按行逐一操作的。代码需要被 Python 解释器序列化发送到执行器 JVM然后再反序列化。这会产生高昂的序列化开销且阻碍 Spark 对代码的优化和高效处理。简单直接的方法是尽可能避免使用 UDFs。我们应首先考虑使用内置 Spark 函数这些函数可以处理聚合、数组/映射操作、日期/时间戳以及 JSON 数据处理等任务。如果内置函数无法满足你的需求确实可以考虑使用pandasUDFs。与 UDFs 相比它们建立在 Apache Arrow 基础上具有更低的开销和更高的性能。代码示例交易价格根据来源州进行折扣。低效 — 使用 UDFfrompyspark.sql.functionsimportudffrompyspark.sql.typesimportDoubleTypefrompyspark.sqlimportfunctionsasFimportnumpyasnp# UDF to calculate discounted amountdefcalculate_discount(state,amount):ifstateCA:returnamount*0.90# 10% offelse:returnamount*0.85# 15% offdiscount_udfudf(calculate_discount,DoubleType())time_decoratordefhave_udf(data):# Use the UDFdiscounted_datadata.withColumn(discountedTotalAmt,discount_udf(state,totalAmt))# Show the resultsreturndiscounted_data.select(customerID,totalAmt,state,discountedTotalAmt).show()display(have_udf(df))高效 — 使用内置的 PySpark 函数frompyspark.sql.functionsimportwhentime_decoratordefno_udf(data):# Use when and otherwise to discount the amount based on conditionsdiscounted_datadata.withColumn(discountedTotalAmt,when(data.stateCA,data.totalAmt*0.90)# 10% off.otherwise(data.totalAmt*0.85))# 15% off# Show the resultsreturndiscounted_data.select(customerID,totalAmt,state,discountedTotalAmt).show()display(no_udf(df))在这个示例中我们使用内置的 PySpark 函数“when”和“otherwise”来有效地按顺序检查多个条件。基于我们对这些函数的熟悉示例几乎是无限的。例如pyspark.sql.functions.transform一个帮助对输入数组中的每个元素应用转换的函数自 PySpark 3.1.0 版本开始引入。#5 溢出如在存储部分讨论的那样溢出是由于内存不足以容纳所有所需数据导致将临时数据从内存写入磁盘。我们提到的许多性能问题都与溢出有关。例如在分区之间洗牌大量数据的操作容易导致内存耗尽并随之发生溢出。https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/8e0391d49e71ebd4918378b38ed64fe2.png由于内存不足引起的溢出不同场景图像由作者提供审查 Spark UI 中的性能指标至关重要。如果我们发现溢出内存和溢出磁盘的统计数据那么溢出可能是长时间运行任务的原因。为了解决这个问题可以尝试实例化一个每个工作节点有更多内存的集群例如通过调节配置值spark.executor.memory来增加执行进程的内存大小另外我们还可以配置spark.memory.fraction来调整分配给执行和存储的内存量。总结我们遇到了一些常见的导致 PySpark 性能下降的因素以及可能的改进方法存储使用缓存和持久化存储常用的中间结果Shuffle为小数据集使用广播变量以促进 Spark 的本地处理偏斜执行加盐或重新分区以更均匀地分布偏斜数据序列化更倾向于使用内置 Spark 函数以优化性能溢出调整配置值以明智地分配内存最近自适应查询执行AQE被提出用于基于运行时统计信息对查询进行动态规划和重新规划。这支持查询执行过程中发生的不同查询重新优化特性从而成为一种出色的优化技术。然而在初期设计阶段理解数据特征仍然至关重要因为这有助于制定更好的策略以编写有效的代码和查询并利用 AQE 进行微调。在你离开之前如果你喜欢这篇文章欢迎关注我的Medium 页面和LinkedIn 页面。通过这样做你可以及时获取有关数据科学副项目、机器学习运维MLOps示范以及项目管理方法学的精彩内容。## 简化数据工程项目中的 Python 代码用于数据摄取、验证、处理和测试的 Python 技巧与技术实用的操作流程towardsdatascience.com ## 使用 PySpark 在 Databricks 上进行时间序列特征工程探索 PySpark 在时间序列数据中的潜力摄取、提取和可视化数据并附带实践…towardsdatascience.com