分布式对象存储是将任意类型和大小的数据通过对象形式存储,主要用于存储大规模非结构化数据。Framework在抽象层定义分布式对象存储操作统一抽象接口,并在实现层提供阿里云对象存储服务(OSS)、MinIO 的实现。
抽象层将主要接口分为两类,一类是以Service为后缀的接口,这类接口更多用于业务链路,它提供给开发者在业务处理中使用,支持上传文件、分段上传、删除对象等常用操作。另一类是以Manager为后缀的接口,这类接口更多用于管控链路,用于对象存储服务的管理而非业务处理,支持管理和配置存储桶等操作。
分布式对象存储服务中通用操作,在 Framework 中定义了相关接口方法,可以直接调用;各存储服务高阶或独特的用例,提供了代理接口,通过获取存储服务客户端进行调用。
分布式对象存储抽象设计如下:
@startuml
package "cloudapp-base-api" {
package "filestore" {
interface StorageClientAware<Client> {
Client getDelegatingStorageClient()
}
together {
interface StorageObjectService<Client> {
String DEFAULT_CONTENT_TYPE = "application/octet-stream";
boolean copy(String bucketName, String sourcePath, String targetPath,boolean override)
boolean deleteObject(String bucketName, String path)
boolean deleteObjects(String bucketName, Collection<String> objects)
InputStream getObject(String bucketName, String path)
default boolean putObject(String bucketName, String path, InputStream body)
boolean putObject(String bucketName, String path, InputStream body,String contentType)
Collection<String> listObjectVersions(String bucketName,String path,String sinceVersion,int count)
default Collection<String> listTop10LatestObjectVersions(String bucketName, String path)
boolean restoreObject(String bucketName, String path, int days, String tier)
}
interface MultiPartsService<Client> {
Map<String, Integer> uploadObjects(String bucketName,String objectName,List<Pair<String, InputStream>> objects)
Map<String, Integer> uploadBigFile(String bucketName,String objectName,Path filePath,int maxSizePerPart)
}
}
together {
interface BucketManager<Client, B, M, O>{
List<Bucket<B >> listAllBucketsWithPrefix(String prefix,String resourceGroupId)
Pagination<Bucket<B >> listPagingBuckets(String prefix,String resourceGroupId,Pagination<Bucket<B >> pagingMaker)
boolean createBucket(String bucketName)
Bucket<B > createBucket(Bucket<B > bucket)
boolean deleteBucket(String bucketName)
String getBucketLocation(String bucketName)
ObjectMetadata<M> headObject(String bucketName, String objectName)
ObjectMetadata<M> headObject(String bucketName,String objectName,String versionId)
Collection<ObjectItem<O>> listTopNObjects(String bucketName, int count)
Pagination<ObjectItem<O>> listObjects(String bucketName,Pagination<ObjectItem<O>> pageMarker)
}
interface BucketLifeCycleManager<Client, Lifecycle> {
void transitToWithLastAccessDays(String bucketName,String objectPrefixName,String storageType,int lastAccessDays)
void expireObjectsWithLastAccessDays(String bucketName,String objectPrefixName,int lastAccessDays)
void expireObjectsWithCreateBefore(String bucketName,Date createBefore)
void expireObjectsAfterVersionNoncurrentDays(String bucketName,String objectPrefixName,int noncurrentDays)
Lifecycle getBucketLifeCycle(String bucketName)
boolean deleteBucketLifeCycle(String bucketName)
}
interface ObjectPolicyManager<Client> {
void grantAccessPermissions(String bucketName, String objectName, String accessAcl)
String getObjectPolicy(String bucketName, String objectName)
boolean deleteObjectPolicy(String bucketName, String objectName)
}
}
StorageClientAware <-- StorageObjectService : extends
StorageClientAware <-- BucketManager : extends
StorageClientAware <-- BucketLifeCycleManager : extends
StorageClientAware <-- ObjectPolicyManager : extends
StorageClientAware <-- MultiPartsService : extends
StorageObjectService -[hidden]- MultiPartsService
BucketManager -[hidden]- BucketLifeCycleManager
BucketLifeCycleManager -[hidden]- ObjectPolicyManager
StorageObjectService <--[hidden]- BucketLifeCycleManager
MultiPartsService <--[hidden]- ObjectPolicyManager
}
}
@enduml-
定义存储服务代理客户端接口 StorageClientAware ,主要方法有:
- getDelegatingStorageClient() 获取当前使用的存储服务客户端对象;
-
定义对象管理接口 StorageObjectService ,主要方法有:
-
copy(String bucketName, String sourcePath, String targetPath,boolean override) 创建已存在文件的副本;
-
deleteObject(String bucketName, String path) 删除文件;
-
deleteObjects(String bucketName, Collection objects) 删除一组文件;
-
getObject(String bucketName, String path) 获取文件;
-
putObject(String bucketName, String path, InputStream body) 上传文件;
-
putObject(String bucketName, String path, InputStream body,String contentType) 上传文件
-
listObjectVersions(String bucketName,String path,String sinceVersion,int count) 获取存储桶中所有文件的版本信息;
-
listTop10LatestObjectVersions(String bucketName,String path) 获取最新的 10 个文件版本;
-
restoreObject(String bucketName, String path, int days, String tier) 恢复已归档的文件;
-
-
定义分段上传操作接口 MultiPartsService ,主要方法有:
-
uploadObjects(String bucketName,String objectName,List<Pair<String, InputStream>> objects) 上传文件;
-
uploadBigFile(String bucketName,String objectName,Path filePath,int maxSizePerPart) 上传大文件;
-
-
定义存储桶管理接口 BucketManager ,主要方法有:
-
listAllBucketsWithPrefix(String prefix, String resourceGroupId) 按前缀获取存储桶列表;
-
listPagingBuckets(String prefix,String resourceGroupId,Pagination<Bucket<B >> pagingMaker) 按前缀以分页形式获取存储桶列表;
-
createBucket(String bucketName) 创建存储桶;
-
createBucket(Bucket<B > bucket) 创建存储桶;
-
deleteBucket(String bucketName) 删除存储桶;
-
getBucketLocation(String bucketName) 获取存储桶的位置信息;
-
headObject(String bucketName, String objectName) 获取文件元数据
-
headObject(String bucketName,String objectName,String versionId) 获取指定版本文件元数据
-
listTopNObjects(String bucketName, int count) 获取指定数量的文件;
-
listObjects(String bucketName,Pagination<ObjectItem> pageMarker) 分页形式获取文件列表;
-
-
定义生命周期规则接口 BucketLifeCycleManager ,主要方法有:
-
transitToWithLastAccessDays(String bucketName,String objectPrefixName,String storageType,int lastAccessDays) 配置基于最后一次访问时间的转换存储类型生命周期规则;
-
expireObjectsWithLastAccessDays(String bucketName,String objectPrefixName,int lastAccessDays) 配置基于最后一次访问时间的过期生命周期规则;
-
expireObjectsWithCreateBefore(String bucketName,String objectPrefixName,Date createBefore) 配置最后一次修改时间早于指定时间的过期生命周期规则;
-
expireObjectsAfterVersionNoncurrentDays(String bucketName,String objectPrefixName,int noncurrentDays) 在版本控制时,配置成为非当前版本在指定时间后的过期生命周期规则;
-
getBucketLifeCycle(String bucketName) 获取存储桶的生命周期配置;
-
deleteBucketLifeCycle(String bucketName) 删除存储桶的生命周期配置;
-
-
定义权限策略接口 ObjectPolicyManager ,主要方法有:
-
grantAccessPermissions(String bucketName, String objectName, String accessAcl) 配置权限策略;
-
getObjectPolicy(String bucketName, String objectName) 获取权限策略
-
deleteObjectPolicy(String bucketName, String objectName) 删除权限策略;
-
分布式对象存储实现的配置如下:
- 定义 OSS 实现的配置参数类 OSSEndpointProperties ,继承 EndpointProperties ,使用注解 @ConfigurationProperties(prefix = AliyunFileStoreComponent.BINDING_PROP_KEY) ,其中 AliyunFileStoreComponent.BINDING_PROP_KEY=com.alibaba.cloudapp.filestore.aliyun ,配置参数类字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| enabled | boolean | false | 是否启用 |
| accessKey | String | - | AccessKey ID |
| secretKey | String | - | AccessKey Secret |
| region | String | - | 区域 |
| stsToken | String | - | 临时身份凭证 |
| endpoint | String | - | 接入点 |
- 定义 Minio 实现的配置参数类 MinioEndpointProperties ,继承 EndpointProperties ,使用注解 @ConfigurationProperties(prefix = com.alibaba.cloudapp.filestore.minio) ,配置参数类字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| enabled | boolean | false | 是否启用 |
| accessKey | String | - | AccessKey ID |
| secretKey | String | - | AccessKey Secret |
| region | String | - | 区域 |
| stsToken | String | - | 临时身份凭证 |
| endpoint | String | - | 接入点 |
添加 CloudApp Starter 和 OSS 分布式对象存储的实现依赖。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${cloudapp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-filestore-aliyun</artifactId>
</dependency>
</dependencies>在应用中配置 CloudApp 的属性,在环境变量中配置accessKey、secretKey,端点使用 OSS 深圳地区的访问地址;
spring:
application:
name: filestore-aliyun-demo
io:
cloudapp:
filestore:
aliyun:
enabled: true
accessKey: ${ACCESS_KEY}
secretKey: ${SECRET_KEY}
endpoint: oss-cn-shenzhen.aliyuncs.com通过抽象的存储文件服务上传文件,以上传 Windows 系统中 C:\Users\cloudtogo\Desktop\test.txt 文件为例,将文件上传到 OSS 的 gxrtestoss 存储桶中。
public class BucketService {
@Autowired
private StorageObjectService storageObjectService;
public void createObject() {
try {
FileInputStream inputStream = new FileInputStream("C:\Users\cloudtogo\Desktop\test.txt");
storageObjectService.putObject("gxrtestoss", "test.txt", inputStream);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (CloudAppException e) {
e.printStackTrace();
}
}
@PostConstruct
public void init() throws CloudAppException {
createObject();
}
}通过扩展接口获取委托类,然后再获取 OSS 文件标签信息。
添加 CloudApp Starter 和 OSS 分布式对象存储的实现依赖。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${cloudapp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-filestore-aliyun</artifactId>
</dependency>
</dependencies>在应用中配置 CloudApp 的属性,在环境变量中配置accessKey、secretKey,端点使用 OSS 深圳地区的访问地址;
spring:
application:
name: filestore-aliyun-demo
io:
cloudapp:
filestore:
aliyun:
enabled: true
accessKey: ${ACCESS_KEY}
secretKey: ${SECRET_KEY}
endpoint: oss-cn-shenzhen.aliyuncs.com通过扩展接口获取 OSS 原生客户端委托类,然后用于获取文件标签信息,以获取存储桶 gxrtestoss 中文件 test.txt 的 tag 为例。
public class BucketService {
@Autowired
private StorageObjectService storageObjectService;
public void getObjectTagging() {
OSSClient ossClient = (OSSClient) storageObjectService.getDelegatingStorageClient();
TagSet tagSet =ossClient.getObjectTagging("gxrtestoss", "test.txt");
System.out.println(tagSet.toString());
}
@PostConstruct
public void init() throws CloudAppException {
getObjectTagging();
}
}调用结果:
通过扩展接口获取委托类,然后再获取 OSS 文件标签信息。
添加 CloudApp Starter 和 MinIO 分布式对象存储的实现依赖,本例以 http 方式调用因此引入 web 依赖。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${revision}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-filestore-minio</artifactId>
<exclusions>
<exclusion>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.12.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>在应用中配置 CloudApp MinIO 实现的属性,在环境变量中配置访问凭证 AK、SK 和 MinIO 访问端点MINIO_ENDPOINT;
spring:
application:
name: demo-minio
io:
cloudapp:
filestore:
minio:
enabled: true
endpoint: ${MINIO_ENDPOINT}
accessKey: ${AK}
secretKey: ${SK}
server:
port: 8080通过扩展接口获取 MinIO 原生客户端委托类,然后用于获取文件标签信息,以获取存储桶test987中文件test.txt 的 tag 为例。
@RestController
public class StorageObjectController {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(
StorageObjectController.class);
@Autowired
private StorageObjectService storageObjectService;
@RequestMapping("getObjectTag")
public void getObjectTag() {
MinioClient client = (MinioClient) storageObjectService.getDelegatingStorageClient();
GetObjectTagsArgs args = GetObjectTagsArgs.builder()
.bucket("test987")
.object("test.txt")
.build();
Tags tags = null;
try {
tags = client.getObjectTags(args);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
Map<String, String> map = tags.get();
map.forEach((k, v) -> logger.info("{} : {}", k, v));
}
}调用结果:

