Python操作对象存储(AWS S3 use boto3)


提取/封装boto3 S3方法便于使用

使用AWS S3 时,参数AWS_ENDPOINT置空即可, 使用腾讯云cos,阿里云oss或者私有对象存储minio时, 指定AWS_ENDPOINT为服务url

class S3(object):
    """
    tenant_id: 租户id作为第一层文件夹
    bucket: IC3文件默认存储在DEFAULT_BUCKET内
    """

    def __init__(self, tenant_id, bucket=DEFAULT_BUCKET):
        self.tenant_id = tenant_id
        self.session = boto3.Session(
            aws_access_key_id=AWS_SERVER_PUBLIC_KEY,
            aws_secret_access_key=AWS_SERVER_SECRET_KEY,
            region_name=AWS_ENDPOINT,
        )
        self.s3_client = self.session.client("s3", endpoint_url=ENDPOINT_URL)
        self.bucket_name = bucket
        # self.s3_source = self.session.resource("s3")
        # self.bucket = self.s3_source.Bucket(bucket)

    def upload(self, category, filename, filedata, acl="public-read", media=False, obj_id=""):
        """
        上传文件接口
        category: 分类
        filename: 文件名称
        filedata: 文件内容[二进制或文本]
        acl='private'|'public-read'
            private: 私有文件, 无法使用公用url下载,需借助S3.download_file() 或 使用预签名url S3.generate_presigned_url()
            public-read: 可使用公用url下载(安全性差): url=S3.s3_base_url() + key
        return: key
        文件存储路径示例: T123/category/obj_id/2022-01-01/QWERTY
        """
        while True:
            key = "{}/{}/{}{}-{}".format(
                self.tenant_id,
                category.replace("/", "_"),
                (str(obj_id) + "/") if obj_id else "",
                datetime.datetime.now(pytz.timezone("Asia/Shanghai")).strftime(
                    "%Y-%m-%d"
                ),
                "".join(random.sample(string.ascii_letters, 6)),
            )
            if not self.head_object(key):
                break
        filename = quote(filename)
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=filedata,
            ACL=acl,
            ContentDisposition=f"attachment;filename={filename}",
        )
        if media:
            key = "media/" + key
        return key

    def download_file(self, key, filename):
        # 下载文件 f=file_path:str
        self.s3_client.download_file(self.bucket_name, key, filename)

    def generate_presigned_url(self, key, ExpiresIn=86400):
        """
        获取预签名url
        key: 上传文件时返回的key
        ExpiresIn: 有效时间(秒), 默认1天, 不能超过7天
        """
        if ExpiresIn > 604800:
            raise Exception("<ExpiresIn> Must be less than 604800")
        return self.s3_client.generate_presigned_url(
            ClientMethod="get_object",
            Params={"Bucket": self.bucket_name, "Key": key},
            ExpiresIn=ExpiresIn,
        )

    @staticmethod
    def s3_base_url(bucket=DEFAULT_BUCKET):
        return f"https://{bucket}.s3.{AWS_ENDPOINT}.amazonaws.com.cn"

    def head_object(self, key):
        try:
            self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
            return True
        except:
            return False

    def upload_free(self, key, filename, file_path, acl="public-read"):
        # 文件上传, 自由指定key, 文件路径
        filename = quote(filename)
        self.s3_client.upload_file(
            file_path,
            self.bucket_name,
            key,
            {"ACL": acl, "ContentDisposition": f"attachment;filename={filename}"}
        )
        print("upload:", key)
        return key

    def download_fileobj(self, key, f):
        # 下载文件 f=open(file_path, 'wb')
        self.s3_client.download_fileobj(self.bucket_name, key, f)

原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/278228.html

(0)
上一篇 2022年8月1日
下一篇 2022年8月1日

相关推荐

发表回复

登录后才能评论