Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기

안녕하세요. Data Infrastructure팀에서 Data Infra/Application 개발을 하고 있는 Data Programmer 엄태욱 입니다.

이번 글을 통해 대용량 데이터의 실시간 분산 처리를 위한 유실 없는 스트림 처리 인프라를 Spark Streaming으로 구축한 경험을 공유해 드리겠습니다.

실시간 처리(Real-time processing)와 스트림 처리(Stream processing)

스마트폰이 널리 퍼지고 IoT(Internet of Things) 세상이 다가오면서, Big Data의 3Vs(Volume, Variety, Velocity) 중에서 Volume(크기)도 중요하지만 스마트폰의 즉각성만큼 데이터 처리도 빠른 속도를 요구하게 되어, 이제는 Velocity(속도)가 많이 부각되고 있습니다. 이에 따라 대량의 데이터를 빠르게 처리하기 위한 기술들이 실시간 처리(Real-time processing) 또는 스트림 처리(Stream processing)로 불리며 주목 받고 있습니다.

보통 실시간 처리와 스트림 처리라는 용어를 명확하게 구분하지 않고, 실시간 스트림 처리처럼 모호하게 사용합니다. 이 자리를 빌어 조금 더 명확하게 구분을 하면, 실시간 처리는 데이터 처리의 목표 또는 제약 사항이라고 볼 수 있고, 스트림 처리는 데이터 처리 방식이라고 볼 수 있습니다.

실시간이라는 용어에는 마감시각(deadline)이 있고, 마감시각 내에 주어진 연산을 완료하지 못하면 실패로 처리합니다. 조금 더 세분화하면 마감시각을 놓쳤을 때의 처리 결과에 따라 Hard/Firm/Soft real-time으로 구분합니다. (참고 https://en.wikipedia.org/wiki/Real-time_computing) 따라서 실시간 처리는 목표로 하는 시간 제약이 주어지고 그에 따른 실패 수준이 정해집니다.

스트림 처리는 범위가 한정되지 않고(unbounded) 끊임 없이 흘러가는(stream) 데이터에 대한 처리 방식입니다. 반대로 한정된(bounded) 데이터의 처리를 배치 처리(batch processing)라고 합니다. 작은 배치 처리를 무한히 하는 방식도 스트림 처리에 포함되고, 이를 마이크로 배치(micro-batch)라고 합니다. 잘 설계된 스트림 처리는 배치 처리를 포함한다고 까지 말할 수 있습니다. 스트림 처리에 대한 기초적인 개념과 제약 사항은 http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html 에서 쉽고 자세하게 설명하고 있으니 참고하시면 좋습니다.

결국, 스트림 처리가 끊임없이 흘러가는 데이터를 처리하다 보니 당연하게도 배치 처리에 비해 데이터 처리 결과를 빠르게 받아볼 수 있어서 실시간 처리라고 불릴 수도 있습니다. 하지만 이번 글은 시간 제약 보다는 유실 없는 데이터 처리에 초점을 두기 때문에 스트림 처리라고 부르겠습니다. 그리고 이번 글에서 “실시간”이라는 표현은 “가능한 빠르게”라는 의미로 사용하였습니다.

스트림 처리 인프라(Stream Processing Infra)의 아키텍처(Architecture)

Data Infrastructure팀에서 구축한 스트림 처리 인프라의 전체 아키텍처를 먼저 보여드리면 다음 그림과 같습니다.

스트림 처리 인프라 아키텍처그림 1. 스트림 처리 인프라 아키텍처

  • 데이터 수집
    • Server: 서버에서의 로그 수집을 위해 두 가지 방식을 제공합니다.
      • LogAgent(자체 개발): 서버에서 로그 파일을 남기면, LogAgent가 로그 변화를 감지하여 실시간으로 Kafka에 전송합니다.
      • RakeAPI(자체 개발): 서버 어플리케이션에서 직접 RakeAPI를 log4j 라이브러리처럼 사용해 로그를 남기면, sync/async 하게 실시간으로 Kafka에 전송합니다.
    • Client App
      • Android, iOS, JavaScript에서 로그를 저장할 수 있는 Rake Library(자체 개발)를 App에 제공합니다.
      • Rake Library는 로그를 RakeServer(자체 개발)로, RakeServer는 다시 Kafka로 실시간 전송합니다.
    • Kafka
      • 대용량 데이터의 분산 메시징 시스템으로 실시간 데이터 전송에서 데이터 원천과 데이터 처리 사이에서 버퍼(Buffer) 역할을 합니다.
      • 이 글에서 소개하는 스트림 처리와 별도로 Collector(자체 개발)를 이용해 HDFS에도 로그가 실시간으로 적재되어 배치 작업에서 사용합니다.
  • 데이터 처리
    • Spark Streaming
      • Kafka에 실시간으로 저장되는 로그를 마이크로 배치 방식으로 가져와 비즈니스 로직에 맞는 데이터 처리를 합니다. 아래에서 조금 더 상세히 설명합니다.
  • 데이터 저장
    • Apache Phoenix
      • NoSQL인 HBase를 마치 RDB처럼 SQL을 통해 사용할 수 있도록 JDBC Driver를 제공해주는 라이브러리 입니다.
      • 복잡한 질의(Query)가 지원되지 않는 경우도 있지만, 주로 Index를 사용하는 특정 key의 row를 빨리 찾는 pick-up성 질의 용도로 사용합니다.
  • 데이터 질의
    • QueryCache(자체 개발): 외부에서 Data Infrastructure팀 Cluster 내의 데이터를 접근할 때, Hive/Impala/Phoenix 등 각종 데이터 질의/검색 엔진을 일원화된 JDBC Driver로 접속할 수 있도록 해 줍니다.
      • Authentication, Authorization, Audit 기능을 구현하여 접근 제어 역할을 합니다. 이를 통해 개인정보보호법, 정보통신망법, 전자금융법 등의 데이터 Compliance를 이슈를 해결합니다.
      • 이름과 달리 아직 Caching 기능을 제공하진 않습니다. 개발 중입니다.

Spark Streaming의 선택
Spark Streaming을 최종 스트림 처리 프레임웍으로 결정하기에 앞서, 올해 초부터 Apache StormSamza도 후보로 검토했습니다. 당시에 많이 사용되던 CEP(Complex Event Processing) 엔진인 Esper는 분산 처리를 지원하지 않아 후보에서 제외하였습니다.

먼저 Apache Storm과 Spark Streaming의 비교는 http://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming 자료를 참고했습니다.

Storm과 Spark Streaming 비교그림 2. Storm과 Spark Streaming 비교
출처: http://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming

참고로 스트림 처리를 신뢰도(reliability)에 따라 다음과 같이 세 가지 보장 방식으로 구분할 수 있습니다.

  • At-most-once(최대 한 번): 데이터 유실이 있을 수 있어, 추천하지 않는 방식
  • At-least-once(적어도 한 번): 데이터 유실은 없으나 재전송으로 인해 중복이 생길 수 있음. 대부분의 경우 충분한 방식
  • Exactly-once(딱 한 번): 데이터가 오직 한 번만 처리되어 유실도 중복도 없음. 모든 상황에 대해 완벽히 보장하기 어렵지만 가장 바라는 방식

Core Storm과 Spark Streaming을 비교하면 Spark Streaming이 exactly-once를 보장하기 때문에 조금 더 믿을만하지만, 마이크로 배치처럼 동작하도록 추상화 계층을 추가한 Storm Trident에 비해서 Spark Streaming은 아직 부족한 점이 많았습니다. 사실 Apache Storm이 스트림 처리의 유일한 선택이었던 시기였기 때문에 초기 버전의 Spark Streaming은 크게 주목 받지 못했습니다.

그리고 자료 뒷 부분을 보면 Apache Storm과 Spark Streaming의 비교 외에도, 유실 없는 데이터 처리를 위해서는 데이터 원천의 내구성(durability)이 중요한데, Kafka는 데이터 원천과 데이터 처리 사이에서 버퍼 역할을 해주어 스트림 처리에서 Kafka는 필수 불가결한 요소라는 사실을 알 수 있습니다.

이후 Kafka를 만들었던 LinkedIn에서 Samza를 공개하고, Apache 프로젝트가 되었습니다. 이어서 세 가지 프레임웍을 동시에 비교한 https://tsicilian.wordpress.com/2015/02/16/streaming-big-data-storm-spark-and-samza/자료도 나왔습니다.

Storm과 Spark Streaming과 Samza 비교
그림 3. Storm과 Spark Streaming과 Samza 비교
출처: https://tsicilian.wordpress.com/2015/02/16/streaming-big-data-storm-spark-and-samza/

Samza는 Kafka를 만든 팀에서 개발했기 때문에 Kafka와의 조합이 다른 어떤 프레임웍 보다 좋았습니다. 지금은 YARN이 널리 사용되지만 당시에는 아직 MapReduce도 v1.0을 주로 사용하던 시기였기 때문에, YARN에서만 동작한다는 점은 단점이 되었습니다. 또한 exactly-once를 지원하지 않았고, 아직은 프로젝트의 성숙도가 낮아서 스트림 처리의 기초적인 기능만 구현되어 있었습니다.

결국, 다음과 같은 기준으로 Spark Streaming을 최종 선택하였습니다.

  • 배치 작업의 Spark과 코드 공유가 가능해서 Spark에서 배치 처리를 통해 더 간편하게 코드를 검증하고, 이후에 큰 부담 없이 Spark Streaming에 적용할 수 있습니다.
    • 팀에서 Hive 이외에 개발이 필요한 배치 작업을 MapReduce 기반의 Cascading이나 Scalding을 사용하다 Spark으로 이전하고 있었기 때문에 Spark Streaming이 가장 유리했습니다.
  • Spark은 Spark Shell을 통해 한 줄씩 실행해 보며 결과를 확인할 수 있는 REPL(Read–Eval–Print Loop) 환경을 제공하기 때문에 손쉬운 개발 및 테스트가 가능합니다.
    • 일반적으로 데이터 프로그래밍 개발 환경은 로컬 개발 환경과 데이터가 있는 테스트 및 실행 환경이 분리되어 있습니다.
  • Spark Standalone, Mesos, YARN처럼 다양한 클러스터 환경에서 동작합니다.
    • 팀에서 초기에 Standalone과 Mesos를 사용해 보았지만 Tez와 같은 다른 프레임웍과 함께 사용하기 위해 YARN을 기본으로 사용하고 있습니다.
  • 유실 없는 데이터 처리가 가능한 exactly-once를 지원합니다.
    • 다만, 특정 상황에서 exactly-once가 아닌 at-least-once인 경우도 있습니다.
  • 마이크로 배치 방식의 한계로 인해 latency가 1초 이내인 다른 프레임웍에 비해 떨어지지만, 수초 내의 지연은 감수할 생각이었습니다.
  • 개발이 정체된 Apache Storm이나 아직은 성숙이 더 필요한 Samza에 비해 많은 개발자들이 참여해서 빠르게 발전하고 있습니다.
  • 개인적으로 앞으로 분산 데이터 처리는 Scala 언어가 대세가 될 것으로 보고 있습니다. (이전 발표 자료 참고)

물론 Apache Storm이나 Samza를 이용해 실제 개발을 진행해 보지 않았기 때문에, 최종 선택 결과가 무조건 옳다고 볼 수 없습니다. 각자 필요로 하는 기능에 맞는 프레임웍을 선택해서 구현하면 된다고 생각합니다.

최근에는 Spark Streaming과 반대로 Kappa Architecture(http://radar.oreilly.com/2014/07/questioning-the-lambda-architecture.html)처럼 스트림 처리가 중심이고 배치 처리로 확장 중인 Apache Flink가 빠른 속도로 발전하고 있습니다. 스트림 처리를 전면에 내세운 만큼 Spark Streaming에 비해 Event Time Processing, Flexible Window 등 스트림 처리의 더 나은 기능들을 먼저 지원하고 있습니다. 발전 속도나 가능성을 봤을 때, 기대되는 프로젝트로 기회가 되면 추후 업무에 적용해 보려고 합니다.

또한, Kafka도 Kafka Streams(https://issues.apache.org/jira/browse/KAFKA-2590, https://speakerdeck.com/nehanarkhede/demystifying-stream-processing-with-apache-kafka)라는 기능으로 자체 스트림 처리를 지원할 예정입니다. 이 프로젝트 또한 조금 더 지켜보면서 유용하게 사용할만한 곳을 찾아보고 있습니다.

Apache Phoenix의 선택

많은 Spark Streaming 적용 사례들을 보면 대부분 HBaseCassandra를 스트림 데이터 처리 결과의 저장소로 사용합니다. 그 이유는 크게 두 가지로 볼 수 있습니다.

  • 스트림 처리 결과를 빠르게 저장하고, 다른 시스템에서 그 결과를 빠르게 조회하려고
  • 저장소에서 상태(state)를 관리하고 스트림 처리에서는 상태에 무관하도록(Stateless) 하여, 스트림 처리의 연산을 더 간단하고 빠르게 하려고

HBase나 Cassandra처럼 결과를 빠르게 저장하고 조회할 수 있는 대용량 저장소들은 Key-Value 형태로 데이터를 저장하는 NoSQL DB입니다. 특정 key나 key의 범위를 빠르게 찾을 수 있는 장점이 있지만, 조건에 맞는 여러 row들을 찾거나 여러 row들에 대해 연산한 결과를 질의할 경우에는 전체 조회(full scan)를 하게 되어 상대적으로 느린 단점이 있습니다.

Data Infrastructure팀에서는 결과적 정합성(eventual consistency)을 보장하는 Cassandra 보다는 즉각적으로 정합성을 보장하는 HBase 쪽을 선호했습니다. 그리고 위에서 보여드린 스트림 처리 인프라의 아키텍처에서 알 수 있듯이, 현재는 QueryCache에서 JDBC Driver를 이용한 SQL로만 결과를 조회할 수 있기 때문에 HBase 위에 Apache Phoenix를 얹어서 사용하기로 하였습니다. SQL로 데이터를 조회하면 응답 속도가 느리더라도 사용자가 질의 조건을 손쉽게 바꿔가며 다양하게 조회할 수 있는 장점이 있습니다.

Apache Phoenix를 사용하면 RDB처럼 SQL을 사용할 수 있지만, HBase의 속성처럼 주로 특정 row key를 조회하는 pick-up성 질의 용도로 많이 사용합니다. 스트림 처리에서는 데이터 처리 과정에서 대부분의 연산을 마치고, 그 결과만 특정 row key 기준으로 저장합니다.

단순히 HBase를 SQL로 접근할 수 있기 때문에 Apache Phoenix를 선택하진 않았습니다. Apache Phoenix를 사용함으로써 얻는 추가적인 이득은 다음과 같습니다.

  • Row key를 여러 컬럼의 복합키로 설정해야 할 경우, 따로 row key를 계산해서 만들지 않고, 여러 column들을 조합해서 하나의 row key 처럼 사용할 수 있습니다.
  • Row key 이외에 Secondary Index를 사용할 수 있어서, row key가 아닌 다른 조건으로 조회할 경우에도 빠르게 결과를 얻을 수 있습니다.
  • 비교적 쉽게 UDF(User-defined Function)를 만들어 사용할 수 있고, 다양한 built-in 함수들을 제공합니다.
  • View 기능을 제공해서 배치 처리시 이전 테이블 내용을 새로운 테이블 내용으로 손쉽게 스위칭할 수 있습니다.

Spark Streaming의 발전

스트림 처리 프레임웍으로 선택한 Spark Streaming을 간단히 정리하면, 다음 그림과 같습니다.

Spark Streaming 흐름

그림 4. Spark Streaming 흐름
출처: http://spark.apache.org/docs/latest/streaming-programming-guide.html

끊임없이 들어오는 스트림 데이터를 Spark Streaming이 배치 간격(batch interval)마다 데이터를 나누고, 나눠진 배치 데이터를 Spark Core 엔진이 처리해서 배치 간격마다 결과를 내놓습니다. 즉, 스트림 처리를 작은 시간 간격을 갖는 배치 처리의 연속으로 전환하여 처리합니다. 이러한 방식을 마이크로 배치(micro-batch) 방식이라고 합니다. 또한 각각의 배치 처리는 Spark으로 분산 처리하기 때문에, 좋은 성능을 가지면서도 장애 복구가 가능한 Spark의 장점들을 모두 이어 받았습니다.

Spark Streaming이 마이크로 배치 방식을 택한 이유는 배치 처리용 Spark이 먼저 개발된 후에 Spark의 다양한 기능을 그대로 이용하면서 스트림 처리가 가능하도록 추가한 기능이 Spark Streaming이기 때문입니다. 초기 Spark Streaming의 설계는 Discretized Streams: Fault-Tolerant Streaming Computation at Scale 논문에서 확인할 수 있습니다.

마이크로 배치 방식은 continuous 처리 방식의 스트림 처리 시스템보다 latency가 큰 단점이 있지만, 배치 처리와 스트림 처리를 하나의 시스템에서 처리할 수 있는 장점이 되었습니다. 기존의 continuous 처리 방식과의 차이점은 https://databricks.com/blog/2015/07/30/diving-into-spark-streamings-execution-model.html 에서 조금 더 살펴볼 수 있습니다.

배치 처리와 스트림 처리를 하나의 시스템에서 코드를 재활용해서 사용할 수 있기 때문에, 최근 대용량 데이터 처리에서 많이 언급되는 람다 아키텍처(Lambda Architecture)에 적합한 방식이라고 할 수 있습니다. 람다 아키텍처에서는 배치 계층(batch layer)과 속도 계층(speed layer)이 따로 존재하는데, Spark Streaming은 람다 아키텍처를 하나의 시스템에서 구현할 수 있어, 한 걸음 더 나아간 방식이라고 할 수 있습니다.

Spark Streaming 프로젝트는 2012년부터 시작되어 Spark v0.7에 알파(alpha) 출시로 선보였고, Spark v0.9에 공식 출시되었습니다. 이후 Spark 버전이 올라감에 따라 Spark Streaming은 다음과 같이 발전하였습니다.

v1.0 우아한 중단(Graceful shutdown) https://issues.apache.org/jira/browse/SPARK-1331

  • Driver로부터 중단 신호를 받으면 Receiver는 더 이상 데이터를 가져오지 않고, Driver는 Receiver가 이미 받아온 데이터가 처리될 때까지 기다리도록 합니다.

v1.0 Receiver 재시작 https://issues.apache.org/jira/browse/SPARK-1340

  • Receiver가 동작중인 executor가 실패했을 때, receiver가 다시 시작되지 않는 문제를 해결하였습니다.

v1.2 데이터 유실 방지를 위한 WAL(Write Ahead Logs) https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html

  • Recieiver가 데이터 원천으로부터 데이터를 가져올 때 메모리에만 데이터를 저장하고 있기 때문에, 처리되지 못한 채 장애가 발생하거나 중단되면 데이터가 유실되는 문제가 있습니다.
  • 특히 HDFS, S3 같은 데이터 원천은 처리하지 못한 부분을 언제든지 다시 읽어 들일 수 있지만, 다시 읽어들일 수 없는 데이터 원천들(Kafka, Flume, Network 등)은 유실을 피할 수 없습니다.
  • 저널링(journaling)이라고도 불리는 WAL을 도입해 데이터 원천으로부터 데이터를 읽으면 HDFS나 S3 같은 신뢰성 있는 저장소에 먼저 기록해서 장애 발생 시에도 유실되지 않도록 했습니다.
  • 하지만 메모리 뿐만 아니라 WAL을 저장하다 보니 시간당 전체 처리량(throughput)은 떨어질 수 밖에 없습니다.
  • 데이터 원천으로 Kafka를 사용하는 경우 Kafka가 이미 broker에 자체적인 WAL을 가지고 있는데, Spark Streaming이 유실 방지를 위해 한 번 더 WAL을 기록하는 비효율적인 상황이 되었습니다.
  • WAL을 사용함에도 불구하고, Zookeeper에 Kafka의 offset을 저장하기 전에 시스템이 실패할 경우, 복구 처리를 두 번 하게 되는 문제가 있습니다. (At-least-once)

v1.3 Kafka Direct API https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

  • 사실 Spark Streaming의 WAL이 없어도 Kafka는 자체적으로 스트림의 특정 위치로부터 데이터를 재전송할 수 있는 기능이 있습니다. 따라서 Kafka의 이 기능을 제대로 사용한다면 Spark Streaming의 WAL이 오히려 성능 저하를 일으키는 비효율적인 처리 방식입니다.
  • 기존의 Kafka API는 receiver에서 Kafka의 High level Consumer API를 사용하였기 때문에 offset을 세밀하게 관리하지 않고 Zookeeper에 저장했습니다.
  • Kafka Direct API는 receiver를 사용하지 않고 executor에서 직접 Kafka의 Simple Consumer API를 사용하여 offset 범위를 지정한 만큼 데이터를 받아서 처리합니다.
  • 스트림 처리를 위한 KafkaUtils.createDirectStream() 외에 KafkaUtils.createRDD()를 제공해 Kafka 데이터를 배치 처리할 수 있는 기능도 추가되었습니다.
  • Kafka Direct API로 exactly-once를 보장하게 되어 비로소 유실 없는 스트림 처리에 Spark Streaming을 사용할 수 있게 되었습니다.

v1.4 UI 향상 https://databricks.com/blog/2015/07/08/new-visualizations-for-understanding-spark-streaming-applications.html

  • Web UI를 통해 Spark Streaming의 최근 n개(기본 1000개)의 마이크로 배치 처리 결과(Input Rate, Scheduling Delay, Processing Time, Total Delay 등)를 그래프로 확인할 수 있습니다.
  • 특히, 처리 시간(Processing Time)과 일정 지연(Scheduling Delay) 수치 그래프를 통해 Spark Streaming의 현재 상태가 안정적(stable)인지 한 눈에 알 수 있습니다.
  • 참고로 처리 시간이 배치 간격(batch interval) 보다 작아서 일정 지연이 발생하지 않아야 안정적인 상태입니다.

v1.5 역 압력(back pressure) https://databricks.com/blog/2015/09/09/announcing-spark-1-5.html

  • 역 압력(back pressure)은 시스템이 불안정(unstable) 상태일 경우에 데이터를 받아오는 양을 조절해 주는 기능입니다.
  • 갑자기 많은 양의 데이터가 몰려 들어오거나 분산 환경에서 일시적인 처리 지연이 발생할 경우, 시스템이 동적으로 시간당 입력량(Input Rate)을 조절해 다시 안정을 찾도록 합니다.

v1.5 UI에서 입력 메타 데이터 출력 https://issues.apache.org/jira/browse/SPARK-8701

  • Web UI를 통해, 파일 스트림인 경우에는 파일 경로를, Kafka인 경우에는 Topic, Partition, Offset 범위를 보여줘 디버깅과 운영에 도움을 줍니다.

Kafka Direct API

실제 유실 없는 스트림 처리 구현에서 사용한 Kafka Direct API에 대해서 조금 더 알아보기 위해, 먼저 WAL을 사용한 Kafka Receiver 방식의 문제점부터 되짚어 보겠습니다.

WAL을 사용한 receiver 방식은 Kafka로부터 받은 데이터가 WAL에 먼저 저장되는데, Zookeeper에 Kafka의 offset을 갱신하기 전에 시스템이 실패할 경우, Spark Streaming은 데이터를 받았다고 인식하지만 Kafka는 데이터를 전달하지 않았다고 인식하는 불일치가 발생합니다. 이후 시스템이 복구되면 Spark Streaming은 WAL에 저장된 데이터를 읽어 복구 처리를 하고 receiver를 동작시킵니다. Receiver는 Kafka로부터 다음 데이터를 받아오려고 하는데, Kafka는 Zookeeper에 저장된 offset 정보를 보고, 이전 데이터를 전달하지 않았다고 보게 되므로, 보낼 필요가 없는 데이터를 다시 전송합니다. 결국 복구 처리를 두 번하게 됩니다.(At-least-once)

이와 같은 불일치는 Spark Streaming과 Kafka가 데이터 전송을 따로 관리하여 한번에 업데이트 되지 않을 수 있기 때문입니다. 이 문제를 해결하려면 하나의 시스템이 데이터의 전송을 일관성 있게 관리해야 하고, 장애 복구 시에 데이터 재전송 여부에 대한 판단도 확실하게 통제할 수 있어야 합니다.

Kafka Direct API 방식은 데이터 전송에 대한 관리를 오직 Spark Streaming에서만 담당합니다. 배치 간격마다 driver가 Kafka의 leader로부터 이번 배치에 받아올 데이터에 대한 offset 범위를 얻어와 지정하고, 데이터 처리를 할 때 executor는 Kafka의 Simple Consumer API를 사용하여 앞서 driver가 지정한 데이터 범위만 가져와서 처리합니다. 따라서 한 번 시작하면 계속 동작하는 receiver가 불필요하고, offset을 직접 관리하기 때문에 Zookeeper에 대한 의존도 불필요합니다.

Kafka Direct API 방식은 receiver를 사용하지 않기 때문에 다음과 같은 특성이 있습니다.

  • 매 배치마다 새로 할당받은 executor에서 Kafka에 연결해서 데이터를 가져와야해서, receiver를 사용할 때보다 약간의 지연이 더 발생합니다.
    • Receiver를 사용하는 경우, 처음 receiver를 시작할 때 만들어진 Kafka 연결을 계속 사용하기 때문에, 매 배치마다 연결 지연이 발생하지 않습니다.
  • Kafka의 각 파티션은 하나의 executor에서 직접 받아오고, 하나의 RDD 파티션이 생성됩니다. 즉, Kafka의 파티션 수만큼 병렬화하여 데이터를 가져옵니다.
    • Kafka 파티션 수가 적다면 병렬 수준이 낮아서 성능이 낮아질 수 있어, Kafka 파티션 수 조정이 필요할 수 있습니다.

Kafka Direct API 방식은 Zookeeper에 offset 정보를 갱신하지 않기 때문에, Zookeeper 기반의 Kafka 모니터링 도구를 사용할 수 없습니다. 필요에 따라 offset 정보를 Zookeeper에 업데이트 하도록 직접 구현할 수 있습니다.

Kafka Direct API 방식에서 checkpoint를 사용하면 driver는 장애 복구를 위해 어플리케이션의 메타 데이터를 HDFS나 S3처럼 신뢰성 있는 저장소에 저장합니다. 메타 데이터에는 어플리케이션 설정 정보와 스케쥴러에 등록되었지만(queued) 아직 완료되지 않은 배치 작업에 대한 정보를 저장합니다. 배치 작업 정보에는 offset 정보도 포함되어 있어 driver 장애로 중단된 스트림 처리를 복구할 때 저장된 메타 데이터를 읽어 불필요한 재전송 없이 장애가 발생했던 부분부터 처리할 수 있습니다.

Spark Streaming과 Kafka의 조합에 대해서는 http://spark.apache.org/docs/latest/streaming-kafka-integration.html 에서 더 자세히 살펴볼 수 있습니다.

Spark Streaming의 장애 감내(Fault-tolerance)와 스트림 처리 신뢰도

유실 없는 스트림 처리 구현에 앞서 Spark Streaming의 장애 감내에 대해서 먼저 살펴보겠습니다. 먼저, Spark Streaming이 동작하는 클러스터 환경에서 노드 역할에 따라 장애 발생 시 다음과 같은 결과가 발생합니다.

  • 드라이버 노드(Driver node)의 장애
    • 드라이버 노드가 가지고 있는 SparkContext가 사라져 모든 executor와 그 executor의 메모리에 있는 데이터가 사라집니다.
    • 사실상 Spark Streaming 어플리케이션 전체가 중단된다고 볼 수 있습니다.
  • 워커 노드(Worker node)의 장애
    • 워커 노드의 executor가 실패하고, 메모리에 있는 데이터도 사라집니다.
    • Receiver가 실행중인 워커 노드라면 receiver가 버퍼링하고 있는 데이터가 사라집니다.
    • Spark과 마찬가지로 워커 노드에 장애가 발생하면, executor나 receiver를 다른 정상적인 노드로 옮겨서 다시 처리할 수 있도록 합니다.

참고로 Chaos Monkey를 이용해 무자비하게 AWS상의 인스턴스들을 죽여가며 테스트를 진행하는 Netflix에서는 Spark Streaming의 각 컴포넌트에 대한 회복성을 테스트하여 다음과 같이 공개하였습니다. Spark Streaming이 장애에 비교적 안전해서 스트림 처리에 사용하겠다는 의견을 보였습니다.

Netflix의 Chaos Monkey 테스트 결과
그림 5. Netflix의 Chaos Monkey 테스트 결과
출처: http://techblog.netflix.com/2015/03/can-spark-streaming-survive-chaos-monkey.html

Spark Streaming이 장애에 대한 회복성이 좋다는 사실과 별개로 스트림 처리 과정에서 데이터 신뢰성도 보장해야 합니다. 스트림 데이터 처리는 크게 데이터 수신, 데이터 처리, 결과 저장이라는 세 단계로 나눌 수 있습니다. 스트림 처리가 종단간(end-to-end) exactly-once를 보장하려면 세 단계 모두 exactly-once를 보장해야 합니다.

데이터 수신(Receiving): 데이터 원천으로부터 데이터를 받아오는 단계

  • 파일 기반의 데이터 원천은 고민할 필요 없이 exactly-once를 보장합니다.
  • WAL을 사용하지 않는 receiver를 사용하면 드라이버 노드 장애의 경우 at-most-once를 보장하고 워커 노드 장애의 경우 at-least-once를 보장합니다.
  • WAL을 사용하는 receiver를 사용하면(Spark v1.2 이상) 드라이버 노드 장애와 워커 노드 장애 양쪽 모두 at-most-once를 보장합니다.
  • Receiver가 필요 없는 Kafka Direct API를 사용하면 exactly-once를 보장합니다.

데이터 처리(Transforming): 받아온 데이터를 여러 단계에 걸쳐 가공 및 연산하는 단계

  • RDD의 특성으로 받아온 데이터를 그대로 다시 접근할 수 있다면, 장애가 발생해도 데이터 리니지(lineage)에 따라 최종 연산 결과물은 항상 같은 결과를 보장합니다.
  • Spark Streaming에서 장애 감내에 대해 고민할 필요가 없는 단계입니다.

결과 저장(Outputting): 최종 연산 결과 데이터를 파일 시스템, DB, 대시보드 등 외부 시스템에 전달하거나 저장하는 단계

  • 결과 저장 단계도 실패해서 재실행 할 수 있기 때문에 기본적으로 한 번 이상 저장하는 at-least-once만 보장합니다.
  • 결과 저장 방식이 멱등(idempotent)한지 여부와 트랜잭션(transaction)을 지원하는지 여부에 따라 보장 수준이 달라집니다.
  • 멱등하다면 몇 번을 실행해도 한 번 실행했을 때와 같은 결과를 얻기 때문에 exactly-once를 보장한다고 할 수 있습니다. (Idempotent exactly-once 라고 부릅니다.)
  • 외부 시스템이 트랜잭션을 지원한다면 실패시 roll-back이 가능하므로 성공한 결과만 한 번 저장할 수 있어 exactly-once를 보장합니다. (Transactional exactly-once 라고 부릅니다.)

Kafka Direct API를 사용해서 데이터를 받고 Apache Phoenix에 Key-Value 기반으로 멱등하게 저장한다면, Spark Streaming으로 종단간 exactly-once를 보장하면서 스트림 처리가 가능하다는 결론을 얻을 수 있습니다.

Spark Streaming의 장애 감내에 대해서는 http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics 에 조금 더 자세히 나와 있습니다.

Kafka Direct API와 Apache Phoenix를 이용한 스트림 처리 구현

Kafka Direct API를 사용해서 구현하는 방법은 Kafka Direct API를 직접 구현한 Cody Koeninger의 글(https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md)과 Code(https://github.com/koeninger/kafka-exactly-once/)를 참고하였습니다.

앞서 Kafka Direct API에 대해서 설명하였듯이, Kafka Direct API는 Kafka의 offset을 Zookeeper가 아니라 Spark Streaming에서 직접 관리합니다. 이렇게 Kafka offset을 직접 세밀하게 관리하려면 어딘가에 offset 정보를 따로 저장해야 합니다. 그래야 driver에 장애가 발생해서 Spark Streaming을 재시작하더라도 마지막으로 처리된 offset을 읽고 마지막으로 처리된 이후부터 이어서 처리할 수 있습니다.

쉽게 offset을 저장하는 방법은 Spark의 checkpoint에 저장하는 방법입니다.

  • Kafka의 offset 정보를 직접 접근할 필요가 없습니다. Spark Streaming이 알아서 checkpoint의 메타 데이터에 offset 정보를 저장합니다
  • Spark Streaming을 다시 시작하더라도 자동으로 checkpoint에 저장된 offset 정보를 읽어서 이어서 처리할 수 있습니다.
  • Spark Streaming이 알아서 저장하기 때문에 최종 결과를 저장했음에도 불구하고 offset을 checkpoint에 저장하지 못하고 장애가 발생하는 경우가 발생할 수 있습니다.
    • 결과 저장 단계에서 트랜잭션을 지원하면 두 번의 트랜잭션이 발생할 수 있습니다. 따라서 결과 저장이 멱등해야 합니다.
  • checkpoint에 저장된 메타 데이터를 읽을 수 없으면, 재시작을 하더라도 마지막 처리된 이후부터 처리할 수 없습니다.
    • checkpoint 정보는 직렬화(serialized)된 객체를 저장하는데, 역직렬화(deserialize)할 때 클래스가 바뀌면 에러가 발생하며 실패합니다.
    • 어플리케이션이 시스템 장애로 재시작하면 코드 변화가 없기 때문에 역직렬화 문제가 발생하지 않습니다.
    • 하지만 스트림 처리를 재시작하는 경우는 대부분 어플리케이션이 데이터를 잘못 처리해서 죽어버리거나 잠재적인 오류를 수정하여 어플리케이션을 재배포하는 경우이기 때문에, 어플리케이션 코드가 동일한 경우가 별로 없습니다. 따라서 checkpoint를 믿고 사용하기에는 불안정한 요소가 많습니다.
    • window 연산은 checkpoint를 사용할 수 밖에 없는데, 어플리케이션 코드 변경에 취약하다는 점을 알고 사용해야 합니다.

조금 복잡하지만 offset을 직접 저장하고 관리하는 방법이 있습니다.

  • Kafka의 offset 정보를 직접 접근해서 별도로 저장하고, Spark Streaming이 시작할 때마다 별도로 저장된 offset 정보를 읽어서 처리하도록 구현해야 합니다.
  • 직접 구현하였기 때문에 코드가 변경되어도 저장된 offset을 다시 읽지 못하는 문제는 없습니다.
  • 결과 저장 단계와 offset 저장을 동시에 처리하도록 구현할 수 있기 때문에 결과 저장이 트랜잭션한 경우와 멱등한 경우 모두 가능합니다.

실제 데이터 처리 개발과 운영 경험에 따르면, 데이터의 변화나 기능 추가로 인해 어플리케이션 코드가 바뀌는 경우가 종종 발생합니다. 따라서 checkpoint에 offset을 저장하는 방법은 권장하지 않습니다. 조금 복잡하고 번거롭더라도 offset을 직접 저장하고 관리하는 방법이 개발 자유도 측면에서도 권장하는 방법입니다.

RDD로부터 offset 정보를 읽을 때 주의할 점은 다음과 같이 createDirectStream()으로부터 받은 stream에 대해 foreachRDD()를 사용하자마자 바로 rdd를 casting해서 offsetRanges를 구해야 한다는 점입니다.

val stream = KafkaUtils.createDirectStream(...)
...
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
}


앞에서 언급했듯이 RDD 파티션과 Kafka 파티션과는 1:1 대응 관계가 있기 때문에, Kafka에서 읽은 stream의 rdd를 바로 casting 해야 Kafka 파티션 정보에서 offset 정보를 얻을 수 있습니다. rdd의 파티션이 변경될 수 있는 transformation을 사용한 이후에는 셔플(shuffle)이 발생해서 Kafka 파티션 정보를 얻을 수 없습니다. 실제로 casting 이 불가능하다는 에러가 발생합니다. 그리고 이렇게 처음에 얻은 offset 정보를 최종 결과 저장이 끝난 후에 별도로 저장할 수 있도록 유지해야 합니다.

그리고 RDD 파티션과 Kafka 파티션이 1:1 대응이기 때문에 배치 간격 동안 특정 Kafka 파티션에 데이터가 들어오지 않는 경우, 이에 대응하는 RDD 파티션이 생성되지 않습니다. Kafka의 topic에 원래 4개의 파티션이 존재해도 특정 파티션 1개에 데이터가 들어오지 않으면 RDD의 파티션은 3개만 생성됩니다. 이 경우, 위와 같은 방법으로 offsetRanges를 구하면 3개의 파티션에 대한 offset 정보만 얻게 됩니다. 3개의 파티션 정보만 저장하면 이후 offset 정보를 읽어서 Kafka에 데이터를 요청하면 topic 내에 4개의 파티션이 있음에도 불구하고 3개의 파티션으로부터만 데이터를 가져오게 되는 문제가 발생합니다. 따라서 들어오는 데이터 수가 줄어들어 offset 정보의 수가 이전보다 줄어들면, 받아오지 못한 파티션의 offset 정보는 이전 배치의 파티션 offset 정보로 보정해서 저장해야 합니다.

Apache Phoenix에 Key-Value로 멱등하게 저장하는 구현은 row key만 테이블 내에서 유일하게(unique) 설계하면, 장애 복구로 인해 몇 번을 덮어 쓰더라도 문제가 되지 않습니다. 그리고 Apache Phoenix에 저장이 성공한 경우에만 offset을 저장하였습니다. 따라서 offset을 저장하기 전에 장애가 발생하면, 해당 배치는 offset을 저장할 때까지 반복해서 실행됩니다. 하지만 멱등성을 보장하도록 row key를 설계하였기 때문에 불필요한 반복이더라도 유실이 발생하지 않고 최종 결과도 항상 믿을 수 있는 마지막 상태가 됩니다.

Apache Phoenix에 데이터를 저장하는 방법은 Apache Spark Plugin을 이용하는 방법과 JDBC를 이용하는 방법이 있습니다. Spark Plugin은 테이블 단위에서 여러 row의 데이터를 처리하는데 유용하고, row 단위의 데이터 처리에는 JDBC를 이용하는 방법이 편리합니다.

yarn-client 모드 선택

Spark Streaming은 Spark Standalone, Mesos, YARN처럼 다양한 클러스터 환경에서 모두 동작하지만, Data Infrastructure팀에서는 다른 프레임웍과 함께 사용할 목적으로 YARN을 선택했습니다. YARN 환경에서 Spark Streaming은 두 가지 배포 모드를 제공합니다.

  • yarn-cluster 모드: driver가 YARN의 application master에서 동작하여 spark을 실행하면 실행했던 클라이언트와 연결이 끊어지고 클러스터 내에서 동작합니다. 클라이언트와 연결되어 있지 않아 Spark의 실행 상태는 Web UI를 통해서만 확인할 수 있고, spark-shell을 사용할 수 없습니다.
  • yarn-client 모드: driver가 클라이언트의 프로세스에서 동작하고, YARN의 application master는 YARN으로부터 자원 할당 요청만 담당합니다. 클라이언트와 연결되어 있어 spark-shell을 사용할 수 있습니다.

보통 배치 처리의 경우 yarn-clinet 모드로 개발과 테스트를 진행하고, yarn-cluster 모드로 실행하는 경우가 많습니다. 그리고 yarn-cluster 모드로 실행해야 driver에 장애가 발생해도 다른 노드에서 driver를 다시 실행해서 장애 복구할 수 있습니다.

24/7로 끊임없이 동작하는 스트림 처리에서 yarn-cluster 모드로 실행하면 driver와의 연결이 끊겨 있어 로그를 확인하기 어려운 문제가 있습니다. 일부에선 Spark Streaming에서 로그를 적게 남겨야 성능 향상 효과가 있다며 로그를 최소하하라고 권장합니다. 1초 단위 이하의 배치 간격이 필요할 정도로 속도가 중요한 경우에는 로그 출력도 속도에 큰 영향을 끼치지만, 3초 이상의 배치 간격에서는 로그 출력을 줄여서 얻는 효과보다 디버깅이 어려운 역효과가 더 큽니다. Executor의 로그까지 보지 않더라도 최소한 driver의 로그는 디버깅에 상당히 많은 도움을 주므로 yarn-client 모드가 더 유용하다고 판단했습니다.

yarn-client 모드를 사용하는데 driver 노드에 장애가 발생하면 치명적인 상황이 됩니다. 다른 노드에서 다시 driver를 실행할 수 있어야 하기 때문입니다. 저희는 driver 노드가 클러스터의 노드에 포함되어 있지 않고, 사양이 좋은 노드를 사용하기 때문에 driver 노드에서 장애가 발생한 경우는 없었습니다. 그럼에도 불구하고 driver 노드에서 장애가 발생하면 다른 노드에서 빠르게 다시 실행할 수 있도록 백업 환경을 구축했습니다.

yarn-client 모드에서 driver의 로그를 남기려면, 기본 설정인 콘솔 화면 출력 대신 파일로 일정 기간동안 rolling하면서 저장할 수 있도록 설정하는 편이 좋습니다. http://shzhangji.com/blog/2015/05/31/spark-streaming-logging-configuration/를 참고하면 Spark Streaming에서 log4j properties 파일을 수정해 로그 설정을 변경할 수 있습니다.

Spark Streaming 중단과 재시작

Spark Streaming이 분산 시스템의 장애로 driver 프로세스가 죽으면, 다시 실행해주어야 끊임없이 스트림 처리를 할 수 있습니다. Driver 프로세스가 비정상적으로 종료(exit code != 0)할 경우, 자동으로 driver를 재시작하는 방법은 클러스터 환경에 따라 다릅니다.

  • Spark Standalone: spark-submit을 cluster 모드로 설정하고 “– supervise” 옵션을 지정합니다.
  • Mesos: Marathon을 사용합니다.
  • YARN
    • yarn-cluster 모드: YARN 설정에서 “yarn.resourcemanager.am.max-attempts” 를 지정합니다.
    • yarn-client 모드: 별도로 지원되지 않아 Spark Streaming을 yarn-client 모드로 실행하는 shell script를 따로 만들어, 프로세스가 종료될 경우 재시작 하도록 개발합니다.

참고로 구축한 스트림 처리 인프라에서 테스트해 본 결과, Spark Streaming이 재시작하면 YARN에서 자원을 할당 받고 첫 배치 실행까지 약 40여초의 시간이 걸립니다. 그리고 첫 배치 이후 몇 번의 배치는 데이터 처리 속도가 평소보다 느립니다. 그 이유는 YARN으로부터 할당 받은 모든 노드에 Spark executor가 처음 실행될 때까지 시간이 걸리기 때문입니다. 따라서 executor를 많이 지정하면 초기 처리 속도 지연이 좀 더 커질 수 있습니다.

앞서 checkpoint의 불안정성을 설명하면서 잠깐 언급하였듯이, 시스템 장애가 아닌 운영상의 이유로 어플리케이션을 중단해야 하는 상항이 종종 발생합니다. Spark Steaming을 중단하는 방법은 아주 간단하게 ‘kill PID’ 명령으로 driver 프로세스에 SIGTERM 시그널을 보내면 됩니다.

Spark Streaming을 중단할 때 고민해야 할 점은 중단 시점의 처리 단계를 알 수 없다는 점입니다. 다행히 멱등하게 exactly-once를 보장하도록 구현하였기 때문에, 심지어 Apache Phoenix에 저장하는 과정 중에 중단되어도 다음 실행에 중단된 배치를 다시 처리하여 최종 결과에는 문제가 없습니다.

다만, 이름만 보고 우아한 종료 설정인 spark.streaming.stopGracefullyOnShutdown을 true로 설정하고 특정 배치 처리 과정에서 중단하면, 해당 배치는 즉시 중단되지만 스케쥴링 큐에 들어간 나머지 배치 작업들은 모두 정상적으로 처리하고 Spark Streaming이 종료합니다. 따라서 최종 offset은 가장 마지막에 성공적으로 끝난 배치를 기준으로 위치를 가리킵니다. 실제로 중단 신호를 받은 배치는 비정상적으로 중단되었음에도 불구하고 offset은 증가한 채 종료하기 때문에, 재시작하면 마지막 offset 부터 시작하게 되어 사실상 중간 유실이 발생합니다.

우아한 종료는 receiver를 사용할 때 이미 받아온 데이터가 중간에 유실되지 않고 모두 처리하고 중단하기 위한 용도로 만들어졌는데, 오히려 유실을 발생시킬 수 있는 기능이 되었습니다. 따라서 중단 신호를 받으면 어떤 데이터 처리 과정에 있는지 관계 없이 우아하지 않게 즉각 중단해서, 재시작할 때 해당 배치 과정부터 재처리할 수 있도록 해야 유실을 막을 수 있습니다.

Kafka의 offset 관리만 잘하면, 장애로 인해 예상치 못하게 중단되거나 운영 상의 이유로 언제든지 중단하여도, 안심하고 장애처리나 업그레이드 같은 작업을 하고 재시작할 수 있습니다. 하지만 안심하고 중단 후 재시작하려면 꼭 염두에 두어야 할 점이 있습니다. 중단된 시간이 너무 길어지면, Kafka에서 버퍼링하며 저장할 수 있는 데이터 크기를 넘어설 수 있습니다. Kafka가 보관하고 있어서 언제든지 전송할 수 있는 offset을 지나치면, 스트림 처리에서 offset을 잘 저장하고 있더라도 재시작할 때 OffsetOutOfRangeException이 발생합니다. 당연히 유실 없는 스트림 처리는 불가능한 상황이 됩니다. 따라서 중단될 수 있는 최대 예상 시간 동안 데이터를 충분히 버퍼링 할 수 있도록 Kafka를 설정해 둬야 합니다.

개발 및 운영 Tips

실제 스트림 처리 인프라를 개발하고 운영하면서 얻은 팁들을 간단하게 소개해 드립니다.

spark-shell에서 Spark Streaming 실행 테스트

  • Spark-shell을 실행하면 SparkContext(sc)가 이미 존재하기 때문에, StreamingContext는 다음과 같이 얻을 수 있습니다.
  • val ssc = new StreamingContext(sc, Seconds(1))
  • spark-shell 내에서는 sc를 생성하면서 conf를 넘겨줄 수 없기 때문에, spark-shell을 실행할 때 –conf 옵션으로 설정해야 합니다.

rdd.toDebugString() 버그

  • Spark 개발 시에 rdd.toDebugString()으로 RDD 리니지(lineage)를 확인하는 경우가 많습니다.
  • 그런데, Spark에서 잘 동작하는 rdd.toDebugString()이 오랫동안 실행하는 Spark Streaming에서 예외를 발생시키는 경우가 있으니 사용에 주의해야 합니다.

Event-time 기반 처리

  • 현재 Spark Streaming은 처리하는 데이터의 생성 시각인 event-time은 살펴보지 않습니다.
  • 향후 user-defined time extraction function을 통해 제공할 예정이라는 계획만 잡혀 있습니다.
  • Web UI에서도 일정 지연(Scheduling delay)만 제공할 뿐입니다. 따라서 로그 발생 시각을 기준으로 처리가 얼마나 지연되고 있는지 확인하려면 직접 개발해야 합니다.

메모리 관리

  • KryoSerializer: 디스크와 네트웍 I/O를 효율적으로 사용하고 성능을 높이기 위해 KryoSerializer를 사용합니다.
    • spark.serializer을 org.apache.spark.serializer.KryoSerializer로 설정합니다.
  • RDD를 캐싱(Caching)할 때 메모리에 직렬화하여 저장하도록 합니다.
    • Spark Streaming 연산을 거친 RDD들을 캐싱하면 기본적으로 StorageLevel.MEMORY_ONLY_SER으로 설정되지만, foreachRDD() 내부에서 사용하는 RDD는 Streaming 연산이 아니기 때문에 기본적으로 StorageLevel.MEMORY_ONLY으로 설정됩니다.
    • 따라서, foreachRDD() 내부의 RDD를 캐싱하려면, 직접 rdd.persist(StorageLevel.MEMORY_ONLY_SER) 처럼 명시해 주어야 합니다.
    • 캐싱한 RDD도 사용을 마치면 rdd.unpersist(false)로 해제해 주어야 끊임없이 동작하는 Spark Streaming에서 메모리 관리가 원활해집니다.
  • CMS(concurrent mark-and-sweep) Garbage Collector를 사용해 GC로 인한 스트림 처리 지연을 방지합니다.
    • spark.driver.extraJavaOptions와 spark.executor.extraJavaOptions를 통해 다음과 같이 GC설정을 할 수 있습니다.
    • -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:ConcGCThreads=1

결과 저장

  • Apache Phoenix에 저장하는 최종 결과 외에 다른 운영상의 통계적인 데이터를 저장해야 할 수도 있습니다.
  • 이러한 결과를 HDFS에 저장할 수도 있으나 분산 시스템인 HDFS의 응답속도가 튀는 경우가 발생하면 스트림 처리 전체에 지연이 발생합니다.
  • 앞서 driver의 로그를 로컬 디스크에 저장했듯이, 저장할 결과를 HDFS 보다 로컬 디스크에 저장하고 async하게 HDFS로 전송하는 방법이 스트림 처리에 영향을 덜 끼칩니다.

별도 모니터링 프로세스

  • 항상 동작하는 stream processing이 정말 잘 동작하는지 주기적으로 확인하는 별도의 모니터링 프로세스가 필요합니다.
  • Spark Web UI는 회사 밖이나 모바일로 보기 어렵기 때문에 Spark Streaming에 이상 징후가 보이면 이메일이나 slack으로 알림을 보낼 수 있으면 좋습니다.
  • 더 나아가 제대로 동작하지 않는 상황으로 판단되면, 알림 뿐만 아니라 즉각적인 조치를 취할 수 있도록 개발해 두면 좋습니다.

Spark Executor 수 설정

  • Spark Streaming 어플리케이션에 할당할 executor 수는 Kafka의 topic 파티션 수와 Apache Phoenix에 동시에 저장할 worker 수의 합보다 커야 여유가 있습니다.
  • Apache Phoenix에 저장하는 worker의 수는 RDD의 파티션 수와 같고, RDD의 파티션 수는 rdd.repartition()으로 변경 가능합니다.
  • worker 수를 늘리면 초당 Apache Phoenix에 저장할 수 있는 데이터 양이 많아지지만, HBase의 Region 수가 정해져있기 때문에, worker 수를 계속 늘려도 초당 Apache Phoenix에 저장할 수 있는 데이터 양은 무한정 증가하진 않습니다.
  • 참고로 receiver를 사용할 때 executor 수가 receiver 수보다 크지 않으면 데이터를 받기만 할 뿐, 처리할 수 있는 executor를 할당 받지 못해 데이터 처리 단계가 동작하지 않습니다.

Spark Executor 메모리 설정

  • 데이터 원천으로부터 읽은 데이터를 executor의 메모리에 담아두어야 하기 때문에, executor의 메모리 설정은 하나의 배치 단위에서 받아올 수 있는 최대 데이터 크기보다 크게 설정해야 합니다.
  • Window 연산을 할 경우, window 크기만큼 여러 배치의 데이터를 모두 메모리에 올릴 수 있어야 합니다.

최대 처리율(max rate) 설정

  • Kafka Direct API를 사용할 때 최대 처리율을 지정하는 옵션은 spark.streaming.kafka.maxRatePerPartition 입니다.
  • 이 설정은 파티션당 초당 처리량이라는 점을 주의합니다. 배치 간격이 10초이고 최대 처리율이 1000이고, 파티션 수가 4개라면 데이터는 최대 4 * 1000 * 10 = 4000만큼 받아옵니다.
  • 최대 처리율을 설정하지 않으면 Spark Streaming을 중단했다가 재시작할 때, 그동안 처리하지 않고 쌓인 데이터를 한꺼번에 받아와서 처리하려고 합니다.
  • 배치 간격 동안 충분히 처리할 수 있는 데이터 양이라면 문제가 없지만, 중단 시간이 길어질수록 데이터 양은 많아지기 때문에 배치 간격 내에 처리하기 어려워지고 지연시간이 증가하거나 메모리 부족이 발생합니다.
  • 중단과 재시작에서 설명하였듯이 재시작시 초기 배치는 시간이 더 걸리기 때문에, 안심하고 재시작을 자유롭게 하려면 최대 처리율을 설정하는 것이 좋습니다.
  • 최대 처리율은 여러 번의 테스트를 통해 배치 간격 동안 처리할 수 있는 최대로 지정합니다. 단, 재시작 초기의 처리는 평소보다 느리니 초기 결과를 사용하지 않습니다.
  • 최대 처리율을 낮게 설정하면, 오랫동안 중단된 후 재시작하는 경우에 중단된 시간만큼의 데이터를 따라잡는데 오래 걸릴 수 있습니다.
  • 최대 처리율을 높게 설정했음에도 불구하고 따라잡는 시간이 오래 걸릴 경우, KafkaUtils.createRDD()을 사용해 배치 처리를 할 수 있습니다. 단, offsetsRange를 지정해야 하므로, 마지막 처리 offset과 현재 Kafka에 들어온 마지막 offset을 알아야 합니다.
  • 여러 개의 Kafka topic을 동시에 처리하면, 최대 처리율은 당연히 데이터 입수율이 가장 높은 topic을 기준으로 설정합니다.

역 압력 설정

  • 역 압력 설정은 spark.streaming.backpressure.enabled를 true로 설정합니다.
  • 최대 처리율을 설정하더라도 역 압력을 설정하면 분산 환경과 입력 데이터의 변화에 대응하기 좋습니다.
    • 다음 화면은 실제로 역 압력이 동작할 때의 Web UI 화면입니다.
    • 분산 환경의 특성상 처리 시간이 배치 간격보다 커지면, 다음 배치의 일정 지연이 생기고 불안정(unstable) 상태가 됩니다.
    • 불안정 상태가 되면 역 압력에 의해 입력율(Input Rate)을 줄임으로써 처리 시간을 줄이고, 배치 간격을 기다리지 않고 연속해서 다음 배치를 실행해 일정 지연을 0으로 만듭니다.
    • 일정 지연이 0이 되면 입력율을 조금씩 높여서 버퍼링 되는 데이터를 줄입니다.
    • 입력율은 안정 상태가 지속되는 한 최대 처리율까지 올라갈 수 있습니다.
    • 버퍼링 되는 데이터를 모두 소진하면 입력율은 다시 평소처럼 낮아집니다.
    • 즉, 역 압력 설정을 하면 처리 시간이 배치 간격보다 클 때마다 입력율이 낮아졌다 올라갔다 다시 정상으로 돌아오는 과정이 반복됩니다.

    역 압력(Back Pressure)설정에 따른 입력 데이터 변화 대응 그림 6. 역 압력(Back Pressure)설정에 따른 입력 데이터 변화 대응

    스트림 처리 현황 및 계획

    현재 Data Infrastructure팀의 스트림 처리 인프라는 45대의 YARN 노드와 45대의 HBase Region 노드에서 Spark Streaming에 20~35개의 executor를 할당해서 배치 간격은 5~10초로 설정하고 초당 최대 2만건의 로그를 처리해서 하루 최대 1억건의 로그를 Apache Phoenix에 저장하고 1초 이내의 응답속도로 결과 테이블 내 데이터를 질의할 수 있습니다.

    실제로 현재 SK플래닛의 Syrup WalletOK캐쉬백 App에서 동작하는 Proximity SDK에서 전송하는 로그를 처리해, 암호화된 MDN(Mobile Device Number: 휴대 전화 번호)과 BLE(Bluetooth Low Energy) Key를 실시간으로 mapping하는 테이블 구축과 BLE 관련 이상징후를 탐지하기 위한 로그 조회용 테이블 구축에 스트림 처리 인프라를 사용하고 있습니다.

    또한 RecoPick 서비스의 실시간 추천과 데이터 처리를 스트림 처리 인프라로 전환 개발 중이고, 조만간 11번가 데이터 처리에도 활용 예정입니다. 추후 스트림 처리가 필요한 다양한 서비스에 발전하는 스트림 처리 기술을 계속 적용해 나가고자 합니다.

    기타 궁금한 점이 있거나 의견이 있으면 부담 없이 댓글 올려주시길 바랍니다.

    추가 참고 자료

엄태욱 Data Infrastructure 팀

Data Infrastructure팀 Data Programmer

공유하기