博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理 ...
阅读量:6829 次
发布时间:2019-06-26

本文共 7487 字,大约阅读时间需要 24 分钟。

前言

提供了一种事件驱动的计算模型。函数的执行是由事件驱动的,函数计算触发器描述了一组规则,当某个事件满足这些规则,事件源就会触发相应的函数。函数计算已经将 作为一个事件源用于触发函数, 用于对存入oss的文件进行自动化处理:

image

如上图所示,阿里云对象存储和函数计算无缝集成。您可以为各种类型的事件设置处理函数,当OSS系统捕获到指定类型的事件后,会自动调用函数处理。例如,您可以设置函数来处理PutObject事件,当您调用OSS PutObject API上传图片到OSS后,相关联的函数会自动触发来处理该图片。

在本文中,以用户上传超大压缩文件( zip 类型)到 oss, oss 系统捕获 PutObjec/PostObject 的事件, 自动触发函数执行, 函数将压缩文件解压,并将对应的解压文件放在oss 指定的 bucket 的某个目录下。 比如在bucket myzipfilebucket source 目录是上传压缩文件的目录, processed 目录是解压后压缩文件存放的目录

方法

在本文中,以python3 runtime 为例,一步一步慢慢展示fc的能力

简单法

因为 FC 的临时目录的空间大小为512M,如果使用落盘,明显是下下策, 不仅增加 io 时间, 而且 512M 的限制基本让人敬而远之了。所以我们把一切在内存中完成, 于是有下面的解决方案, FC 从内网中拉取 OSS 中的压缩文件,然后一切在内存中完成,将解压后的文件的 bytes 上传到指定 bucket 中目录, 于是有了下面的代码:

# -*- coding: utf-8 -*-import oss2, jsonimport zipfileimport os, ioimport loggingLOGGER = logging.getLogger()def handler(event, context):  """  The object from OSS will be decompressed automatically .  param: event:   The OSS event json string. Including oss object uri and other information.      For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS    param: context: The function context, including credential and runtime info.      For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context  """  evt_lst = json.loads(event)  creds = context.credentials  auth=oss2.StsAuth(     creds.access_key_id,     creds.access_key_secret,     creds.security_token)  evt = evt_lst['events'][0]  bucket_name = evt['oss']['bucket']['name']  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'  bucket = oss2.Bucket(auth, endpoint, bucket_name)  object_name = evt['oss']['object']['key']  """  When a source/ prefix object is placed in an OSS, it is hoped that the object will be decompressed and then stored in the OSS as processed/ prefixed.  For example, source/a.zip will be processed as processed/a/...   "source /", "processed/" can be changed according to the user's requirements."""  file_type = os.path.splitext(object_name)[1]  if file_type != ".zip":    raise RuntimeError('{} filetype is not zip'.format(object_name))  newKey = object_name.replace("source/", "processed/")  remote_stream = bucket.get_object(object_name)  if not remote_stream:    raise RuntimeError('failed to get oss object. bucket: %s. object: %s' % (bucket_name, object_name))  zip_buffer = io.BytesIO(remote_stream.read())  LOGGER.info('download object from oss success: {}'.format(object_name))  newKey = newKey.replace(".zip", "/")  with zipfile.ZipFile(zip_buffer) as zip_file:    for name in zip_file.namelist():      with zip_file.open(name) as file_obj:        # fix chinese directory name garbled problem        name = name.encode(encoding='cp437')        detect = chardet.detect(name)        confidence = detect["confidence"]        if confidence >= 0.8:           name = name.decode(encoding=detect["encoding"])        else:           name = name.decode(encoding="gb2312")        bucket.put_object(newKey +  name, file_obj.read())
  • 优点

    • 方法简单,一切在内存中完成,适合较小的压缩文件解压
  • 不足

    • 有内存的限制,假设压缩文件稍微大点,就很容易超出函数计算执行环境最大内存 3G 的限制
    • 假设压缩文件大小差异很大,以最大压缩文件消耗的内存设置函数内存规格, 增加函数执行费用

流式法

完整的代码示例请在附件下载

很快,我们自然想到不能完全把压缩文件的内容全部通过 FC 作为 中转站来处理,如果我们先获取压缩文件的 meta 信息,比如我们先拉取压缩文件中的 meta 信息的字节流(很小), 分析出这个大的压缩文件里面有哪些文件,这些文件对应到压缩文件字节流中的起止位置信息;通过这些信息, 压缩文件里面的每个文件都能构造出一个 , 那么在 FC 这边, 只需要将 get 的 file-like object 进行解压,同时将解压后的内容 as a file-like object put 到指定的 bucket 目录。 完全没必要把所有内容一下子拖到这里统一加工。

  • 改造 zipfile

    在 python 中,我们继续使用 这个lib,看起来是这个库支持参数是 file-like object, 但是这个库要求 file-like object 具有 seek 和 tell 接口, oss get_object 获得GetObjectResult 类型的对象虽然是一个 file-like object, 但是没有seektell 接口, 因此我们 zipfile 进行了一些改造:

  • 同时,构造出能被 zipfile 支持的file-like object

    # -*- coding: utf-8 -*-import oss2from oss2 import utils, modelsimport ossZipfile as zipfilezipfile_support_oss = zipfile# support upload to oss as a file-like objectdef make_crc_adapter(data, init_crc=0):  data = utils.to_bytes(data)  # file-like object  if hasattr(data,'read'):     return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc))utils.make_crc_adapter = make_crc_adapterclass OssStreamFileLikeObject(object):  def __init__(self, bucket, key):    super(OssStreamFileLikeObject, self).__init__()    self._bucket = bucket    self._key = key    self._meta_data = self._bucket.get_object_meta(self._key)  @property  def bucket(self):    return self._bucket  @property  def key(self):    return self._key  @property  def filesize(self):    return self._meta_data.content_length  def get_reader(self, begin, end):    begin = begin if begin >= 0 else 0    end = end if end > 0 else self.filesize - 1    end = end if end < self.filesize else self.filesize - 1     begin = begin if begin < end else end    return self._bucket.get_object(self._key, byte_range=(begin, end))  def get_content_bytes(self, begin, end):    reader = self.get_reader(begin, end)    return reader.read()  def get_last_content_bytes(self, offset):    return self.get_content_bytes(self.filesize-offset, self.filesize-1)
  • 入口函数

    # -*- coding: utf-8 -*-import helperimport oss2, jsonimport osimport loggingimport chardetLOGGER = logging.getLogger()def handler(event, context):  """  The object from OSS will be decompressed automatically .  param: event:   The OSS event json string. Including oss object uri and other information.      For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS  param: context: The function context, including credential and runtime info.      For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context  """  evt_lst = json.loads(event)  creds = context.credentials  auth=oss2.StsAuth(    creds.access_key_id,    creds.access_key_secret,    creds.security_token)  evt = evt_lst['events'][0]  bucket_name = evt['oss']['bucket']['name']  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'  bucket = oss2.Bucket(auth, endpoint, bucket_name)  object_name = evt['oss']['object']['key']  """  When a source/ prefix object is placed in an OSS, it is hoped that the object will be decompressed and then stored in the OSS as processed/ prefixed.  For example, source/a.zip will be processed as processed/a/...   "Source /", "processed/" can be changed according to the user's requirements."""  file_type = os.path.splitext(object_name)[1]  if file_type != ".zip":    raise RuntimeError('{} filetype is not zip'.format(object_name))  LOGGER.info("start to decompress zip file")  newKey = object_name.replace("source/", "processed/")  zip_fp = helper.OssStreamFileLikeObject(bucket, object_name)  newKey = newKey.replace(".zip", "/")  with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file:    for name in zip_file.namelist():      with zip_file.open(name) as file_obj:         # fix chinese directory name garbled problem        name = name.encode(encoding='cp437')        detect = chardet.detect(name)        confidence = detect["confidence"]        if confidence >= 0.8:          name = name.decode(encoding=detect["encoding"])        else:          name = name.decode(encoding="gb2312")         # upload object as a file_obj        bucket.put_object(newKey +  name, file_obj)
  • 优点

    • 可以突破内存限制,小内存的函数也可以干大压缩文件文件的自动解压存放工作
      image
  • 不足

    • 对于较小的压缩文件,不如简单的方法来的简洁和直接

总结

本文针对 oss 上传 zip 压缩文件进行自动解压进行了一些方案的探讨与尝试, 分析了两种方案各自的优点和不足。 但是这两种方法都有一个共同的限制,就是函数执行时间最大为15分钟,如果压缩文件足够大和足够复杂(里面有很多的小文件), 需要合理评估时间, 执行时间 = 解压时间(纯cpu计算,这个跟设置函数内存规格大小成线性关系,需要合理设置) + 网络io 时间(走内网); 一般来说本地(两核3G的配置)解压时间在10分钟以内的, FC 应该都有能力处理。

转载地址:http://ylykl.baihongyu.com/

你可能感兴趣的文章
性能测试面试问题(一)
查看>>
其他内置函数
查看>>
ES6 基础学习
查看>>
Centos7二进制文件安装MySQL5.7.25
查看>>
多线程下的单例模式
查看>>
ExpandableListView置顶和子list收放
查看>>
微信资源包混淆
查看>>
第二课: 雷电模拟器的多开和打开
查看>>
div内居中图片小组件
查看>>
删除所有存储过程的sql语句
查看>>
Java学习之文件及异常处理
查看>>
802.11n笔记
查看>>
C++ 内存管理之四(new/delete vs malloc/free)
查看>>
前端工程化—部署
查看>>
怀恋g9
查看>>
Python——uuid
查看>>
【实习记】2014-08-18使用curl排错http头的content-length
查看>>
/usr/bin/uwsgi --http :8888 --wsgi-file wsgi.py --master --processes 4 --threads 2
查看>>
Codeforces Beta Round #17 D. Notepad
查看>>
Linux基础知识
查看>>