Kafka New Producer API를 활용한 유실 없는 비동기 데이터 전송

안녕하세요. SK플래닛의 Data Infrastructure팀(이하 DI팀) 강병수 입니다.
Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기 에 이어지는 DI팀의 포스팅입니다.

• Previous Posts

• Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기 written by 엄태욱

• Upcoming Posts

• RecoPick Stream Processing 데이터 처리 시스템 전환기: Storm에서 Spark Streaming으로

• 업데이트가 필요한 큰 테이블 입수 및 조회 성능 제고

이번 글을 통해 Kafka Producer API를 v0.8.1.1에서 v0.9.0.1로 업그레이드 하면서 얻은 경험을 공유하려 합니다. 어떤 버전의 Kafka Producer API를 사용하더라도, 빠른 속도로 대량의 Stream 데이터를 전송하는 동시에 데이터 유실과 중복을 없애거나 최소화 하는 방법에 대해 고민한 얘기를 해 보겠습니다.

The Big Picture

본격적인 설명에 들어가기 전에, SK플래닛의 Big Data 를 처리하기 위한 Data Infrastructure 를 간단히 살펴보겠습니다. 다음과 같이 크게 네 개의 영역으로 구성되어 있습니다.

• Production: 데이터를 정의, 개발, 입수하는 시스템 영역

• Process: 입수된 데이터를 다양한 용도로 처리하는 영역

• Provision: 입수 및 가공된 데이터를 사람이나 서비스가 활용할 수 있도록 제공하는 시스템 영역

• Platform: 입수 및 가공된 데이터를 관리하고, 각종 데이터 접근 엔진을 제공하는 영역

이번에 다룰 내용은 Production 영역에서 데이터의 중간 적재지로 사용하고 있는 Apache Kafka에 데이터를 밀어넣는 부분에 대한 것입니다. Kafka는 다음 세 가지 구성 요소로 나눠볼 수 있습니다.

• Broker: 메시지를 저장하고 제공하는 기능을 하는 분산 서비스 프로세스

• Producer: Kafka Producer API를 사용해 Kafka Broker에 메시지를 보냄

• Consumer: Kafka Consumer API를 사용해 Kafka Broker로 부터 메시지를 가져옴

간단히 생각하면, 마치 전통적인 관계형 DB 기반 시스템에서 DB를 중심으로 JDBC/ODBC API를 사용하는 다양한 Data Handling Application들이 존재하듯이, Kafka에도 Broker를 중심으로 Producer API와 Consumer API를 사용하는 다양한 Kafka Application들이 존재할 수 있습니다. 이 중에서, 좀 더 나은 Producer Application을 만들 수 있도록 새로 소개된 Kafka New Producer API의 특징에 대해 살펴보고, 저희가 이 API를 활용해 중복, 유실, 지연 없이 대량의 데이터를 효율적으로 전송하기 위해 고안한 프로그램에 대해서도 소개하겠습니다.

Kafka Producer API 업그레이드

Kafka Producer API는 데이터 발생지에서 Kafka Broker로 메시지를 전송하는데 쓰입니다. DI팀은 각종 Application 서버에서 발생하는 로그를 Kafka 메시지에 담아서 중앙의 Kafka Broker에 보내는 방법으로 서버 로그를 수집하고 있습니다. 서버 로그를 수집하는 방법은 두 가지가 있습니다.

• 서버에서 로컬 파일로 내린 로그를 LogAgent가 읽어서 전송하는 방법 (LogAgent 내부에서 RakeAPI 이용)

• 서버 프로세스 내에서 RakeAPI 라이브러리를 사용해서 Kafka Broker로 직접 전송하는 방법

두 방법 모두 RakeAPI라는, Kafka Producer API에 File Queue를 결합해 네트워크가 단절된 상황에서도 전송하는 쪽 로직에 영향을 주지 않고 메시지를 전달할 수 있도록 개발된 라이브러리를 사용합니다. 여기서는 이 RakeAPI에서 사용하는 Kafka Producer API를 Kafka Broker의 버전에 맞춰 업그레이드 하면서 겪은 경험을 정리해서 소개하는데 전념하고, File Queue에 대해서는 다루지 않겠습니다.

아래 그림은 RakeAPI를 통한 데이터 전송 흐름입니다. 참고로, 여기서는 Application 서버가 RakeAPI를 활용해 직접 로그 데이터를 전송하는 경우를 설명하고 있지만, 단말기에서 전송된 로그를 입수하는 RakeServer나 어플리케이션 서버에서 로컬 파일에 떨군 로그 스트림을 입수하는 LogAgent도 같은 구조입니다. 즉, RakeServer나 LogAgent도 어플리케이션 서버와 마찬가지로 ‘RakeAPI’와 그 아래의 ‘Kafka Producer API’를 활용해 Kafka Broker로 데이터를 전송하는 Kafka Producer Application들입니다.

DI팀은 사내외의 다양한 데이터를 안정적으로 Hadoop 클러스터에 적재해야 하기 때문에, Broker의 버전은 물론 클러스터의 구성 형태와 노드 수도 보수적으로 관리할 수 밖에 없습니다. 최근까지도 v0.8.1.1 Kafka Broker Cluster를 구성해 사용하고 있었고 이에 따라 Kafka Producer API 역시 v0.8.1.1을 사용하고 있었습니다.

2016년 상반기에 DI팀의 Kafka Broker를 v0.9.0.1로 업그레이드 하면서 RakeAPI에서 사용하는 Kafka Producer API도 v0.9.0.1로 업그레이드가 가능해졌습니다. Kafka Broker는 하위버전과 호환성을 지키기 때문에 RakeAPI의 Kafka Producer API를 v0.9.0.1로 업그레이드 하는 것이 필수 상황은 아니었지만, 이 글에서 소개하는 내용을 포함해 v0.9.0.1의 Kafka Producer API가 갖는 여러 장점들을 활용하기 위해 업그레이드를 결정했습니다. 또한 오픈소스인 Kafka가 지속적으로 발전하면서 Kafka Broker가 구버전의 Producer에 대한 호환 지원이 끊길 경우를 대비해 적정한 속도로 최신 버전을 따라가는 것이 향후 운영 및 관리에 편리하다고 판단했습니다.

Kafka New Producer API

데이터의 발생지에서 메시지 전송을 위해 사용하는 Producer API는 v0.8.2.* 때 새로 설계돼 New Producer API로 불립니다(Link: http://kafka.apache.org/082/documentation.html#producerapi). Kafka 개발자들이 New Producer API를 새로 설계하면서 API 구현도 기존의 Scala에서 Java로 바꿨습니다. Java의 Future, Selector API를 활용하면서 Callback 기반의 Asynchronous, Non-Blocking IO를 구현해 전송 속도를 대폭 증가시켰고, 동시에 전송 실패에 대한 처리 방식을 개선해 메시지 유실에 대한 대처 수준도 높였습니다.

Old Producer API를 New Producer API로 업그레이드 했을 때 생기는 장점과 단점은 다음과 같습니다.

장점 단점

• Non-Blocking socket를 사용함으로써 적은 수의 쓰레드로 더 좋은 처리량(throughput) 제공

• Asynchronous한 요청의 후처리가 가능하도록 callback 제공

• 전송 성공한 메시지마다 Offset 반환

• 전송 실패 시 메시지 순서 보장 불가

• 세밀한 메모리 관리 필요


Kafka Producer API

Kafka Producer API의 동작 원리를 소개하며 왜 이런 장점이 생기는지, 단점은 왜 발생하며 극복 방법은 무엇인지 살펴보겠습니다.

Kafka Producer API는 신구 버전 뿐만 아니라 같은 버전에서도 다양한 전송 방법을 지원합니다. 각각의 전송 방법들은 전송 속도, 순서 보장, 유실 및 중복 처리라는 세 가지 기준에서 비교해 볼 수 있습니다. Old Producer API에는 Sync mode, Async mode, Bulk mode라는 세 가지 전송 방식이 있었고, New Producer API에는 Async mode만 있습니다.

Sync mode는 전송하고 결과를 받는 과정을 하나의 실행 흐름 내에서 순서대로 처리하는 방식입니다. Async mode는 전송할 메시지를 Queue에 쌓기만 하고, Queue에 쌓인 메시지는 전송을 담당하는 별도의 쓰레드가 수행을 하기 때문에, 전송 요청 후 언제 전송이 될지 알 수 없는 비동기 방식입니다. Bulk mode는 Sync mode의 낮은 전송 효율을 극복하기 위해 여러 메시지를 묶어 한 번에 전송하는 방식입니다.

이 네 가지 방식을 전송속도, 순서 보장, 유실 및 중복 처리로 분류해서 특징을 정리해보면 다음과 같습니다.

• 전송 속도는 환경에 따라 달라질 수 있지만, 보통 Old Producer Async mode, Old Producer Bulk mode, New Producer Async mode의 경우 단위시간 내 전송량은 비슷합니다. 하지만 Old Producer Sync mode의 경우 나머지 셋보다 약 10배가량 느립니다.

• 순서 보장은 Old Producer Sync mode 외에는 모두 조건부 순서 보장인 ‘중’ 입니다. 즉, 정상 전송 시에는 순서가 보장되지만, 전송 실패 후 재전송 시 순서가 보장되지 않습니다.

• 유실 및 중복 처리는 Old Producer Sync mode와 New Producer Async mode가 ‘상’으로 유실 및 중복 처리가 가능하고, Old Producer Async mode와 Old Producer Bulk mode는 ‘중’ 입니다. 전송이 성공하면 유실 및 중복 처리가 필요 없고, 전송이 실패해도 설정한 재시도 횟수 내에 결국 성공하면 유실 및 중복이 발생하지 않지만, 재시도 횟수를 채우고도 결국 실패하면 유실 및 중복이 발생합니다.


Old Producer New Producer 기준
Sync mode Async mode Bulk mode Async mode
전송 속도 전송 후 결과를 받는 데까지 걸리는 시간을 상대 평가
메시지 순서 보장 상: 항상 보장, 중: 조건부 보장, 하: 보장 불가
유실 및 중복 처리 상: 항상 처리, 중: 조건부 처리, 하: 처리 불가

Old Producer Sync mode는 순서 보장과 유실 및 중복 처리 모두 ‘상’이기 때문에 안정적인 메시지 전송이 가능하지만, 전송 속도가 약 10배가량 느립니다. Old Producer Sync mode의 전송 속도를 높일 수 있는 방법으로 프로그램에서 다중 쓰레드로 Producer API를 다수 선언해 사용하는 방법이 있습니다. 하지만 이 방식은 전송 속도를 ‘상’ 수준까지 올릴 수 있는 반면, 순서 보장은 전혀 되지 않는 ‘하’입니다.

Old Producer Async mode와 New Producer Async mode는 둘 다 Async 전송 방식을 쓰지만, New Producer에서만 유실 및 중복 처리를 할 수 있다는 점에서 차이가 있습니다.

Sync vs. Async, Blocking vs. Non-Blocking IO

Kafka Producer API의 전송 방식 별로 차이가 생기는 이유는 기본적으로 Synchronous와 Asynchronous, Blocking IO와 Non-Blocking IO의 차이 때문인데, 이에 대해서는 지면 관계상 링크로 설명을 대신합니다(Link: http://djkeh.github.io/articles/Boost-application-performance-using-asynchronous-IO-kor/).

지금부터는 Synchronous와 Asynchronous, Blocking IO와 Non-Blocking IO의 관점에서 Old Producer API와 New Producer API의 성능과 작동 방식을 살펴보겠습니다. Kafka 문서와 소스 코드를 보면 기존에 사용하던 Old Producer의 Sync mode와 Bulk mode는 Synchronous blocking IO 방식이고, Async mode는 Asynchronous blocking IO 입니다. New Producer API는 Asynchronous non-blocking IO를 기본으로 합니다.

Old Producer API 는 세 가지 전송 방식 모두 Blocking IO를 사용하고, New Producer API는 Non-Blocking IO를 사용합니다.

전송 방식에 따른 효율성을 파악하기 위해 Synchronous 전송과 Asynchronous 전송을 테스트해 비교했습니다. 테스트는 메시지의 순서 보장은 되지 않지만 비슷한 전송 속도를 내기 위해 Old Sync Producer를 다중 쓰레드로 구성했고, Old와 New Async mode는 단일 쓰레드로 구성해 진행했습니다. 각각 100만 개의 메시지를 보낼 때의 CPU부하(Load)의 평균치와 전송하는 동안 JVM의 평균 메모리 사용량, 전송 완료 시간을 비교했습니다.

테스트 환경은 다음과 같습니다.

• CPU : 2.5 GHz Intel Core i7

• Memory : 16GB 1600 MHz DDR3

• Sync: 40 쓰레드

• 테스트 환경에서 쓰레드의 개수를 40개 이상 늘리면 전송 속도가 점차 느려지기 때문에 가장 빠른 전송 속도를 낼 수 있는 쓰레드 개수인 40개로 설정했습니다.

• Async: 1 쓰레드

아래 그래프는 같은 크기의 메시지 100만 개를 Old Producer의 Sync mode(40 쓰레드)와 Async Mode(1 쓰레드), New Producer Async mode(1 쓰레드)로 전송했을 때 완료까지 걸린 시간을 보여줍니다.

수행 시간 (Lower is Better)

Old SyncProducer
Old AsyncProducer
New AsyncProducer
Try 1 152.777 초 133.133 초 119.059 초
Try 2 151.371 초 134.035 초 111.754 초
Try 3 159.409 초 131.817 초 113.995 초

CPU 부하 (Lower is Better)

Old SyncProducer
Old AsyncProducer
New AsyncProducer
Try 1 52.184 % 46.333 % 34.734 %
Try 2 52.755 % 45.081 % 30.794 %
Try 3 52.351 % 45.253 % 32.431 %

메모리 사용량 (Lower is Better)

Old SyncProducer
Old AsyncProducer
New AsyncProducer
Try 1 332.319 KB 149.991 KB 274.795 KB
Try 2 331.662 KB 149.846 KB 271.183 KB
Try 3 326.200 KB 149.980 KB 276.329 KB

그래프에서 볼 수 있듯이, Async mode가 Sync mode보다 사용하는 쓰레드가 적기 때문에 CPU 부하와 메모리 사용량에서 효율이 좋았고, 비동기로 배치 전송을 하기 때문에 전송 속도 역시 더 빨랐습니다. Sync mode에서는 많은 수의 쓰레드를 할당해 작동해야 하니 CPU와 메모리 사용에 부담이 있고, 쓰레드 간의 문맥 전환(Context switching)과 상태 보존 등의 추가 부담이 있을 것입니다.

또한 Old Async mode와 New Async mode는 Blocking IO와 Non-Blocking IO의 차이로 인해 다른 결과를 보여줍니다. Old Async mode는 전송시 배치로 전송하는 메시지들을 모아 하나의 stream으로 만들고 한 번의 연결로 전송해 결과를 받는 구조입니다. 전송 후 결과를 받기 위해 기다리는 Blocking IO지만, 메시지들을 묶어서 하나의 stream으로 전송하기 때문에 적은 수의 연결을 사용하게 되므로, Blocking임에도 불구하고 좋은 전송 성능을 보여줍니다. 하지만 메시지들을 개별 전송이 아닌 하나의 stream으로 만들면, 개별 메시지 단위로 오류 추적을 할 수 없어 하나의 stream을 전송하는 도중에 실패할 경우 유실과 중복 처리가 어렵습니다. 예를 들어 100 개의 메시지를 배치로 전송 했을 때, 55번째 메시지 전송 과정에 실패해도 100 개 메시지 전송이 실패했다는 Exception만 받을 수 있습니다. 이 상황에서 전송 실패에 대처할 수 있는 방안은 전부 다시 보내거나 (1~54번 까지 메시지 중복 발생), 그냥 포기하는 방법(55~100번 메시지까지 메시지 유실 발생) 밖에 없습니다. 두 방법 모두 유실 또는 중복을 피할 수 없기 때문에, Old Producer API에서는 자원의 소모가 크지만 유실이나 중복을 피할 수 있는 Sync mode를 선호했습니다.

New Producer의 Async mode는 개별 메시지 마다 제공되는 Future를 통해 Callback으로 전송 결과를 받고 Non-Blocking Channel IO를 사용합니다. 덕분에 각 메시지 마다 전송 오류 처리가 가능하고, Non-Blocking의 효과로 인해 전송 속도가 세 가지 방식 중에서 가장 뛰어납니다. 하지만 Callback 메서드를 메모리에 보관하고 있어야 하기 때문에, Old Async mode보다 더 많은 메모리를 사용하는 것을 볼 수 있습니다.

New Producer의 Async mode는 Old Producer의 Sync mode보다 자원을 더욱 효율적으로 사용하고, 전송속도 역시 빠릅니다. 또한 New Producer의 Async mode는 Old Producer의 Async mode보다 메모리 사용량은 늘었지만, 유실과 중복에 대한 대처 수준이 강화됐고 전송 효율 역시 좋습니다.

New Producer API가 보여주는 높은 Throughput과 유실 및 중복 처리의 안정성은 아래의 동작 구조를 통해 좀 더 자세히 이해할 수 있습니다.

Kafka Producer API의 동작구조

Sync Blocking IO를 사용하는 Old Producer API와 Async Non-Blocking IO를 사용하는 New Producer의 동작 구조를 차례로 살펴보겠습니다.

Old Producer API

Synchronous Blocking IO를 사용하기 때문에 아래 그림과 같은 순서로 메시지 전송이 이루어집니다.

위 그림처럼 Synchronous Blocking IO를 사용하면, 전송 순서나 유실 및 중복 처리는 가능하지만 Blocking 방식이기 때문에 전송 속도가 매우 느립니다. 전송 속도와 처리량을 높이기 위해 Blocking IO를 사용하는 전통적인 서버 구조와 마찬가지로, 여러 개의 쓰레드를 띄워서 한 쓰레드에서 Blocking이 되는 동안에도 다른 쓰레드에서 처리가 계속되도록 하는 것이 효율적입니다. 개별 쓰레드는 저마다 메시지를 전송한 후에 응답을 받아서 결과로 되돌려줍니다.

하지만 이 구성에서는 Synchronous Blocking IO의 장점이었던 순서 보장이 불가능합니다. 또한, 다중 쓰레드를 써서 전송하는 만큼 Context Switching 비용이나 CPU 부담도 큽니다. 물론, 각 쓰레드 내에서의 전송 순서는 유지할 수 있으므로, 순서 보장이 필요한 partition마다 쓰레드를 할당하는 방법을 사용하면 다중 쓰레드에서도 메시지의 순서를 보장할 수 있습니다.

New Producer API

Asynchronous Non-Blocking IO 전송을 구현하기 위해 Future, Selector, Non-blocking channel API가 사용됐습니다. Java Future API는 나중에 결과를 돌려 받을 수 있도록 Callback을 제공합니다. 또한 Selector API는 Multiplex IO를 통해 단일 쓰레드로 Read와 Write를 모두 수행 합니다. Selector에 Non-Blocking mode설정을 하고 java.nio.channels의 Non-Blocking channel을 사용하도록 구현해 Non-Blocking IO로 전송합니다.

Future API는 http://javarevisited.blogspot.com/2015/01/how-to-use-future-and-futuretask-in-Java.html 페이지에, Selector API는 http://tutorials.jenkov.com/java-nio/selectors.html 페이지에 상세히 설명돼 있습니다.

New Producer API는 전송 시에 이벤트 기반 단일 쓰레드를 사용하기 때문에 멀티 쓰레드 기반의 전송 방식보다 부담이 적습니다. 하지만 Callback을 무한정 쌓을 수는 없기 때문에 시간 제한을 겁니다. 이런 시간 제한 때문에 메시지 중복 전송 문제가 발생할 수 있습니다. 시간 초과(Timeout)는 Broker로 메시지가 전송되지 않았거나 전송은 성공했지만 ack을 받지 못했을 때 발생합니다. 시간 초과가 발생하면 전송 실패로 간주하고 재전송을 하는데, 전송은 성공했지만 ack을 받지 못한 경우에서 메시지 중복 전송 문제가 발생합니다.

New Producer vs Old Producer

동작 구조 속에서 장점과 단점을 이해할 수 있습니다.

먼저 Old Producer의 Sync Blocking IO는 Blocking이 발생하는 동안 전송이 중단되는 것을 최소화 하기 위해 여러 개의 쓰레드를 띄워서 전송합니다. (시스템 가용 자원에 따라 쓰레드의 수를 정하는 것이 좋습니다.) 각 쓰레드는 전송할 메시지를 Kafka Broker로 전송한 다음 받은 응답으로 메시지 전송의 성패를 판단하고 재전송 여부를 결정합니다. 이 방식에는 다음과 같은 지연 유발 요소가 있습니다.

1. 메시지를 하나씩 전처리 하는 작업

2. 전송 후 결과 값을 받을 때까지 일어나는 Blocking

3. 다중 쓰레드 사용에 따른 Context Switching 부담

이 중에서 메시지를 하나씩 전처리하는 작업에 따르는 지연은 Old Producer의 Async mode나 여러 개의 메시지를 리스트(List)로 묶어서 처리하는 Bulk mode를 사용해 해결할 수 있습니다. 하지만 이는 메시지 유실 또는 중복이 발생할 여지가 있기 때문에 유실과 중복에 대한 처리보다 전송 속도가 더 중요한 경우에 사용하는 것이 좋습니다.

다음으로 New Producer API는 두 개의 쓰레드를 사용합니다. 전송할 메시지를 Batch Queue에 넣고 즉시 callback을 반환하는 쓰레드와 batch queue의 메시지를 NIO Send로 전송하는 쓰레드로 구성돼 있습니다. 각 메시지마다 Callback을 등록하고, 복수의 채널을 등록해 Non-Blocking Socket IO로 메시지 전송을 수행합니다. Async Non-Blocking IO는 Sync Blocking에서 발생하던 지연 문제를 대부분 해결합니다. 하나의 쓰레드가 배치 단위로 메시지를 처리하고 전송하기 때문에 메시지를 하나씩 처리하던 문제도 해결됩니다. 또한 Non-Blocking channel을 사용하므로 응답을 받을 때까지 blocking할 필요도 없습니다. 적은 수의 쓰레드를 사용하기 때문에 Context switching 비용도 감소합니다. 각 메시지당 Callback이 등록되어 있기 때문에 Sync mode의 장점인 전송 실패에 대한 처리도 가능합니다.

New Producer의 Async mode는 Sync mode와 비교했을 때 많은 장점을 갖고 있지만 여전히 해결되지 않은 문제도 있습니다. 바로 메시지 유실과 중복에 대한 처리입니다. 아래 절에서 살펴보겠습니다.

Kafka Producer API를 활용한 유실과 중복 없는 메시지 처리

Kafka Producer API는 더 빠른 전송 속도와 더불어 유실과 중복을 더욱 잘 처리할 수 있도록 발전해 왔습니다. 먼저 유실과 중복이 발생하는 상황을 정리해 보고, Kafka Producer API를 사용해 유실과 중복을 좀 더 효율적으로 처리하는 방법을 소개하겠습니다.

메시지 유실 처리

메시지 유실은 보통 전송이 실패하는 상황에서 발생합니다.

1. Client 에서 Kafka Broker로 전송 중인 패킷 유실 -> Client는 ack을 받지 못하고 timeout

2. Kafka Broker를 찾는데 실패->  Exception 발생

3. 전송한 메시지를 Broker에서 block하는 경우 -> Broker에 적재되지 않고 NetworkException 발생

유실 처리에 대해 먼저 살펴보면, Kafka에서 메시지 전송을 실패했을 때 처리할 수 있는 방법은 두 가지가 있습니다.

1. retries 옵션에 적당한 값을 설정해 실패 시 그만큼 재시도

2. 전송 실패 시 발생하는 Exception을 catch해 Queue에 다시 넣음(전송 순서 보장되지 않음)

첫 번째 방법인 Kafka의 retries 옵션 값을 설정하는 방식은 API를 호출한 부분에서 다시 큐에 넣는 작업을 할 필요가 없다는 장점이 있지만, 재시도한 후에도 전송에 실패할 경우 메시지가 유실되는 문제가 있습니다. 또한 retries 횟수를 무작정 크게 주면 수신이 불가능한 특정 장애 브로커로 끊임없이 전송을 시도하면서 엄청난 지연이 발생하는 참사가 일어날 수 있습니다.

두 번째 방법은 Exception을 잡아서 전송 실패한 메시지를 버리지 않고 큐에 직접 다시 넣어주는 방식입니다. 해당 브로커에 문제가 있을 때, 또는 네트워크 상황이 좋지 않아서 전송이 안 될 때, 다른 메시지를 먼저 전송하고 나중에 다시 꺼내서 전송 가능한 브로커로 전송을 시도합니다.

Async mode에서는 전송을 실패했을 때, 첫 번째 방법이나 두 번째 방법 모두 전송 순서가 유지되지 않기 때문에 DI팀에서는 retries옵션을 0으로주고 해당 Exception을 잡아서 다시 큐에 넣는 방식을 사용하고 있습니다. (Kafka에서 retries 옵션의 기본값도 0입니다.)

Sync Blocking IO에서 유실 처리

전송에 실패해 Exception이 발생하면 Kafka Producer API를 사용해 전송을 하는 send() method는 false를 반환합니다.

try {
    ...
    KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(serviceId, partitionKey, message);
	kafkaProducer.send(keyedMessage);
    ...
} catch (FailedToSendMessageException e) {
    errorLog.sendError("Kafka Send Error.", e);
    try {
        kafkaProducer.close();
    } catch (Exception e1) {
        errorLog.error("Closing kafka producer failed.", e1);
    }
    kafkaProducer = new Producer<String, String>(producerConfig);
    return false;
} catch (ProducerClosedException e) {
    errorLog.sendError("Kafka Send Error. enqueue again", e);
    try {
        kafkaProducer.close();
    } catch (Exception e1) {
        errorLog.error("Closing kafka producer failed.", e1);
    }
    kafkaProducer = new Producer<String, String>(producerConfig);
    return false;
}

send()결과가 false이면 전송 실패한 메시지를 다시 큐에 넣습니다.

public void run() {
    String message;
    count = 0;
    isRunning = true;
    while (isRunning && count++ < getMaxCount() && null != (message = queue.deQueue())) {
        if (!send(message)) {
            isRunning = false;
            queue.enQueue(message);
        }
    }
}

Async Non-Blocking IO에서 유실 처리

Callback으로 전송 결과를 받기 때문에 구현이 더 간단합니다. send()에 성공하면 metadata에는 offset, topic, partition이, exception에는 null이 담겨서 callback됩니다. send()에 실패하면 metadata에는 null이 exception에는 발생한 예외 객체가 담겨서 callback됩니다. Exception이 null이 아닐 경우, 즉 send()에 실패한 경우 큐에 다시 넣는 작업을 수행합니다.

kafkaProducer.send(new ProducerRecord<String, String>(serviceId, message), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            queue.enQueue(message);
            errorLog.sendError("Send Error in Callback", exception);
        }
    }
});

메시지 중복 처리

메시지 중복은 두 가지 상황에서 발생합니다.

1. 전송에 성공했지만 도중에 ack이 유실된 경우: Client는 전송 실패로 간주하고 재전송 합니다.

2. 전송 성공 후 반환 받는 시간이 지연돼 시간 초과(timeout)가 발생한 경우: Client는 역시 전송 실패로 간주하고 재전송 합니다.

두 경우 모두 실제로 Kafka Broker에 적재가 됐지만, 실패로 판단하고 재전송하기 때문에 메시지의 중복이 발생합니다.

1번의 경우, TCP/IP Layer에서 처리돼야 하는 부분이기 때문에 별도의 처리가 불가능합니다.

2번의 경우, 시간 초과(Timeout) 설정 값을 환경에 따라 적절히 설정해야 합니다. New Producer는 기본값으로 30000ms(=30초)가 설정돼 있고, Old Producer는 10000ms(10초)로 설정돼 있습니다. (Link http://kafka.apache.org/090/documentation.html#producerconfigs 의 request.timeout.ms)

Timeout 값을 너무 크게 잡으면, Sync mode에서는 대기하는 시간이 길어질 수 있고, Async mode에서는 Callback을 반환하지 못해 메모리에 남아있는 문제가 발생합니다.

Kafka New Producer 사용법

위에서 설명한 내용을 종합하면 다음과 같습니다.

1. Blocking이 없으면 빠르다.

2. 여러 건의 메시지를 하나로 묶어서 전송하면 빠르다.

3. 메시지의 유실과 중복을 없애거나 최소화해야 한다.

Kafka Old Producer로도 1번에 소구하는 Async Blocking 전송과 2번에 소구하는 bulk 전송이 가능했음에도, Sync Blocking IO를 사용할 수 밖에 없었던 이유는 3번에 해당하는 메시지 유실과 중복에 대한 처리가 불가능 했기 때문입니다. Bulk 전송 시 10건의 메시지를 묶어서 전송했을 때 1건의 전송 실패가 발생하면 재시도(retries) 옵션으로 재전송이 가능했지만, 몇 번째 메시지에서 전송이 실패했는지 알 수 없었기 때문에 실패한 메시지만 골라서 다시 큐에 넣을 수 없었습니다.

New Producer에서 Async Non-Blocking IO와 동시에 Callback을 지원하면서 위 3가지 지연 요소들이 모두 해결됐습니다. 그 결과 유실과 중복을 최소화하면서 높은 처리량(Throughput)을 기대할 수 있게 됐습니다. Kafka New Producer를 사용하는 방법을 소스 코드와 함께 간단히 살펴보며 글을 마치겠습니다.

1. 사용하는 빌드 툴에서 dependency를 추가합니다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

2. Kafka Producer에 Property를 적용해 생성자를 호출합니다. 필수 설정 값들은 반드시 넣어야 합니다. (Producer API Config Link http://kafka.apache.org/090/documentation.html#producerconfigs)

Properties producerProperties = new Properties();
producerProperties.setProperty("propertyKey","propertyValue");
Producer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerProperties);

3. ProducerRecord를 정의 합니다.

  • ProducerRecord Class에 대해서는 https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html 에서 자세한 설명을 얻을 수 있습니다.
  • ProducerRecord는 topic, partition, key, value의 4 가지 인자를 받습니다.
    • topic
      • 필수 인자
      • 전송할 메시지의 topic 명칭
    • partition
      • 선택 인자
      • 실제 Broker의 어느 파티션으로 보낼지 설정
        • Broker에 할당돼 있는 실제 파티션 번호로 설정해야 합니다.
        • A라는 토픽에 파티션이 0, 1로 두 개가 할당돼 있으면 0 또는 1중 선택해서 전송할 수 있습니다.
        • 유효하지 않은 파티션 번호일 경우 NullPointException이 발생합니다.
        • 참고로, Client API에서 partitionFor()를 이용해 해당 토픽에 어떤 파티션 번호가 할당돼 있는지 확인할 수 있습니다.
      • 생략할 경우, 아래에 설명할 인자인 key로 파티션 결정
        • partition과 key를 모두 생략하면 Round Robin 방식으로 파티션을 선정해 전송합니다.
        • RoundRobin으로 적절히 분배가 되는지 확인하기 위해 4개의 파티션으로 실험한 결과 비교적 균등한 분배 결과를 보였습니다.
        • partition 1 partition 2 partition 3 partition 4
          Try 1 94,798 94,685 94,688 94,719
          Try 2 94,724 94,685 94,726 94,685
    • key
      • 선택 인자
      • partition을 지정하지 않은 경우, 이 인자를 이용해 전송할 파티션을 결정
      • partition과 key가 모두 없는 경우, Round Robin 방식으로 파티션을 선정
    • value
      • 필수 인자
      • 해당 토픽으로 전송할 메시지

4. send()를 호출하면서 Callback을 등록할 수 있습니다. Callback에서는 메시지 전송 성공 시 metadata를, 실패시 exception을 반환 받아서 처리할 수 있습니다.

kafkaProducer.send(new ProducerRecord<String, String>(serviceId, message), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            queue.enQueue(message);
            errorLog.sendError("Send Error in Callback", exception);
        }
    }
});

결론

Sync mode와 Async mode의 장단점은 아래 표와 같습니다.

Synchronous mode Asynchronous mode
장점 메시지의 순서 보장예외 상황 즉시 처리 가능 적은 자원(CPU, Memory 등)를 사용해 고성능의 전송
단점 전송 속도를 향상시키려면 CPU, Memory 등의 자원을 많이 소모 전송 실패 시 메시지 전송 순서 보장 불가메시지 유실 및 중복 가능성

Sync mode는 앞의 전송 결과를 받고 나서 다음 전송을 처리하기 때문에 동일 쓰레드에 한해서 전송되는 메시지의 순서가 보장되고, 예외 상황을 처리해야 할 때 발생 즉시 처리가 가능하기 때문에 메시지 유실 및 중복을 처리할 수 있다는 장점이 있습니다. 하지만 전송 후 결과를 받기 위해 Block 상태를 유지해야 하므로 다수의 쓰레드를 띄워 전송 처리량(throughput)을 끌어올려야 합니다. 이로 인해 많은 시스템 자원을 소모하는 단점이 있습니다.

Async mode는 단일 쓰레드 이벤트 기반 전송을 하기 때문에, Sync mode에 비해 적은 자원을 사용해 고성능의 전송 속도를 얻을 수 있는 장점이 있습니다. 그러나 전송 완료를 확인할 때까지 기다리지 않기 때문에, 중간에 전송이 실패한 메시지에 대해서는 전송 순서를 보장할 수 없고 메시지 유실에 대처하기 힘들다는 단점이 있습니다.

Old Producer의 Sync mode는 다른 전송 방식들에 비해 전송 속도가 10배 가량 느리다는 문제가 있고, 전송 속도를 끌어올리기 위해서는 다중 쓰레드를 써야하기 때문에 다른 자원들의 소모량이 증가한다는 문제가 있습니다. 무엇보다 다중 쓰레드로 구성할 때, Sync mode의 장점인 순서 보장을 포기해야 한다는 문제도 있습니다.

Old Producer의 Async mode는 Sync mode의 전송 속도와 많은 자원 사용량 등의 단점들을 극복할 수 있는 전송 방법이지만,  전송 실패 시 메시지 전송 순서 보장이 불가한 점과 메시지 유실 및 중복을 처리할 수 없다는 또 다른 문제점이 있습니다.

New Producer API는 Old Producer Sync mode의 느린 전송 속도 및 자원 사용 문제와 Async mode의 유실 및 중복 처리가 불가능 했던 문제를 해결할 수 있습니다. New Producer API는 전송 실패시 순서 보장이 되지 않는다는 문제는 여전히 남아있지만 Old Producer 대비 장점들이 많아 더 각광받고 있습니다. New Producer의 장단점을 아래 표로 다시 정리했습니다.

New Producer API
장점 빠른 전송 속도, 유실 및 중복처리 가능, 적은 자원(CPU, Memory 등)를 사용해 고성능의 전송 가능
단점 전송 실패 시 메시지 순서 보장 불가

New Producer API는 Asynchronous mode의 단점 중 메시지 유실 및 중복 처리를 Callback을 이용해 해결한 좀 더 발전된 Producer API이므로, 아직 사용하고 있지 않다면 사용을 검토해 보는 것도 좋습니다.

물론, Kafka Old Producer API에서 Kafka New Producer API로 API를 업그레이드 할 때는 Property 값들이 많이 바뀌었으므로 많은 검토와 실험이 필요합니다. 또한, 다중 쓰레드를 사용하는 Blocking 위주의 프로그래밍 모델을 단일 쓰레드의 이벤트 기반 프로그래밍 모델로 변경하는 부담도 감안해야 합니다.

하지만 Kafka New Producer API는 Kafka Old Producer API의 Sync IO 방식에서의 메시지 유실과 중복 방지 수준을 유지하면서도 적은 자원 사용과 고성능의 전송 속도를 얻을 수 있습니다. 따라서, 데이터 전송량이 많은 서비스라면, 업데이트를 통해 New Producer API를 활용하면 변경에 드는 수고 이상의 이득을 기대할 수 있습니다.

이상으로 Kafka New Producer API로 업그레이드를 하며 살펴봤던 내용들을 정리해 봤습니다.

긴 글 읽어주셔서 감사합니다!!

강병수 Data Infrastructure팀

SK플래닛 Data Infrastructure팀에서 Apache Kafka Cluster 및 실시간 데이터를 입수하여 Hadoop Ecosystem에 적재하는 역할을 담당하고 있습니다.
실시간 streaming data를 활용하여 서비스의 가치를 높이는 것에 관심이 많습니다.

공유하기