Skip to content

Commit 1fdaa89

Browse files
authored
Merge pull request #87 from TAVE-9RP/dev
ETL 파이프라인 적용, 재고 기능 (Dev -> Main)
2 parents 70cb6a5 + 4c6711c commit 1fdaa89

29 files changed

+1004
-83
lines changed

build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ dependencies {
4343
runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.11.5'
4444
runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.11.5'
4545

46+
47+
// S3 의존성(최신 버전)
48+
implementation 'software.amazon.awssdk:s3:2.25.28'
49+
implementation 'software.amazon.awssdk:auth:2.25.28'
50+
4651
implementation 'org.apache.commons:commons-csv:1.10.0'
4752

4853
}

src/main/java/com/nexerp/domain/analytics/application/AnalyticsExportOrchestrator.java

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@
44

55
import com.nexerp.domain.analytics.domain.ExportFileName;
66
import com.nexerp.domain.analytics.domain.ExportTable;
7+
import com.nexerp.domain.analytics.infra.storage.LocalTmpStorage;
8+
import com.nexerp.domain.analytics.infra.storage.S3Storage;
79
import com.nexerp.domain.analytics.port.CsvWriterPort;
810
import com.nexerp.domain.analytics.port.ExtractorPort;
911
import com.nexerp.domain.analytics.port.StoragePort;
12+
13+
import java.io.IOException;
1014
import java.io.OutputStream;
15+
import java.nio.file.Files;
16+
import java.nio.file.Path;
1117
import java.time.LocalDate;
1218
import java.time.YearMonth;
1319
import java.util.Collections;
@@ -22,18 +28,24 @@
2228
import java.util.concurrent.atomic.AtomicInteger;
2329
import lombok.RequiredArgsConstructor;
2430
import lombok.extern.slf4j.Slf4j;
31+
import org.springframework.beans.factory.annotation.Qualifier;
2532
import org.springframework.stereotype.Service;
2633

34+
import javax.sql.DataSource;
35+
2736
@Slf4j
2837
@Service
2938
@RequiredArgsConstructor
3039
public class AnalyticsExportOrchestrator {
3140

3241
private final List<ExtractorPort> extractors;
33-
private final StoragePort storage;
42+
private final LocalTmpStorage storage; // 구현체 구분
43+
private final S3Storage s3Storage;
3444
private final CsvWriterPort writer;
3545
private final Executor analyticsExportExecutor;
3646

47+
// READONLY_DB 인식 확인 용도 (로컬, 운영)
48+
private final @Qualifier("analyticsReadOnlyDataSource") DataSource analyticsDataSource;
3749
/**
3850
* [Fail-Fast 병렬 내보내기] 1. 하나라도 실패하면 즉시 전체 작업을 중단합니다. 2. 실패 시 이미 성공하여 생성된 파일들도 모두 삭제(Cleanup)합니다.
3951
* 3. 원자적 파일 생성을 위해 임시 파일(tmp)에 먼저 쓰고 성공 시 최종 위치로 이동합니다.
@@ -50,6 +62,7 @@ public Map<ExportTable, ExportResult> exportAllFailFastParallel(LocalDate date)
5062

5163
// 작성된 파일 이름 리스트 (CopyOnWriteArrayList)
5264
List<String> createdFinalFiles = new CopyOnWriteArrayList<>();
65+
List<String> createdS3Keys = new CopyOnWriteArrayList<>(); // S3 롤백용 리스트 추가
5366

5467
CompletableFuture<Void> firstFailure = new CompletableFuture<>();
5568
// completeExceptionally(실패) 를 한번만 전파하기 위한 원자적 블리언
@@ -91,26 +104,55 @@ public Map<ExportTable, ExportResult> exportAllFailFastParallel(LocalDate date)
91104

92105
try {
93106
// 전체 성공 시 통과, 하나라도 실패 시 예외가 여기서 터짐
94-
race.join();
107+
race.join(); // 1.모든 로컬 파일 생성 완료 대기
108+
109+
log.info("[AnalyticsExport] 로컬 생성 완료. S3 업로드 및 원자적 이동 시작.");
110+
111+
// 2. S3 업로드 (하나라도 실패 시 예외 처리)
112+
uploadAllToS3(createdFinalFiles, createdS3Keys);
113+
114+
// 3. 성공 시 로컬 파일 삭제 (서버 용량 확보)
115+
cleanupLocalFiles(createdFinalFiles);
95116

96117
long allElapsedMs = NANOSECONDS.toMillis(System.nanoTime() - allStart);
97118
log.info("[AnalyticsExport] 전체 테이블 수={} 총 소요 시간={}", results.size(),
98119
allElapsedMs);
120+
log.info("[AnalyticsExport] 접속 DB URL: {}",
121+
analyticsDataSource.getConnection().getMetaData().getURL());
99122
return results;
100123

101-
} catch (CompletionException e) {
102-
Throwable cause = (e.getCause() != null) ? e.getCause() : e;
124+
} catch (Exception e) {
125+
Throwable cause = (e instanceof CompletionException) ? e.getCause() : e; // CompletionException, IOException 등을 모두 처리
103126
log.error("[AnalyticsExport] FAIL-FAST triggered. Export stopped.", cause);
104127

105128
//나머지 스레드 모두 중지
106129
futures.forEach(f -> f.cancel(true));
107-
//성공했던 파일들도 모두 제거
108-
cleanupFiles(createdFinalFiles);
130+
131+
// 롤백 실행: S3에 올라간 파일 삭제 + 로컬 파일 삭제
132+
rollback(createdFinalFiles, createdS3Keys);
109133

110134
throw new RuntimeException("분석 데이터 내보내기 중 오류가 발생하여 전체 작업을 중단합니다.", cause);
111135
}
112136
}
113137

138+
/**
139+
* 모든 파일을 S3로 업로드. 하나라도 실패하면 예외 발생
140+
*/
141+
private void uploadAllToS3(List<String> localPaths, List<String> createdS3Keys) throws IOException {
142+
for (String localPath : localPaths) {
143+
String fileName = Path.of(localPath).getFileName().toString();
144+
String s3Key = s3Storage.resolve(fileName);
145+
146+
try (OutputStream s30s = s3Storage.openOutputStream(s3Key)) {
147+
Files.copy(Path.of(localPath), s30s);
148+
createdS3Keys.add(s3Key); // 성공 기록
149+
log.info("[S3Upload] Success: {}", s3Key);
150+
} catch (Exception e) {
151+
throw new IOException("S3 업로드 실패: " + fileName, e);
152+
}
153+
}
154+
}
155+
114156
/**
115157
* 부분 파일 방지: - tmp 파일에 쓰고 - 성공하면 final 파일로 move - 실패하면 tmp 삭제
116158
*/
@@ -148,10 +190,24 @@ private ExportResult exportByExtractorAtomic(
148190
}
149191
}
150192

193+
private void rollback(List<String> localFiles, List<String> s3Keys) {
194+
// S3 데이터 삭제
195+
for (String key : s3Keys) {
196+
try {
197+
s3Storage.deleteIfExists(key);
198+
log.info("[Rollback] Deleted S3 Key: {}", key);
199+
} catch (Exception e) {
200+
log.warn("[Rollback] S3 삭제 실패: {}", key);
201+
}
202+
}
203+
204+
// 로컬 데이터 삭제
205+
cleanupLocalFiles(localFiles);
206+
}
151207
/**
152208
* 실패 시 이미 만들어진 파일들을 정리
153209
*/
154-
private void cleanupFiles(List<String> createdFinalFiles) {
210+
private void cleanupLocalFiles(List<String> createdFinalFiles) {
155211
for (String path : createdFinalFiles) {
156212
try {
157213
storage.deleteIfExists(path);
@@ -162,29 +218,6 @@ private void cleanupFiles(List<String> createdFinalFiles) {
162218
}
163219
}
164220

165-
public int deleteTwoMonthsAgo(LocalDate now) {
166-
YearMonth target = YearMonth.from(now.minusMonths(4)); // 1월이면 9월
167-
AtomicInteger deleted = new AtomicInteger();
168-
169-
storage.listBaseFiles()
170-
.forEach(fileName -> {
171-
ExportFileName parsed;
172-
try {
173-
parsed = ExportFileName.parse(fileName);
174-
} catch (IllegalArgumentException e) {
175-
return;
176-
}
177-
178-
if (YearMonth.from(parsed.date()).equals(target)) {
179-
String fullName = storage.resolve(fileName).toString();
180-
storage.deleteIfExists(fullName);
181-
deleted.incrementAndGet();
182-
}
183-
});
184-
185-
return deleted.get();
186-
}
187-
188221
//ExportResult 객체 하나는 데이터베이스의 특정 테이블 하나를 CSV 파일 하나로 추출한 결과
189222
public record ExportResult(ExportTable table, LocalDate date, long rowCount) {
190223

src/main/java/com/nexerp/domain/analytics/config/AnalyticsExportProperties.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ public record AnalyticsExportProperties(
77
// 로컬 저장 위치 파일 명 제외
88
String localPath,
99
// 제거 주기
10-
int retentionMonths
10+
int retentionMonths,
11+
String s3Bucket,
12+
String s3KeyPrefix
1113
) {
1214

1315
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.nexerp.domain.analytics.infra.extractor.inventoryitem;
2+
3+
import com.nexerp.domain.analytics.domain.ExportTable;
4+
import com.nexerp.domain.analytics.port.ExtractorPort;
5+
import lombok.RequiredArgsConstructor;
6+
import org.springframework.beans.factory.annotation.Qualifier;
7+
import org.springframework.jdbc.core.JdbcTemplate;
8+
import org.springframework.stereotype.Component;
9+
10+
import java.sql.ResultSet;
11+
import java.sql.SQLException;
12+
import java.time.LocalDate;
13+
import java.util.stream.Stream;
14+
15+
@Component
16+
@RequiredArgsConstructor
17+
public class InventoryItemExtractor implements ExtractorPort {
18+
19+
private final @Qualifier("readOnlyJdbcTemplate") JdbcTemplate jdbcTemplate;
20+
@Override
21+
public ExportTable table() {
22+
return ExportTable.INVENTORY_ITEM;
23+
}
24+
25+
@Override
26+
public String[] header() {
27+
return new String[]{
28+
"date",
29+
"inventory_item_id",
30+
"item_id",
31+
"inventory_id"
32+
};
33+
}
34+
35+
@Override
36+
public Stream<String[]> extractRows(LocalDate date) {
37+
String sql = """
38+
SELECT inventory_item_id,
39+
item_id,
40+
inventory_id
41+
FROM inventory_item
42+
ORDER BY inventory_item_id
43+
""";
44+
45+
return jdbcTemplate.queryForStream(sql, this::mapToRow)
46+
.map(row -> row.toCsvArray(date));
47+
}
48+
49+
private InventoryItemRow mapToRow(ResultSet rs, int rowNum) throws SQLException {
50+
return new InventoryItemRow(
51+
rs.getLong("inventory_item_id"),
52+
rs.getLong("item_id"),
53+
rs.getLong("inventory_id")
54+
);
55+
}
56+
57+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.nexerp.domain.analytics.infra.extractor.inventoryitem;
2+
3+
import java.time.LocalDate;
4+
5+
public record InventoryItemRow(
6+
long inventoryItemId,
7+
long itemId,
8+
long inventoryId
9+
10+
) {
11+
public String[] toCsvArray(LocalDate exportDate) {
12+
return new String[] {
13+
exportDate.toString(),
14+
String.valueOf(inventoryItemId),
15+
String.valueOf(itemId),
16+
String.valueOf(inventoryId)
17+
};
18+
}
19+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.nexerp.domain.analytics.infra.storage;
2+
3+
import com.nexerp.domain.analytics.config.AnalyticsExportProperties;
4+
import com.nexerp.domain.analytics.port.StoragePort;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.context.annotation.Primary;
8+
import org.springframework.stereotype.Component;
9+
import software.amazon.awssdk.core.sync.RequestBody;
10+
import software.amazon.awssdk.services.s3.S3Client;
11+
import software.amazon.awssdk.services.s3.model.*;
12+
13+
import java.io.ByteArrayOutputStream;
14+
import java.io.IOException;
15+
import java.io.OutputStream;
16+
import java.util.List;
17+
import java.util.UUID;
18+
import java.util.stream.Collectors;
19+
20+
@Slf4j
21+
@Component
22+
@Primary // 테스트 시 S3 우선 사용 목적
23+
@RequiredArgsConstructor
24+
public class S3Storage implements StoragePort {
25+
26+
private final S3Client s3Client;
27+
private final AnalyticsExportProperties props;
28+
29+
@Override
30+
public void ensureBaseDir() {
31+
log.info("[S3Storage] Base bucket: {}", props.s3Bucket());
32+
}
33+
34+
@Override
35+
public String resolve(String fileNmae) {
36+
// S3 내의 경로 반환
37+
// 예: analytics/exports/inventory--2026-01-05.csv
38+
return props.s3KeyPrefix() + "/" + fileNmae;
39+
}
40+
41+
@Override
42+
public String resolveTemp(String finalPath) {
43+
return finalPath + ".tmp-" + UUID.randomUUID();
44+
}
45+
46+
@Override
47+
public OutputStream openOutputStream(String fullPath) throws IOException {
48+
return new ByteArrayOutputStream() {
49+
@Override
50+
public void close() throws IOException {
51+
super.close();
52+
byte[] bytes = toByteArray();
53+
s3Client.putObject(PutObjectRequest.builder()
54+
.bucket(props.s3Bucket())
55+
.key(fullPath)
56+
.build(), RequestBody.fromBytes(bytes));
57+
log.info("[S3Storage] Uploaded to S3: {}", fullPath);
58+
}
59+
};
60+
}
61+
62+
@Override
63+
public void moveAtomic(String sourceFullPath, String targetFullPath) throws IOException {
64+
try {
65+
CopyObjectRequest copyRequest = CopyObjectRequest.builder()
66+
.sourceBucket(props.s3Bucket())
67+
.sourceKey(sourceFullPath)
68+
.destinationBucket(props.s3Bucket())
69+
.destinationKey(targetFullPath)
70+
.build();
71+
s3Client.copyObject(copyRequest);
72+
73+
// 원본 삭제
74+
deleteIfExists(sourceFullPath);
75+
76+
log.info("[S3Storage] Moved {} to {}", sourceFullPath, targetFullPath);
77+
} catch (S3Exception e) {
78+
throw new IOException("Failed to move S3 object", e);
79+
}
80+
}
81+
82+
@Override
83+
public void deleteIfExists(String fullPath) {
84+
try {
85+
DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder()
86+
.bucket(props.s3Bucket())
87+
.key(fullPath)
88+
.build();
89+
s3Client.deleteObject(deleteRequest);
90+
} catch (S3Exception e) {
91+
log.warn("[S3Storage] Failed to delete S3 object: {}", fullPath);
92+
}
93+
}
94+
95+
@Override
96+
public List<String> listBaseFiles() {
97+
try {
98+
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
99+
.bucket(props.s3Bucket())
100+
.prefix(props.s3KeyPrefix() + "/")
101+
.build();
102+
103+
ListObjectsV2Response result = s3Client.listObjectsV2(listRequest);
104+
105+
return result.contents().stream()
106+
.map(S3Object::key)
107+
// 폴더(Prefix) 자체는 제외하고 파일명만 추출
108+
.filter(key -> !key.endsWith("/"))
109+
.map(key -> key.substring(key.lastIndexOf("/") + 1))
110+
.collect(Collectors.toList());
111+
} catch (S3Exception e) {
112+
throw new IllegalStateException("Failed to list S3 objects", e);
113+
}
114+
}
115+
116+
117+
}

0 commit comments

Comments
 (0)