Currently the project maintains an S3 bucket that contains a large 1.5 GB zip file holding .xpt and .sas7bdat files. Uncompressed, the files total about 20 GB.
I am trying to unzip the file and push the same folder structure back to S3.
The following code works for small zip files, but fails for the large (1.5 GB) zip file:
import io
import zipfile
import boto3
from urllib.parse import urlparse

client = boto3.client('s3')
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-zip-bucket')
putObjects = []

for obj in bucket.objects.all():
    #file_name = os.path.abspath(obj.key) # get full path of files
    key = urlparse(obj.key.encode('utf8'))
    obj = client.get_object(Bucket='my-zip-bucket', Key=obj.key)
    with io.BytesIO(obj["Body"].read()) as tf:
        # rewind the file
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for file in zipf.infolist():
                fileName = file.filename
                putFile = client.put_object(Bucket='my-un-zip-bucket-', Key=fileName, Body=zipf.read(file))
                putObjects.append(putFile)
Error: Memory Size: 3008 MB   Max Memory Used: 3008 MB -- the function exhausts its entire 3008 MB memory allocation.
What I would like to confirm is whether there is a way to do this that stays within the memory limits.
There is a serverless solution using AWS Glue! (It took me a while to figure this out, and it nearly killed me.)
The solution has two parts: a Lambda function, triggered when the ZIP file lands in S3, that starts a Glue job, and the Glue job itself, which does the actual unzipping.
See the code below, which unzips the ZIP file and puts the contents back into the same bucket (configurable).
Please upvote if this helps :)
Lambda script (Python 3) that calls a Glue job named YourGlueJob
import boto3
import urllib.parse

glue = boto3.client('glue')

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    print(key)
    try:
        newJobRun = glue.start_job_run(
            JobName = 'YourGlueJob',
            Arguments = {
                '--bucket': bucket,
                '--key': key,
            }
        )
        print("Successfully created unzip job")
        return key
    except Exception as e:
        print(e)
        print('Error starting unzip job for ' + key)
        raise e
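The Lambda above expects a standard S3 ObjectCreated event (it reads event['Records'][0]['s3']...). A minimal sketch of wiring that trigger with boto3 is shown below; the bucket name, function ARN, and the .zip suffix filter are placeholder assumptions, and the Lambda also needs a resource-based permission (lambda add-permission) so that S3 may invoke it.

import boto3

s3 = boto3.client('s3')

# Hypothetical bucket and function ARN -- replace with your own values.
s3.put_bucket_notification_configuration(
    Bucket='my-zip-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:start-unzip-job',
            'Events': ['s3:ObjectCreated:*'],
            # Only fire for ZIP uploads
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.zip'}]}}
        }]
    }
)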
AWS Glue job script that unzips the files
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','bucket','key'],)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
import boto3
import zipfile
import io
from contextlib import closing
s3 = boto3.client('s3')
s3r = boto3.resource('s3')
bucket = args["bucket"]
key = args["key"]
obj = s3r.Object(
    bucket_name=bucket,
    key=key
)

# The whole ZIP is read into memory; the Glue worker has enough memory for this,
# unlike the 3008 MB Lambda limit.
buffer = io.BytesIO(obj.get()["Body"].read())
z = zipfile.ZipFile(buffer)
names = z.namelist()
for filerr in names:
    print(filerr)
    y = z.open(filerr)
    # Upload each archive member under a key prefixed with the original ZIP key
    arcname = key + filerr
    x = io.BytesIO(y.read())
    s3.upload_fileobj(x, bucket, arcname)
    y.close()
print(names)
job.commit()
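The Lambda refers to a Glue job named YourGlueJob, which has to exist before it can be started. A rough sketch of defining it with boto3 is shown below; the IAM role ARN, script location, Glue version, and worker settings are assumptions to adapt to your account, and the script at ScriptLocation is the Glue job shown above.

import boto3

glue = boto3.client('glue')

# Hypothetical role and script path -- adjust to your own account.
glue.create_job(
    Name='YourGlueJob',
    Role='arn:aws:iam::123456789012:role/MyGlueServiceRole',
    Command={
        'Name': 'glueetl',                              # Spark job, matching the GlueContext-based script
        'ScriptLocation': 's3://my-glue-scripts/unzip_job.py',
        'PythonVersion': '3'
    },
    DefaultArguments={'--bucket': '', '--key': ''},     # overridden by the Lambda's start_job_run call
    GlueVersion='2.0',
    WorkerType='G.1X',
    NumberOfWorkers=2
)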