RecoPick 실시간 데이터 처리 시스템 전환기 (Storm에서 Spark Streaming으로 전환)

안녕하세요. Data Infrastructure팀(이하 DI팀) 박소라, 엄태욱 입니다.

이번에 소개해 드릴 내용은 SK플래닛의 추천 플랫폼인 레코픽(RecoPick)에서 실시간 로그 처리를 위해 AWS(Amazon Web Services) 상에서 Storm을 기반으로 구현됐던 작업들을, 사내 클러스터인 DI클러스터(이하 DIC)에서 Spark Streaming 작업으로 전환한 경험입니다.

레코픽(RecoPick)이란?

사이트 내 상품추천(웹/모바일)과 개인화 마케팅을 위한 추천(메일/문자/푸쉬알림 등)을 서비스 형태(SaaS)로 제공하는 추천 플랫폼입니다. 쇼핑몰 고객의 로그 데이터를 분석해 고객들의 보이지 않는 구매 패턴을 찾아내고, 이를 기반으로 고객에게 구매 가능성이 높은 상품을 제시해 추천으로 제공하며 어느 사이트에나 적용 가능한 서비스입니다.
레코픽 추천은 크게 상품 기준 추천, 개인화 추천, 통계형 추천이 있고, 각 추천마다 여러 가지 알고리즘이 제공됩니다. 각 사이트 내에서 필요한 알고리즘을 선택해 추천상품에 적용할 수 있고, 필요한 경우 A/B 테스트를 통해 더 적합한 알고리즘을 선택할 수 있도록 자동화된 A/B 테스트 기능도 제공합니다.
현재 레코픽은 11번가, 신세계면세점, 삼성전자, AK mall, 아모레퍼시픽, 동화면세점, 기타 쇼핑몰에 추천 서비스를 제공하고 있습니다. 레코픽의 상세 설명은 http://recopick.com 에서 확인하실 수 있습니다.

배경

이 작업은 기존에 AWS에서 운영되던 RecoPick의 운영 비용을 줄이고, 사내의 다양하고 큰 규모의 데이터와 함께 빠르게 가공할 수 있도록 하려는 취지였습니다. RecoPick 서비스 전체를 사내 Big Data Cluster인 DIC로 이전하겠다는 결정 하에서, 먼저 실시간 데이터 처리 영역을 전환하기로 하고, 작년 11월부터 RecoPick의 실시간 데이터 처리 로직부터 DIC의 기존 데이터 입수, 적재, 실시간 처리 시스템으로 옮기는 작업을 했습니다.
제일 먼저 한 작업은 RecoPick에서 입수되는 데이터를 DIC에 실시간으로 전달하는 작업이었습니다. 다행히 RecoPick은 DIC와 별도로 구축 됐음에도 불구하고 두 인프라 모두 Kafka를 데이터 전송 채널로 사용하고 있어서 MirrorMaker를 이용해 두 Kafka를 Mirroring 했습니다.
이후 실시간으로 Mirroring 된 RecoPick의 스트림 데이터 처리 작업을 기존의 Storm에서 Spark Streaming으로 전환하는 과정에서 얻은 경험을 공유하고자 합니다.

사전 지식

먼저, 과거 README 블로그를 통해 공개된 DI팀의 글들을 먼저 읽으면, 아래 내용을 읽는 데 조금 더 도움이 됩니다.

기존 레코픽 시스템 아키텍처

기존 레코픽은 AWS 상의 EC2 인스턴스에서 Kafka, Storm, Hadoop 클러스터를 구축해 데이터를 처리했습니다. 로그 수집 스크립트와 API를 통해 Kafka에 로그를 적재하고, 이렇게 적재된 로그를 Storm에서 실시간으로 ETL(Extract, Transform, Load) 처리를 합니다. 이 처리 과정의 일환으로 로그를 파싱해 Redis, HBase, S3, MySQL에 각각 저장하고 실시간 추천 계산, 각종 통계 계산, 상품 정보 제공 등에 이용합니다. 레코픽 데이터 흐름과 전체 시스템 아키텍처는 아래와 같습니다.

 

RecoPick As-is Overview그림1. RecoPick As-is Overview

위 그림과 같이, 기존 RecoPick은 로그 ETL 처리를 할 때, 한 번에 한 tuple씩 처리하기 때문에 latency가 매우 짧은 Storm을 사용합니다.

  1. Kafka-spout을 사용해 Kafka에서 곧바로 stream source를 읽어오고,
  2. Bolt에서 로그를 파싱하는 과정을 거친 후,
  3. HBase, MySQL, Redis, S3 등의 저장소에 저장했습니다.

여러 개의 Storm 작업으로 분류된 이유는 데이터를 스트림 처리해 끊임없이 HBase에 적재하면서, 서버 쪽에서 HBase로부터 데이터 읽기를 동시에 처리하다 보니 간혹 데이터 입수 지연이 발생했기 때문입니다. 이런 입수 지연이 실시간성을 저해하지 못하도록 로그를 파싱하는 작업, HBase에 적재하는 작업, Redis에 적재하는 작업을 따로 분리해 운영했습니다. 분리의 기준을 이렇게 잡은 이유는 실시간성이 중요한 작업과 그렇지 않은 작업으로 나누는 분리 법칙을 적용했기 때문입니다.

  1. HBase에 적재되는 데이터는 추천을 위한 배치 연산에 사용되는 데이터로, 상대적으로 실시간성이 중요한 요소가 아닌 반면,
  2. Redis에 저장되는 데이터는 실시간 추천 계산에 사용되는 용도라 실시간성이 중요하고,
  3. MySQL 통계 DB에는 레코픽 관리자 대시보드에서 PV와 UV 등의 통계성 정보를 실시간으로 가공해 넣어야 해서, 배치 연산과 실시간 연산의 요소가 결합돼 있습니다.

DIC에서 구축한 레코픽 데이터 처리 시스템

기존 레코픽에서는 Storm 0.9.5 버전을 사용하고 있었는데, Supervisor들의 비정상적인 종료가 잦았고, Storm의 자체 기능인 Rebalance를 수행하면 Spout과 Bolt의 Executor들이 여러 장비에 골고루 재분포가 되지 않고 한 쪽 장비에 쏠리는 문제가 있었습니다. 그리고 여러 개의 Storm 작업을 한 번에 실행시키면 Executor가 제대로 분배되지 않아 실행이 종료되는 문제도 있었습니다. 또한 레코픽에서 사용한 Core Storm은 Exactly-once 전송을 보장하지 않기 때문에 유실이 발생하거나 중복이 생길 수 있다는 단점이 있었습니다.
이런 단점들을 보완하고자 DIC에 구축돼 있는 Spark Streaming으로 스트림 처리를 하도록 변경했습니다. Data Infrastructure팀에서 구축한 스트림 처리 인프라의 전체 아키텍처는 아래의 그림과 같습니다.

SK플래닛 스트림 처리 인프라 아키텍처

그림2. SK플래닛 스트림 처리 인프라 아키텍처

현재 Data Infrastructure팀의 스트림 처리 인프라는 45대의 YARN노드와 45대의 HBase Region 노드에서 Spark Streaming에 20~35개의 Executor를 할당했고, 배치 간격은 5~10초로 설정하고 초당 최대 2만건의 로그를 처리해서 하루 최대 1억건의 로그를 Phoenix에 저장하고 평균 150ms 이내의 응답속도로 결과 테이블 내 데이터를 질의할 수 있습니다. Data Infrastructure팀의 전체 인프라 아키텍처는 Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기에서 더 자세히 살펴보실 수 있습니다.
Spark Streaming을 이용한 레코픽 데이터 처리 아키텍처는 아래의 그림과 같습니다.

recopick-first-stage그림3. RecoPick To-be Overview

위 그림에서 볼 수 있듯이, 번잡했던 데이터 저장소를 Phoenix로 단일화함으로써 외부에 제공하는 Data Product의 저장소가 통일됐습니다. 기존의 레코픽에서 저장소로 사용하던 Redis를 Phoenix로 변경하는 것이 가장 고민되는 요소였습니다. Phoenix가 Redis처럼 빠른 성능을 낼 수 있도록 구현해야 했기 때문입니다. 현재 테스트 결과 추천 계산 서버 쪽에서 20ms 이하의 latency로 데이터를 가져가는 것이 가능하도록 구현했습니다.
뿐만 아니라 기존에는 로그 처리 지연 방지를 위해 여러 개의 Storm 작업으로 분류해 처리했지만, Spark Streaming으로 변경하면서 하나의 작업에서도 여러 개의 Phoenix 테이블에 저장하는데 지연 없이 처리되도록 로직을 변경해 구현했습니다.
Spark Streaming은 마이크로 배치 방식으로 처리하는 프레임워크이기에 Storm에 비해 latency가 떨어집니다. 그러나 DIC에 구축한 레코픽 ETL 시스템은 배치 주기를 5초 이내로 지정하고 5초 이내에 모든 처리가 끝나도록 설계해 data freshness 측면에서 큰 차이가 없습니다. 현재 초당 수 천 건의 로그를 처리하고 있고 로그 입수부터 Phoenix에 적재하기까지의 전체 처리 시간은 1초 이내로 끝나서 배치 처리에 지연이 발생하지 않고 근 실시간으로 처리가 가능합니다.
또한, 기존에는 ETL 처리된 로그를 실시간으로 S3에 쌓고 한 시간 배치로 모아서 통계를 구현했기 때문에 한 시간 단위의 통계가 대부분이었지만, 변경된 구조에서는 Data Infrastructure팀에서 구축한 아키텍처를 바탕으로 실시간으로 통계를 계산할 수 있게 됐습니다. 그 이유는 Data Infrastructure팀의 Sentinel, Collector, Galleon 등의 서비스들 덕분입니다. 로그를 마이크로 배치 처리하고 Sentinel 스키마에 맞게 Kafka에 쌓으면 DIC 내의 Collector가 실시간으로 HDFS에 계속 저장합니다. Collector는 Hive 구조에 맞춰 파티셔닝해 데이터를 쌓고 이 데이터들은 Galleon을 통해 실시간으로 접근해 통계를 집계할 수 있습니다. 기존에는 실시간 PV, UV를 집계하기 위한 Storm 작업이 따로 존재했지만, 변경된 구조에서는 별도의 Spark 계산 작업 없이 Galleon 쿼리로 실시간 통계를 집계할 수 있게 됐습니다.
변경된 시스템은 Kafka Direct API를 이용해 Exactly-once 전송을 보장하도록 구현했고 Kafka의 Offset 관리도 직접 저장하고 관리하는 방식으로 현재 로그 유실이나 지연 없이 스트림 처리가 가능해졌습니다.

전환 작업 내용

기존 AWS 상에 존재하던 5개의 Storm 작업은 DIC 상에서는 다음과 같이 크게 2개의 Spark Streaming 작업으로 전환됐습니다.

  • RecoPick: Kafka로부터 raw_log를 받아 파싱한 후
    • Phoenix의 User테이블과 Uid2Uuid 테이블에 사용자 정보 저장
    • Phoenix의 User2Item 테이블에 사용자의 상품에 대한 행위 정보(보기, 장바구니 담기, 주문하기 등) 저장
    • 파싱된 로그를 HDFS에 저장할 수 있도록 다시 Kafka로 보냄
  • RecoClick: Kafka로부터 click_log를 받아 파싱후 HDFS에 저장할 수 있도록 다시 Kafka로 보냄

특히, RecoPick 작업의 결과인 User2Item 정보는 RecoPick 서비스에서 실시간 개인화 추천에 사용되기 때문에, 고객이 상품을 본 직후 다음 화면에서 방금 본 상품과 연관된 상품을 추천할 수 있도록 빠르게 처리돼야 하는 요구 사항이 있습니다.
여기서, Spark Streaming에서 로그를 파싱한 후 HDFS에 직접 저장하지 않고, Kafka로 다시 보낸 이유는, 이미 DIC 내에는 Kafka에서 HDFS에 실시간으로 로그를 저장하는 Collector가 별도로 존재하기 때문입니다. 관심 분리(Separation of concern) 개념에 따라 Spark Streaming은 실시간 로그 파싱에 집중할 수 있고, Collector는 실시간 저장에 집중하고, Kafka는 데이터 전송에 집중할 수 있게 되면서 전체 구조도 깔끔하고, 추후 유지 보수나 컴포넌트 교체에도 유연하게 대처할 수 있게 됐습니다. 따라서, 향후 Spark Streaming이 담당하는 실시간 로그 파싱 및 처리를 Kafka Streams 또는 Apache Flink로 대체하거나, Collector가 담당하는 실시간 로그 적재를 Kafka Connect로 대체할 수 있는 발판이 됩니다.
Spark Steaming으로 구현한 전체 아키텍처는 Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기에서 크게 벗어나지 않았습니다. 전체 아키텍처 외에 지난 포스팅에서는 말씀 드리지 못했던 실제 구현 과정에서 얻은 몇 가지 Spark Streaming 개발 Tip을 알려 드리겠습니다.

운영상 Spark Streaming 중단 처리 방법

RecoPick 로그 처리 작업은 앞서 설명 드렸듯이 Kafka로부터 읽어들인 raw_log를 파싱후 3개의 Phoenix 테이블에 저장하고 Kafka로 다시 전송하는 4개의 출력 경로가 있습니다. 지난 Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기에서 설명했듯이, Phoenix는 멱등(Idempotent)하게 저장하면 exactly-once를 보장합니다. 문제는 Kafka New Producer API를 활용한 유실 없는 비동기 데이터 전송에서 설명했듯이, Kafka로 전송하는 경우 전송 도중 장애가 발생하면 at-least-once 밖에 보장할 수 없습니다.
대부분의 경우 Kafka Broker의 문제가 아닌 이상 Kafka 전송에서 오류가 나는 경우는 없습니다. Kafka 전송이 Transaction을 지원하지 않는 한, 예기치 못한 장애로 일부 데이터 중복이 발생할 가능성은 피할 수 없습니다. 하지만 적어도 운영상의 이유로 Spark Streaming을 중단하는 경우에는 중복이 발생하지 않아야 합니다.
운영상의 중단이 Kafka 전송 중에 일어나지 않도록 하기 위해, Spark Streaming의 중단을 micro-batch가 끝나서 offset 정보를 갱신한 이후로만 한정 짓도록 합니다. 즉, Kafka로부터 처리할 raw_log를 모두 파싱하고 Phoenix 저장과 Kafka 전송이 끝나서 이번 micro-batch가 정상적으로 완료됐다고 offset을 저장한 후에만 중단이 가능하도록 합니다. 방법은 간단하게도 micro-batch가 끝나는 시점마다 중단 여부를 확인하도록 하고, 중단해야할 경우 sc.stop()을 호출하도록 합니다. 하나의 micro-batch를 끝내고 중단을 해야 하는지, 아니면 다음 micro-batch를 진행해야 하는지 확인하는 방법으로는, Spark Streaming의 driver 프로그램에서 서버 내 특정 파일(/path/stopfile)의 유무로 확인했습니다. Driver의 이중화가 필요한 경우, 로컬 파일이 아닌 Zookeeper를 활용하는 방법도 있습니다. 어느 방법을 쓰든, 최대한 빠르게 중단 여부를 확인해서 micro-batch가 지연되지 않도록 해야 합니다.

Phoenix 저장과 Kafka 전송 처리 순서

운영상 Spark Streaming의 중단 처리를 micro-batch 끝에서만 확인하도록 처리했으나, 장애로 인한 중단이나 Spark Streaming의 자체적인 재시도로 인해 Kafka 전송이 반복되는 일을 최소화해야 합니다. Kafka 재전송을 최소화 하는 방법은 Kafka 전송 처리를 micro-batch의 가장 마지막 단계에서 처리하도록 해서, Kafka 전송의 장애가 아닌 이상, 딱 한 번만 수행될 수 있도록 합니다. 특히 파싱 부분에서 발생하는 오류나 멱등성이 있어 언제든지 재처리가 가능한 Phoenix 저장 오류로 인해 Spark Streaming이 중단되거나 재시도 할 경우에도 Kafka 전송이 여러 번 발생하지 않도록 합니다.

Kafka에 효과적으로 전송하기

Spark Streaming에서 사용한 Kafka Producer는 0.8 버전으로, Kafka New Producer API를 활용한 유실 없는 비동기 데이터 전송에서 자세히 설명했듯이 메시지를 하나씩 send() 할 경우 sync 모드로 동작해 매우 느리게 처리됩니다. Spark Streaming은 짧은 micro-batch 내에서 처리가 끝나야 하기 때문에 KeyedMessage List를 이용해 여러 메시지를 한 번에 전송해서 처리 속도를 높여야 합니다. 물론 여러 메시지를 전송하는 만큼 실패할 경우 중복이 많이 발생할 수 있다는 점은 감수해야만 합니다.
실제 Scala로 된 Kafka 전송 코드는 다음과 같습니다.

val result = log.mapPartitions(partition => {
  if (partition.nonEmpty) {
    var producer = new Producer[String, String](new ProducerConfig(config))
    val arrayList = new util.ArrayList[KeyedMessage[String, String]]()

    partition.map(row => {
      val shuttle = new RecoPickSentinelShuttle()
      ...
      arrayList.add(new KeyedMessage[String, String](KAFKA_TOPIC, shuttle.toString))

      val sendCount = if (arrayList.size() == SEND_UNIT || !partition.hasNext) {
        var count = 0L
        try {
          producer.send(arrayList)
          count = arrayList.size().toLong
        } catch {
          case e@(_: FailedToSendMessageException | _: ProducerClosedException) => {
            producer.close
            producer = new Producer[String, String](new ProducerConfig(config))
            producer.send(arrayList)
            count = arrayList.size().toLong
          }
        }
        arrayList.clear()
        count
      } else 0L
      (1L, sendCount)
    })
  } else {
    Iterator[(Long, Long)]()
  }
})

 

KeyedMessage List로 전송하기 위해 Java의 ArrayList를 사용했습니다. Kafka Producer API가 Java의 List를 받기 때문이기도 하지만, Scala의 Collection을 사용하는 경우보다 Java의 Collection이 실제로 더 빠른 성능을 내기 때문에, Scala의 함수형 연산이 필요하지 않은 단순 저장용 Collection은 Java를 사용하는 편이 더 좋습니다.
SEND_UNIT 만큼씩 모아서 전송하고, 전송 중 오류로 인해 예외가 발생한 경우에는 Producer를 다시 만들어서 재전송을 시도합니다. Producer 내부에서는 전송을 3번씩 시도하는데, 그럼에도 불구하고 실패할 경우에는 Producer를 재설정해서 한 번만 (내부적으로는 3번) 더 시도합니다. 그러고도 실패할 경우에는 재시도가 무의미하기 때문에 전송 중단이 좋습니다.
SEND_UNIT 전송 후 남은 마지막 데이터들도 전송해야 하는데, 이 때 주의할 점은 partition.map() 이후에 전송하면 안 된다는 점입니다. Spark에서 map은 lazy하게 처리되기 때문에, map 이후의 send는 map보다 먼저 처리됩니다. 따라서 마지막 전송은 반드시 map 내부에서 !partition.hasNext로 map의 마지막임을 판단해서 전송해야 합니다. 그리고, partition은 iterator로서 hasNext를 통해 한 번씩만 순회할 수 있으므로, partition.length를 사용하면 더 이상 순회가 불가능하다는 점을 잊지 말아야 합니다.
map의 결과로 (1L, sendCount)를 남기는 이유는 Kafka 전송 후에 실제 데이터와 전송 데이터를 비교하기 위함입니다.

Phoenix에 효과적으로 저장 하기

Phoenix는 key-value로 저장하기 때문에, micro-batch 내에서 같은 Key를 여러번 저장하는 방법은 데이터 정합성을 떨어뜨릴 수 있습니다. 따라서 Phoenix에 저장하기 전에 Spark에서 Phoenix Key를 기준으로 최종 Value 하나만을 계산한 후에, Key별로 한 번만 Phoenix에 저장하도록 합니다. 특히 Phoenix에 이미 저장된 값과 로그에서 얻은 값을 함께 연산해 다시 저장한다면, 같은 Key에 대해서 Phoenix에서 읽은 시점과 저장한 시점이 RDD Partition별로 다르기 때문에, 최종 결과 값이 정확하다고 보장할 수 없습니다.
JDBC Driver를 이용해서 Spark에서 Phoenix에 저장하는 Scala 코드는 다음과 같습니다.

val result = status.mapPartitions { partition =>
  if (partition.nonEmpty) {
    val connection = DriverManager.getConnection(PHOENIX_JDBC_URL)
    val upsertSql = s"upsert into $tableName values ( ?, ?, ?, ?, ? )"

    var upsertPsmt = connection.prepareStatement(upsertSql)
    var count = 0

    val result = partition.map { row =>
      upsertPsmt.setString(1, row.uid)
      ...
      upsertPsmt.setString(5, DateTime.now.toString("yyyyMMddHHmmss"))
      val updateCount = upsertPsmt.executeUpdate()
      count += 1
      if (count % BATCH_UNIT == 0) {
        connection.commit()
        upsertPsmt.close()
        upsertPsmt = connection.prepareStatement(upsertSql)
      }
      if (!partition.hasNext) {
        connection.commit()
        upsertPsmt.close()
        connection.close()
      }
      (row.uid, row.groupSid, row.uuid, updateCount)
    }
    result
  } else {
    Iterator[(String, String, String, Int)]()
  }
}

앞서 Kafka 전송하기와 마찬가지로 Phoenix 저장도 속도를 향상시키기 위해 BATCH_UNIT 마다 commit() 하는 방식을 사용합니다.

이 코드에서 눈 여겨 보아야 할 점은 마지막 commit() 시점과 각종 close() 시점이 모두 partition.map()이 끝난 후가 아니라 !partition.hasNext 인 경우라는 점입니다. 앞서 Kafka 전송하기와 마찬가지로 Spark에서 map은 lazy하게 처리되기 때문에, map 이후에 close()으로 처리하면 map보다 먼저 처리돼 connection이 종료됐다는 오류를 얻고, 데이터는 저장되지 않는 문제가 발생할 수 있습니다.

빈 RDD에서 reduce 하기

Kafka 효과적으로 전송하기에서 (1L, sendCount)로 남긴 결과를 result.reduce((a, b) => (a._1 + b._1, a._2 + b._2)) 로 최종 결과를 얻으려고 할 때, UnsupportedOperationException if the RDD is empty 오류를 맞이할 수 있습니다. 이유는 RDD가 비어있는 경우 reduce()를 수행할 수 없기 때문입니다.
Spark Streaming은 스트림 처리를 하기 때문에, 특정 micro-batch 구간에 처리할 로그가 없는 경우가 있습니다. 또는, 로그는 있으나 불필요한 로그를 빼면 실제 저장할 로그가 없는 경우도 있습니다. 다시 말해 RDD가 비어있는 경우가 있습니다.
이 경우, reduce 대신 fold() 함수를 사용하면 빈 RDD에 대해서도 reduce()와 동일한 연산을 수행할 수 있습니다. result.reduce((a, b) => (a._1 + b._1, a._2 + b._2)) 는 result.fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2)) 로 변경해서 빈 RDD에 대한 예외처리 없이 연산할 수 있습니다.

최종 결과

다음과 같이 3초 주기의 Spark Streaming으로 RecoPick 서비스의 AWS Storm을 DIC Spark Streaming으로 전환 개발해 유실 없이 중복은 최소화하며 하루 최대 3천만건의 로그를 안정적으로 처리 중입니다.

RecoPick 스트림 처리 결과

그림4. RecoPick 스트림 처리 결과

향후 과제

  • Kafka Producer v0.9 의 async 전송으로 수정하기
  • Spark v2.0으로 업그레이드 하기

참고로, 이 글의 공동 저자인 박소라님은 지난 여름에 글의 초안을 작성한 후 소셜 커머스 회사 “쿠팡”으로 이직하셨음을 밝히며, 멀리서나마 그간의 노고를 치하 드린다는 말씀 드립니다.

끝으로, DI팀 블로그 포스팅의 시리즈 에디터로서 꼼꼼한 교정과 함께 아키텍처 다이어그램을 작성해 주신 DI팀 송재하 팀장님께 감사의 말씀을 드립니다.

엄태욱 Data Infrastructure 팀

Data Infrastructure팀 Data Programmer

공유하기