https://www.baeldung.com 에서 블로그를 찾아보다가 Java 23에 대한 글을 보게되었다.
https://foojay.io/today/java-23-has-arrived-and-it-brings-a-truckload-of-changes/ 이 글인데
Java23의 특징과 22의 대한 차이점 위주로 설명하는 포스팅이다.
여기서 가장 흥미를 끌었던 내용은 비동기의 처리 부분이였는데
JEP 480: Structured Concurrency (Third Preview)
Java's take on concurrency has always been
unstructured
, meaning that tasks run independently of each other. There's no hierarchy, scope, or other structure involved, which means errors or cancellation intent is hard to communicate.
이부분에 끌려서 갑자기 읽게 되었다. 해석하자면 다음과 같다
Java의 동시성에 대한 관점은 항상 비구조적이어서 작업이 서로 독립적으로 실행됩니다. 계층 구조, 범위 또는 기타 구조가 없으므로 오류나 취소 의도를 전달하기 어렵습니다.
대부분의 개발자가 기존의 Java에서 비동기를 처리할 수 있는 부분은
ExcutorService를 통해서 Thread를 조정하고 Java 21을 사용한다면 VirtualThread를 사용해서 더 많은 스레드를 처리할수 있지만, 결국 같은 동시성에 대한 관점을 가지고 있기 때문에 항상 모두가 독립적으로 실행된다는 것이다.
public class MultiWaiterRestaurant implements Restaurant {
@Override
public MultiCourseMeal announceMenu() throws ExecutionException, InterruptedException {
Waiter grover = new Waiter("Grover");
Waiter zoe = new Waiter("Zoe");
Waiter rosita = new Waiter("Rosita");
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
Future<Course> starter = executor.submit(() -> grover.announceCourse(CourseType.STARTER));
Future<Course> main = executor.submit(() -> zoe.announceCourse(CourseType.MAIN));
Future<Course> dessert = executor.submit(() -> rosita.announceCourse(CourseType.DESSERT));
return new MultiCourseMeal(starter.get(), main.get(), dessert.get());
}
}
}
다음을 보자. 다음은 starter, main, dessert의 각 작업을 개별적으로 수행한다. 그리고 마지막에. get 메서드를 통해서 각 작업이 끝나기를 기다린다. 당연해 보이는 과정이지만 여기에는 인지하지 못할 수 있는 문제들이 존재한다.
예를 들어 작업을 간단하게 1,2,3 작업이라고 지칭하겠다.
1번 작업이 시작되고 2 작업이 시작되고, 3 작업이 시작된다. 그 후 2번 작업이 오래 걸리는 과정이며 그동안 1번 작업이 실패했다고 가정하자. 그렇다면 최종적으로 1번작업이 실패했음에도 불구하고 나머지 작업들은 계속 수행 중이다.
각 스레드가 독립적으로 작동하기 때문에 오류를 전파할 수 없는 구조로 작동된다.
문제점은 이런 상황에서 발생하게 되는데 여기서 실패했음에도 계속 작동되는 스레드 때문에 스레드 누수가 발생하며
다른 작업으로 전파되지 않는다.
이런 경우 해결법은 단일 스레드를 사용하면 되는데, 비동기를 그냥 동기방식으로 지정하는 것이다.
public class SingleWaiterRestaurant implements Restaurant {
@Override
public MultiCourseMeal announceMenu() throws OutOfStockException {
Waiter elmo = new Waiter("Elmo");
Course starter = elmo.announceCourse(CourseType.STARTER);
Course main = elmo.announceCourse(CourseType.MAIN);
Course dessert = elmo.announceCourse(CourseType.DESSERT);
return new MultiCourseMeal(starter, main, dessert);
}
}
이런 경우 직관적이며 간단하다는 장점이 있고, 모든 작업이 하나의 스레드에서 실행되기 때문에 Thread-safe 한 구조가 완성된다.
하지만 대부분의 경우, 속도의 문제에 부딪혀 비동기를 사용한다.
그래서 등장한 것이 Structured Concurrency라는 개념이다.
설명을 보면 다음과 같다.
구조적 동시성 접근 방식에서 스레드는 명확한 계층 구조와 자체 범위, 명확한 진입 및 종료 지점을 갖습니다. 구조적 동시성은 함수 호출과 유사하게 스레드를 계층적으로 배열하여 부모-자식 관계의 트리를 형성합니다. 실행 범위는 모든 자식 스레드가 코드 구조와 일치하는 완료될 때까지 유지됩니다.
즉, 스레드를 구조를 갖고 사용하면서 스레드의 계층을 사용해
ParentThread -> {Child Thread1, Child Thread2.. Child Thread} 다음과 같은 방식으로 하위의 동작들을 묶을 수 있으며
이를 통해서 오류 전파나 Thread끼리의 관계를 확실하게 정의할 수 있다.
사용하는 방법은 아래와 같이 사용한다.
public class StructuredConcurrencyRestaurant implements Restaurant {
@Override
public MultiCourseMeal announceMenu() throws ExecutionException, InterruptedException {
Waiter grover = new Waiter("Grover");
Waiter zoe = new Waiter("Zoe");
Waiter rosita = new Waiter("Rosita");
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<Course> starter = scope.fork(() -> grover.announceCourse(CourseType.STARTER));
Supplier<Course> main = scope.fork(() -> zoe.announceCourse(CourseType.MAIN));
Supplier<Course> dessert = scope.fork(() -> rosita.announceCourse(CourseType.DESSERT));
scope.join(); // 1
scope.throwIfFailed(); // 2
return new MultiCourseMeal(starter.get(), main.get(), dessert.get()); // 3
}
}
}
예시를 보면 다음과 같은데 보면 기존 다음과 같이 사용했던 excutorService 대신에 Scope구조를 도입했다.
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor())
# Scope 구조 도입
try (var scope = new StructuredTaskScope.ShutdownOnFailure())
여기서 1 부분은 모든 스레드가 작업을 마칠 때까지 기다린다는 것이며, 구체적으로 보면
스레드 하나가 중단된다면 Join부분에서 InterruptedException이 발생한다.
2에서는 스레드 중하나에서 예외가 발생하면 ExecutionException이 발생한다.
3에 도달하면 모든 스레드에서 문제없이 진행이 완료되었는지 확인하고 결과를 검색해 처리할 수 있다.
가장 큰 차이점은 scope로 자식 스레드를 fork로 생성하는 부분이다. 이제 스레드의 수명은 scope에 달려있으며
try-with-resource로 scope를 묶어주었기 때문에 하위의 스레드 동작들을 관리할 수 있다.
또한 하위 스레드의 관리를 어떻게 할지의 대한 동작들도 추가로 제공하는 방식을 제공한다.
ShutdownOnFailure 정책과 ShutdownOnSuccess 방식이다.
말 그래도 ShutdownOnFailure 정책은 스레드 중 한 개라도 문제가 생겼을 시 모든 fork된 스레드를 종료한다.
ShutdownOnSuccess는 스레드중 한개라도 성공 시 모든 fork 된 스레드를 종료하는데 이는 불필요한 작업을 방지하는데 유용하게 쓰인다.
public record DrinkOrder(Guest guest, Drink drink) {}
public class StructuredConcurrencyBar implements Bar {
@Override
public DrinkOrder determineDrinkOrder(Guest guest) throws InterruptedException, ExecutionException {
Waiter zoe = new Waiter("Zoe");
Waiter elmo = new Waiter("Elmo");
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<DrinkOrder>()) {
scope.fork(() -> zoe.getDrinkOrder(guest, BEER, WINE, JUICE));
scope.fork(() -> elmo.getDrinkOrder(guest, COFFEE, TEA, COCKTAIL, DISTILLED));
return scope.join().result(); // 1
}
}
}
위 코드에서는 성공적으로 완료된 작업을 먼저 반환하게 된다. 그리고 블로그 아래 더 보면 사용자정의 handleComlete 메서드로 사용자 정의 종료 정책을 커스텀할 수 있는 방식도 제공한다.
사실 여기까지가 보면서 흥미롭게 봤던 내용이고 나머지는 ZGC를 이제 기본으로 사용한다거나, Stream에 Gather API를 추가,
swtich case문에 패턴매칭기능 ( case int i when i > 100 && i <1000; ) 등을 사용할 수 있다거나 하는 흥미로운 내용들이 많으니 찾아보시는 것을 추천한다.
이제 블로그를 보고 내 프로젝트에 비동기로 처리하는 부분이 있어서 StructuredTaskScope를 도입해서 계층적 구조로 만들고
fork 된 하위 작업들이 실패하는 경우 전부 실패하도록 하여 조금 더 스레드 누수 방지와 빠른 오류전파로 효율적으로 만들고 싶어 코드를 바꿔보았다. 그리고 실행을 시켜봤는데 다만 놓친 부분이 있었는데 이는 아직 preview stage상태여서 이 방법을 쓰고 싶으면
build 할시 --enable-preview를 붙여서 빌드해야 했기에 리팩토링만 진행하기로 했다.
@Transactional
public void runAsyncTasks(List<CoursePlaceGetReq> places, List<TagCreateReq> tags,
Course saveCourse)
throws InterruptedException {
List<Thread> threads = new ArrayList<>();
final boolean[] hasError = {false}; // 에러 발생 여부 확인
threads.add(runAsyncTaskWithExceptionHandling(() -> {
createCoursePlace(places, saveCourse);
}, hasError));
threads.add(runAsyncTaskWithExceptionHandling(() -> {
createCourseTags(tags, saveCourse);
}, hasError));
for (Thread thread : threads) {
thread.join();
}
if (hasError[0]) {
throw new RuntimeException("코스 생성중 오류 발생"); // 예외 발생 시 전체 작업 실패 처리
}
}
public Thread runAsyncTaskWithExceptionHandling(Runnable task, boolean[] hasError) {
return startVirtualThread(() -> {
try {
task.run();
} catch (Exception e) {
hasError[0] = true;
throw new DateRoadException(FailureCode.COURSE_CREATE_ERROR);
}
});
}
ExcutorService도 안 쓰고 그냥 Thread로 처리했던 로직이 있었는데 일단은 ExcutorService로 변경했다. 훨씬 더 깔끔해진 모습을 볼 수 있다.
@Transactional
public void runAsyncTasks(List<CoursePlaceGetReq> places, List<TagCreateReq> tags, Course saveCourse) {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<Void> placeFuture = CompletableFuture.runAsync(() -> createCoursePlace(places, saveCourse), executor);
CompletableFuture<Void> tagFuture = CompletableFuture.runAsync(() -> createCourseTags(tags, saveCourse), executor);
CompletableFuture.allOf(placeFuture, tagFuture).join();
}
}
물론 이 부분도 preview API인 StructuredTaskScope를 도입한다면 더욱 깔끔하게 실패를 전파할 수 있을 것 같지만 차후에 적용해보록 하겠다.
다만 여기서 runAsyncTasks를 사용하는 부분에서 착각했던 부분이 있는데 그냥 @Transactional에 대한 이해를
내가 작성한 로직에서 rollback 하고 싶은 부분을 하나로 묶는다라고 생각했던 부분이 큰 오산이었다.
여기서 짚고 넘어가야 할 부분은 스프링 트랜잭션 관리방식과 비동기 처리 방식 간의 차이를 인식하고 넘어가야 한다.
기본적으로 스프링에서의 트랜잭션은 동일한 스레드 내에서 실행될 때만 경계를 유지한다. 따라서 위의 비동기 작업은 별도의 스레드에서 실행되기 때문에 원래의 트랜잭션 콘텍스트를 상속받지 못한다.
즉, 각 작업에 @Transactional 어노테이션을 붙여도 개별 트랜잭션으로 실행되는 것이지, 기존의 트랜잭션에 참여하는 것이 아니라는 것이다.
위의 로직은 하나의 API에서 코스 객체 생성, 코스 연관 객체 3개 생성, 코스 저장, 유저정보 업데이트 등으로 이루어져 있다.
그러나 각 DB의 접근하는 부분과 네트워크 부분을 Virtual Thread를 활용하여 작성했으므로 @Transcational 어노테이션을 붙이더라도 각 개별 트랜잭션으로 실행된다. 순서는 다음과 같다
- 코스 생성 -> 코스 ID가 필요함 : Create
- TranscationlEventListener로 코스가 commit 된 이후 course Id를 가져오도록 함
- Tag, Image, 등등의 코스 관련 객체를 ID를 FK로 지정해 저장 : Create
- Course의 썸네일 지정 : Update
- 포인트 생성 : Create
- 유저에게 포인트 지급 -> 유저 정보 Update
다음과 같은 로직으로 이루어져 있는데 기존의 동기로 처리한다면 아무 문제가 없다. 그러나 기존의 로직은 이런 방식으로 구성되어 있었는데 그림으로 보면 다음으로 이루어져 있었다.

지금 봤는데 별꼴인 것 같다. 속도는 빠르지만 비동기로직을 작성해서 트랜잭션이 전부 분리되어 있었고 코스 생성한 다음 다른 엔티티들에게 코스를 FK로 걸어줘야 했기에 나머지들이 실패해도 Course는 계속해서 반환하는 로직으로 되어있었다.
그렇다면 비동기에서 트랜잭션을 어떻게 수행해야 할까?
상위에서 한 번에 비동기로 처리해 CompletableFuture.allOf()으로 상위의 작업으로 묶어주는 방법도 있지만 Course를 저장하고 rollback 시키는 것보다는 코스 엔티티가 일부 저장되더라도, 코스에 대한 rollback과 User와 Point 생성을 rollback 시키기 위해서 코스 관련 엔티티 중 Image 생성 중 에러가 발생하는 경우를 제외하고 전부 rollback 하도록 만들었다.
상위에서 CompletableFuture로 모든 것을 엮어주는 것도 좋은 방법이라 생각했는데 사실 Entity의 관계를 객체들 간의 관계로 직접 매핑해 주었기 때문에 다른 엔티티의 영속화 과정에서 코스 객체가 필요한 다른 엔티티가 모두 실패했다. 따라서 다음과 같은 로직을 구성하게 되었다. 물론 Transactional의 isolation 레벨을 좀더 낮추어서 Course의ㅣ 변경상태를
감지할수 있지 않을까? 했는데 작동하지 않는다. 내부적으로 먼저 코스 영속화, 코스 관련 entity의 commit 호출순서 때문에 아직 영속화는 다 되어있지만, 코스가 commit 되지 않아 일어나는 에러였다. 따라서 아래 방식처럼 구현했다.
다음과 같은 구조로 만들어진 코드는 다음과 같아졌다.
@Transactional
@CacheEvict(value = "courses", allEntries = true)
public CourseCreateRes createCourse(final Long userId, final CourseCreateReq courseRegisterReq,
final List<CoursePlaceGetReq> places, final List<MultipartFile> images,
List<TagCreateReq> tags) throws ExecutionException, InterruptedException {
final float totalTime = places.stream()
.map(CoursePlaceGetReq::getDuration)
.reduce(0.0f, Float::sum);
User user = getUser(userId);
Course course = Course.create(
user,
...
totalTime
);
Course newcourse = courseRepository.save(course);
eventPublisher.publishEvent(CourseCreateEvent.of(newcourse, places, tags));
String thumbnail = asyncService.createCourseImages(images, newcourse);// 썸
course.setThumbnail(thumbnail);
courseRepository.save(newcourse); // 최종적으로 썸네일을 반영하여 저장
RecordId recordId = asyncService.publishEvenUserPoint(userId, PointUseReq.of(Constants.COURSE_CREATE_POINT, TransactionType.POINT_GAINED, "코스 등록하기"));
Long userCourseCount = courseRepository.countByUser(user);
courseRollbackService.rollbackCourse(recordId);
return CourseCreateRes.of(newcourse.getId(), user.getTotalPoint() + Constants.COURSE_CREATE_POINT, userCourseCount);
}
@TransactionalEventListener
public void handleCourseCreatedEvent(CourseCreateEvent event) {
asyncService.runAsyncTasks(event);
}
여기서 redis Event를 사용하는 부분은 redis stream에 message를 보낸 후 message를 Listener가 위 트랜잭션에서 작동하는 것이 아니므로 보상트랜잭션 개념으로 pending 된 메시지를 확인하고 pending message Id가 일치할 경우 에러를 반환하도록 하여
위 트랜잭션에서 오류가 발생하도록 하여 전체 롤백이 되도록 구성하였다.
추가로 CompletableFuture를 활용하여 비동기를 처리할 경우 트랜잭션의 경계를 설정해 줄 수도 있었는데, 이 부분에서는
코스 객체의 Id값이 존재하는 영속화가 필요했기 때문에 @TransactionalEventListener로 이후에 진행되도록 했지만
이 경우에는 다른 트랜잭션으로 실행된다는 것을 알아야 한다.
비동기에 관련해서 여러가지 찾아보다보니 다양한 개념들을 알게 되었는데 다음에는 좀더 재밌는 내용을 가지고 포스팅을 적어보려 한다.
'Spring' 카테고리의 다른 글
멀티모듈 프로젝트 Docker 빌드 전략: Path 기반 접근법[Docker&Github Action] (1) | 2024.12.14 |
---|---|
가상 스레드 vs 반응형 프로그래밍 [Spring/Java] (2) | 2024.11.22 |
[Spring] Redis기반의 EventStreamListener가 동작하지 않는 문제 (2) | 2024.10.15 |
[Spring] Redis Stream을 사용한 EventPublisher, EventListener 구현 (0) | 2024.08.05 |
Swagger로 사랑받는 개발자 되기 [ Spring ] (0) | 2024.08.01 |