I have the PySpark code below to compute the SHA1 hash of every file in a folder. I use spark.sparkContext.binaryFiles to get a pair RDD in which the key is the file name and the value is a file-like object, on which I compute the hash in the map function rdd.mapValues(map_hash_file). However, I get the error below on the second-to-last line, and I don't understand why. How can I fix this? Thanks.
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 4 times, most recent failure: Lost task 0.3 in stage 66.0
Code:
# Function to calculate the hash value/checksum of a file
def map_hash_file(row):
    file_name = row[0]
    file_contents = row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents.encode('utf-8'))
    return file_name, sha1_hash.hexdigest()

rdd = spark.sparkContext.binaryFiles('/mnt/workspace/Test_Folder', minPartitions=None)

# As a check, print the list of files collected in the RDD
dataColl = rdd.collect()
for row in dataColl:
    print(row[0])

# Apply the function to calculate the hash of each file and store the results
hash_values = rdd.mapValues(map_hash_file)

# Store each file name and its hash value in a dataframe to later export as a CSV
df = spark.createDataFrame(data=hash_values)
display(df)

Posted on 2022-01-21 14:40:44
You will get the expected result if you make the following changes:

- Change file_contents.encode('utf-8') to file_contents; the type of file_contents is already bytes.
- Change rdd.mapValues(map_hash_file) to rdd.map(map_hash_file); the function map_hash_file expects a tuple.

You should also consider:

- Adding import hashlib.
- Removing the collect(), which gathers the contents of every file onto the driver.
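The first change can be verified with plain hashlib, independent of Spark: update() accepts bytes directly, while a str must be encoded first (the sample contents below are made up for illustration):

```python
import hashlib

# A str must be encoded before hashing; bytes (what binaryFiles
# yields as the value) can be passed to update() directly.
text = "file contents"
data = b"file contents"

h_str = hashlib.sha1()
h_str.update(text.encode('utf-8'))

h_bytes = hashlib.sha1()
h_bytes.update(data)  # no .encode() needed

assert h_bytes.hexdigest() == h_str.hexdigest()
assert not hasattr(data, 'encode')  # bytes has no encode(), hence the failure
```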
With the above changes, your code should look like this:
import hashlib

# Function to calculate the hash value/checksum of a file
def map_hash_file(row):
    file_name = row[0]
    file_contents = row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents)
    return file_name, sha1_hash.hexdigest()

rdd = spark.sparkContext.binaryFiles('/mnt/workspace/Test_Folder', minPartitions=None)

# Apply the function to calculate the hash of each file and store the results
hash_values = rdd.map(map_hash_file)

# Store each file name and its hash value in a dataframe to later export as a CSV
df = spark.createDataFrame(data=hash_values)
display(df)

https://stackoverflow.com/questions/70800984
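The difference between map and mapValues can be sketched without a Spark cluster, using a plain Python list of (file name, bytes) pairs; the file names and contents below are invented for illustration:

```python
import hashlib

def map_hash_file(row):
    # Expects the whole (file_name, file_contents) pair, as in the answer
    file_name, file_contents = row[0], row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents)
    return file_name, sha1_hash.hexdigest()

# Stand-in for the pair RDD produced by binaryFiles
pairs = [("a.txt", b"alpha"), ("b.txt", b"beta")]

# rdd.map(map_hash_file) passes each whole (key, value) pair to the function:
hashed = [map_hash_file(row) for row in pairs]
assert hashed[0][0] == "a.txt" and len(hashed[0][1]) == 40

# rdd.mapValues(map_hash_file) would instead pass only the value,
# i.e. map_hash_file(b"alpha"), so row[0] would be an int (a single
# byte) and the hash computation would fail.
```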