배경
실습으로 배우는 선착순 이벤트 시스템을 학습하면서,
분산 환경에서 선착순으로 발생하는 동시성 문제를 해결하는 방법을 학습했습니다.
Kafka나 Redis를 이용하는 방식은 물론 여러모로 장점이 많지만,
토이 프로젝트나, 간단한 어플리케이션 등에서 사용하기엔 비용적인 측면에서 부담이 클 수 있습니다.
그로 인해, Redis와 Kafka를 각각 AtomicInteger와 BlockingQueue로 대체하여 자바 코드로 구현했습니다.
전체 코드는 GitHub Repository에서 확인 가능합니다.
Coupon
@Entity
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
public class Coupon {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
public Coupon(final Long userId) {
this.userId = userId;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Coupon coupon = (Coupon) o;
return Objects.equals(userId, coupon.userId);
}
@Override
public int hashCode() {
return Objects.hash(userId);
}
}
어떤 유저에게 쿠폰이 발급되었는 지 저장하는 간단한 Coupon
엔티티입니다.
유저 당 하나의 쿠폰만 발급 가능하게 하기 위해 equals
와 hashcode
를 구현하였습니다.
CouponService
@Service
public class CouponService {
private final CouponRepository couponRepository;
private final CouponQueueHandler couponQueueHandler;
private final AtomicInteger couponCount = new AtomicInteger();
public CouponService(
final CouponRepository couponRepository,
final CouponQueueHandler couponQueueHandler
) {
this.couponRepository = couponRepository;
this.couponQueueHandler = couponQueueHandler;
}
public void issue(Long userId) {
int count = couponCount.incrementAndGet();
if (count > 100) {
throw new IllegalArgumentException("쿠폰 발행갯수를 초과하였습니다.");
}
couponQueueHandler.produce(new Coupon(userId));
}
@Transactional(readOnly = true)
public Coupon getCoupon(Long couponId) {
return couponRepository.findById(couponId)
.orElseThrow(IllegalArgumentException::new);
}
public void resetCount() {
couponCount.set(0);
}
}
issue() 메서드는 발급 요청이 들어오면 AtomicInteger의 숫자를 증가시키고,
CouponQueueHandler에게 새로운 쿠폰의 발급을 Produce합니다.
직접 DB에 접근하지 않기 때문에 별도의 @Transactional은 붙여주지 않았습니다.
CouponQueueHandler
@Component
public class DefaultCouponQueueHandler implements CouponQueueHandler {
private final BlockingQueue<Coupon> queue = new LinkedBlockingQueue<>();
private final CouponBatchRepository couponBatchRepository;
public DefaultCouponQueueHandler(final CouponBatchRepository couponBatchRepository) {
this.couponBatchRepository = couponBatchRepository;
}
@Override
public void produce(Coupon coupon) {
try {
queue.put(coupon);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@PostConstruct
private void consume() {
new Thread(() -> {
while (true) {
try {
if (queue.isEmpty()) {
Thread.sleep(1000);
continue;
}
List<Coupon> coupons = extractUniqueCoupons();
couponBatchRepository.saveAll(coupons);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
private List<Coupon> extractUniqueCoupons() {
List<Coupon> allCoupons = new ArrayList<>();
queue.drainTo(allCoupons);
return allCoupons.stream().
distinct()
.limit(100L)
.toList();
}
}
해당 클래스는 CouponQueueHandler
인터페이스를 구현하며,
쿠폰 생성 요청을 비동기적으로 처리하는 역할을 담당합니다.
내부적으로 BlockingQueue를 사용하여 쿠폰 생성 요청을 관리하고,
CouponBatchRepository를 통해 데이터베이스에 배치로 쿠폰 정보를 저장합니다.
produce 메서드를 통해 쿠폰 생성 요청이 들어오면, 이를 BlockingQueue에 추가합니다.
consume 메서드는 @PostConstruct 어노테이션을 사용하여 애플리케이션이 시작할 때 자동으로 실행됩니다.
이 메서드에서 생성된 별도의 스레드는 큐를 지속적으로 모니터링하며,
큐가 비어있지 않을 경우 쿠폰 정보를 데이터베이스에 Batch로 저장합니다.
하나의 스레드가 쿠폰 생성 요청을 보내므로,
동시적으로 발생한 요청을 순차적으로 바꾸는 효과를 줍니다.
추가적으로, 큐에 있는 요소들을 한꺼번에 BatchUpdate하므로
DB에 보내는 요청의 갯수를 줄이는 효과도 있습니다.
LinkedBlockingQueue의 take()
메서드를 사용할 경우,
큐에 요소들이 들어올때까지 자동으로 blocking 상태가 됩니다.
그러나 배치 처리를 최적화하기 위하여 drainTo() 메서드를 사용하였습니다.
큐가 비어있다면, 큐에 충분히 요소들이 들어올때까지 sleep하였다가
요소들을 한꺼번에 처리하게 됩니다.
CouponBatchRepository
@Repository
@Transactional
public class CouponBatchRepository {
private final JdbcTemplate jdbcTemplate;
public CouponBatchRepository(final JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public void saveAll(final List<Coupon> coupons) {
String sql = "INSERT INTO coupon (user_id) VALUES (?)";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(final PreparedStatement ps, final int i) throws SQLException {
Coupon coupon = coupons.get(i);
ps.setLong(1, coupon.getUserId());
}
@Override
public int getBatchSize() {
return coupons.size();
}
});
}
}
JdbcTemplate을 이용하여 BatchUpdate해주는 클래스입니다.
Application Layer에서는 해당 기능을 사용하지 않기 때문에
Spring Data의 CustomRepository로 구현하지는 않았습니다.
InfraStructure 레이어에 위치하며, 같은 레이어에 위치한
CouponQueueHandler가 해당 클래스를 주입 받아서 이용합니다.
결론
자바 코드베이스 내에서 직접 구현하면서 느낀 장단점은 아래와 같습니다.
장점 | 단점 |
---|---|
별도의 인프라 구성이나 외부 서비스의 설치 및 구성이 필요 없어, 개발 환경 설정 및 배포 과정이 간소화됩니다. | 처리 용량 증가를 위해 추가 하드웨어나 스케일 아웃이 필요할 때 확장성이 제한될 수 있습니다. |
외부 의존성 없이 애플리케이션 내에서 모든 로직을 관리할 수 있어, 코드 통합성이 높아지고 구조가 단순해집니다. | 애플리케이션의 규모가 커짐에 따라, 모든 기능을 하나의 코드 베이스 내에서 관리하는 것이 복잡도를 증가시킬 수 있습니다. |
추가 서버 또는 서비스에 대한 비용을 절감할 수 있습니다. | Redis와 Kafka 데이터 복제, 지속성, 고가용성 등을 위한 기능을 이용할 수 없어, 장애 발생 시 데이터 손실 위험이 있을 수 있습니다. |
네트워크 호출이 필요 없이 애플리케이션 내부에서 모든 처리가 이루어지므로, 네트워크 지연 시간이 제거됩니다. | Kafka의 스트림 처리나 Redis의 고속 데이터 액세스와 같은 기능을 이용할 수 없어 성능이나 효율성 측면에서 제한이 있을 수 있습니다. |
추후에 고려하면 좋을 점
- 쿠폰의 종류가 여러가지 일 때, AtomicInteger외에 무엇을 추가해야할까?
(ConcurrentHashMap과 AtomicInteger를 함께 쓰는 방향 고려해보기) - consumer가 데이터 처리 중 오류가 발생했을 때, 어떤 식으로 처리해야 데이터 유실을 방지할 수 있을까?
(예외를 어떤 식으로 처리할 것인지...)