[kt cloud 서비스개발팀 이지윤 님]
BufferedReader.readLine()와 InterruptedException
kt cloud의 DBaaS 상품에는 생성된 VM에 원격으로 접속해서 OS 명령어로 DB의 상태를 체크하는 과정이 주기적으로 실행되고 있습니다. 그리고 OS 명령어의 결과값을 처리할 때에는 표준 출력 스트림과 에러 스트림을 비동기식으로 처리하고 있습니다.
이 과정에서 Java의 Process 객체를 사용해서 OS 명령어 처리를 위한 외부 프로세스를 관리하고,
BufferedReader를 사용해서 외부 프로세스의 출력/에러 스트림을 처리하고,
스트림 처리를 비동기식으로 수행하기 위해 CompletableFuture를 사용하고 있습니다.
DBaaS에서 실제로 어떻게 외부 프로세스 사용 및 스트림 처리를 하고 있는지를 설명하기 전에 먼저 간단하게 Java의 Process 클래스, BufferedReader 클래스, CompletableFuture 클래스에 대해서 설명드리겠습니다.
Process 클래스
Process 클래스는 외부 시스템 프로세스를 실행하고 제어하는 데 사용되고, 이를 통해 Java 프로그램은 Runtime.getRuntime().exec() 메서드나 ProcessBuilder를 통해 Process 인스턴스를 생성하고 관리하며 시스템 명령이나 외부 애플리케이션을 실행할 수 있습니다.
- 주요 메서드
- getInputStream(), getErrorStream()
- 프로세스의 표준 출력 및 에러 출력을 가져오는 데 사용됨
- waitFor()
- 현재 스레드가 프로세스가 종료될 때까지 대기하도록 함
- destroy()
- 프로세스를 강제 종료시킴
- getInputStream(), getErrorStream()
BufferedReader 클래스
BufferedReader 클래스는 문자 기반 입력 스트림에서 텍스트를 읽는 데 사용되며, 효율적인 성능을 위해 버퍼링하여 제공합니다. 특히 readLine() 메서드는 데이터를 한 줄씩 읽어 들이는데, 이 메서드는 블로킹 메서드라는 특징이 있습니다. 블로킹 메서드란 호출 시 해당 작업이 완료될 때까지 현재 스레드의 실행이 중단되는 것을 말합니다.
즉 readLine()은 읽을 데이터가 없으면 입력이 도착할 때까지 멈춰있기 때문에, 데이터가 들어올 때까지 무기한 대기할 수 있습니다.
CompletableFuture 클래스
CompletableFuture 는 Java8에서 추가된 비동기 프로그래밍을 지원하는 클래스로, 비동기 작업을 수행하고 완료된 작업의 결과를 처리하는 데 유용한 기능을 제공합니다. Java5에서 추가되었던 Future와 비교했을 때 가장 큰 장점은 외부에서 완료를 시킬 수 있고, 그 결과값으로 다른 작업을 하도록 연결지을 수 있다는 것입니다. 또, 비동기 작업 중 발생할 수 있는 예외를 쉽게 처리할 수 있는 방법을 제공하기도 합니다.
간단한 예시 코드와 함께 CompletableFuture의 몇 가지 메소드를 알아보겠습니다.
public class CompletableFutureExample { public static void main(String[] args) { // 비동기 작업 시작: 공급자(Supplier)를 사용하여 결과를 반환 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Hello from the Future!"; }); System.out.println("Async operation started..."); // 결과 가져오기 try { String result = future.get(); // 비동기 작업이 완료될 때까지 대기 System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } |
Async operation started... (2초 대기) Hello from the Future! |
위의 예시 코드와 실행 결과에서 알 수 있듯이 CompletableFuture의 supplyAsync()를 통해 간단하게 비동기 작업을 수행하고, CompletableFuture<String>로 반환된 결과값을 get() 메소드를 통해 가져와서 별도의 처리를 할 수 있게 됩니다.
또 아래의 예시 코드처럼 exceptionally() 메소드를 통해서 예외 처리도 가능합니다.
public class CompletableFutureExceptionExample { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Something went wrong!"); } return "Success!"; }).exceptionally(ex -> { System.out.println("Exception: " + ex.getMessage()); return "Default Result"; // 예외 발생 시 대체 값 }); future.thenAccept(System.out::println); } } |
외부 프로세스 실행 및 스트림 처리 과정
아래는 DBaaS에서 외부 프로세스 실행 및 스트림 처리까지의 실제 코드를 바탕으로 간단하게 그려본 흐름도입니다.
CompletableFuture를 통해서 ForkJoinPool 스레드가 2개씩 생성되고, 각각 표준 출력 스트림과 에러 출력 스트림 처리를 하게 됩니다.
아래의 코드는 실제 코드의 일부입니다. 코드를 보면 CompletableFuture를 통해 OS 명령어의 결과에 대한 스트림 처리를 하게 하고, 지정된 시간만큼만 기다렸다가 그 시간을 초과하면 TimeoutException이 발생됩니다. 그리고 catch문의 TimeoutException에서 예외 처리를 하게 됩니다.
try { Process process = builder.start(); String stdout = ""; String stderr = ""; // handler.act()를 호출해 비동기 작업 처리 CompletableFuture<String> stdoutFuture = handler.act(process.getInputStream()); CompletableFuture<String> stderrFuture = handler.act(process.getErrorStream()); // 지정된 timeoutSeconds 값만큼 기다렸다가 결과값을 가져온다. stdout = stdoutFuture.get(timeoutSeconds, SECONDS); stderr = stderrFuture.get(timeoutSeconds, SECONDS); int exitCode = process.waitFor(); } catch (TimeoutException e) { // TimeoutException에 대한 예외처리 log.warn("TimeoutException({}: 초): Command - {} --> Exception - ", timeoutSeconds, cmd.getCommand(), e); } |
이렇게 DB 상태 조회에 대한 스트림 처리를 CompletableFuture를 이용해 잘 하고 있다고 생각을 했었습니다. 그러던 어느 날, 고객 VM에 메모리 가용량이 매우 부족해서 상태 조회에 대한 응답값을 제 시간에 전달받지 못하는 일이 생겼습니다.
모니터링 시스템은 TimeoutException 예외처리 구문에 작성한대로 로그 기록 및 알람 전송까지 완료했지만, 이상하게도 스트림 처리 스레드의 active 개수가 계속 증가하고 있었습니다. 그러다 TaskRejectedException이 발생했고 모니터링 시스템이 일시적으로 다운되는 문제가 생겼습니다.
분명 get() 메소드를 통해 timeoutException을 발생시키고, 예외처리까지 다 했는데 왜 스레드가 종료되지 않는 건지 이해하기가 어려웠습니다. 문제의 원인을 파악하기 위해 문제 상황을 재연하고 한 줄씩 로그를 찍어보며 디버깅을 진행했고, 그 결과 OS 명령어의 결과값에 대한 스트림 처리를 위해 생성된 스레드 자원이 제대로 정리되지 않아서 생긴 문제라는 것을 알게 되었습니다.
CompletableFuture를 통해 생성되는 스레드의 종료
우선 모니터링 시스템에서 실제로 CompletableFuture의 supplyAscnc함수를 호출하는 것은 아래 코드의 act()함수입니다.
@Async("customExecutor") public CompletableFuture<String> act(InputStream inputStream) { return CompletableFuture.supplyAsync(() -> { try (BufferedReader reader = new BufferedReader( new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { return reader.lines() .collect(Collectors.joining("\n")); } catch (RuntimeException | IOException e) { throw new RuntimeException("Error processing input stream", e); } }); } |
즉, 앞서 말했던 문제 상황에서 스트림 처리 스레드(=Command스레드)의 active 개수가 계속 증가했던 이유는 TimeoutException 예외 처리 구문에 문제가 있었던 것이 아니라 단순하게 CompletableFuture를 통해 생성된 스레드가 종료되지 않기 때문이었습니다.
CompletableFuture의 반환값을 가지고 get()메소드에 timeout 설정을 해서 TimeoutException을 발생시키고 예외처리를 했지만, CompletableFuture를 통해 생성된 실제 스레드는 언제, 어떻게 종료되는건지 생각하지 않았던 것이 문제였습니다.
위의 그림을 보면 알 수 있듯이 모니터링 대상이 되는 VM에 장애가 발생해 OS 명령어를 처리하지 못하는 상태가 되면 BufferedReader.readLine() 메서드가 실행되는 ForkJoinPool에서는 Process 인스턴스의 표준 출력값과 에러 출력값이 들어오길 기다리며 무한 대기 상태로 빠지게 됩니다. ForkJoinPool 스레드의 응답을 기다리는 Command 스레드도 마찬가지로 무한 대기 상태에 빠집니다.
하지만 CompletableFuture의 반환값을 처리하는 코드에서 get() 메서드에 timeout을 지정했기 때문에, TimeoutException이 발생하고 예외 처리 구문이 실행됩니다.
그렇게 되면 주기적으로 DB 인스턴스의 상태를 조회하도록 등록된 Scheduling Job은 사이클이 완료됐다고 판단하고, 다음 조회 사이클을 수행하게 됩니다.
결국, 대기 상태로 빠진 ForkJoinPool 스레드와 Command 스레드가 계속 쌓이면서 스레드 풀 초과 문제를 야기하게 됩니다.
Process의 강제종료
이런 문제를 해결하기 위해서는 BufferedReader.readLine() 메서드를 호출하기 전에 입력 데이터의 상태를 체크하거나 주기적으로 스레드 인터럽트 상태를 체크하게 하는 방법이 있을 것 같습니다.
또 다른 방법으로는 BufferedReader.readLine()가 기다리는 입력데이터, 즉 Process에서 실행된 OS 명령어의 결과값을 전달해주면 됩니다.
결과적으로 DBaaS 상품에서는 현재 TimeoutException을 포함한 기타 예외가 발생하면, process 인스턴스를 강제 종료하는 destoryForcibly()를 호출하는 코드를 작성해 지정된 시간이 지나면 자원이 정리되도록 했습니다.
이 방법이 제일 적합한 방법이 아닐 수도 있겠지만, BufferedReader와 Process를 사용해 외부 프로세스의 결과값을 처리하는 과정에 도움이 되었으면 합니다.
관련글
1. How To Manage Timeout for CompletableFuture | Baeldung
'Tech story > Cloud' 카테고리의 다른 글
OpenStack 컴퓨팅 서비스 이해하기: 심화편 (2) | 2024.11.18 |
---|---|
What is DevOps? - Helm Chart (11) | 2024.11.13 |
What is DevOps? - Slack으로 협업하기 (3) | 2024.11.13 |
What is DevOps? - CI Automation (7) | 2024.11.13 |
What is DevOps? - Github Action (8) | 2024.11.11 |