티스토리 뷰

프로듀서는 카프카에서 데이터의 시작점이다. 프로듀서 애플리케이션은 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다. 프로듀서가 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신하는데, 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.


프로듀서 내부 구조

  • ProducerRecord: 프로듀서에서 생성하는 레코드, 오프셋은 미포함
  • send(): 레코드를 전송 요청하는 메서드
  • Partitioner: 어느 파티션으로 전송할지 지정하는 파티셔너
  • Accumulator: 배치로 묶어 전송할 데이터를 모으는 버퍼

Default Partitioner

프로듀서 API를 사용하면 UniformStickyPartitionerRoundRobinPartitioner 2개 파티셔너를 제공한다. 카프카 클라이언트 라이브러리 2.5.0 버전에서 파티셔너를 지정하지 않으면, UniformStickyPartitioner가 기본 파티셔너로 설정된다.


UniformStickyPartitioner

Accumulator에서 레코드들이 배치로 묶일 때까지 기다렸다가 전송한다. 파티션을 순회하면서 보내기 때문에 모든 파티션에 분배되어 전송되며, RoundRobinPartitioner에 비해 향상된 성능을 가진다.


RoundRobinPartitioner

ProducerRecord가 들어오는 대로 파티션을 순회하면서 전송한다. Accumulator에서 묶이는 정도가 적기 때문에 전송 성능이 낮다.


메시지 키가 있을 경우

UniformStickyPartitionerRoundRobinPartitioner 모두 메시지 키가 있을 때는 메시지 키의 해시값과 파티션을 매칭하여 레코드를 전송한다. 동일한 메시지 키가 존재하는 레코드는 동일한 파티션 번호에 전달되며, 만약 파티션 개수가 변경될 경우 메시지 키와 피티션 번호 매칭은 깨지게 된다.


메시지 키가 없을 경우

메시지 키가 없을 때는 최대한 동일하게 분배하는 로직이 들어있다.


Producer Custom Partitioner

카프카 클라이언트 라이브러리에서는 사용자 지정 파티셔너를 생성하기 위한 Partitioner 인터페이스를 제공한다. Partitioner 인터페이스를 상속받은 사용자 정의 클래스에서 메시지 키 또는 메시지 값에 따라 파티션 지정 로직을 적용할 수도 있다. Partitioner를 통해 파티션이 지정된 데이터는 Accumulator에 버퍼로 쌓이고, sender 스레드는 Accumulator에 쌓인 배치 데이터를 가져가 카프카 브로커로 전송한다.


Producer 필수 옵션

  • bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 hostname:port를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능하다.
  • key.serializer: 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.
  • value.serializer: 레코드의 메시지 값을 직렬화하는 클래스를 저장한다.

Producer 선택 옵션

  • acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는 데에 사용하는 옵션이다. 0, 1, -1(all) 중 하나로 설정할 수 있고, 기본 값은 1이다.
  • linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간이며, 기본 값은 0이다.
  • retries: 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다.
  • max.in.flight.requests.per.connection: 한 번에 요청하는 최대 커넥션 개수, 설정된 값만큼 동시에 전달 요청을 수행하며, 기본 값은 5이다.
  • partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정하며, 기본 값은 org.apache.kafka.clients.producer.internals.DefaultPartitioner이다.
  • enable.idempotence: 멱등성 프로듀서롤 동작할 지에 대한 여부를 설정하며, 기본 값은 false이다.
  • tracsactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지에 대한 여부를 설정하며, 기본 값은 null이다.

ISR(In-Sync-Recplicas)

ISR은 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 뜻한다. 복제 개수가 2인 토픽있을 때, 이 토픽에는 리더 파티션 1개와 팔로워 파티션 1개가 존재하게 된다. 리더 파티션에 0부터 3의 오프셋이 있다고 가정해보면, 팔로워 파티션에 동기화가 완료되려면 0부터 3까지 오프셋이 존재해야 한다. 동기화가 완료됐다는 의미는 리더 파티션의 모든 데이터가 팔로워 파티션에 복제된 상태를 말하기 때문이다.


프로듀서가 특정 파티션에 데이터를 저장하는 작업은 리더 파티션을 통해 처리하는데, 이때 리더 파티션에 새로운 레코드가 추가되어 오프셋이 증가하면 팔로워 파티션이 위치한 브로커는 리더 파티션의 데이터를 복제한다. 리더 파티션에 데이터가 적재된 이후 팔로워 파티션에 복제하는 시간차로 인해 리더 파티션과 팔로워 파티션 간의 오프셋 차이가 발생할 수 있다.


acks=0

acks를 0으로 설정하면, 프로듀서가 리더 파티션으로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는지 확인하지 않는다. 리더 파티션은 데이터가 저장된 이후에 데이터가 몇 번째 오프셋에 저장되었는지 리턴하는데, acks가 0으로 설정되어 있으면 프로듀서는 리더 파티션에 데이터가 저장되었는지 여부에 대한 응답 값을 받지 않는다. 데이터의 전송 속도가 acks를 1 또는 all로 설정했을 경우보다 훨씬 빠르기 때문에 일부 데이터의 유실이 발생하더라도 전송 속도가 중요한 경우에 사용한다.


acks=1

acks를 1로 설정하면, 프로듀서는 보낸 데이터가 리더 파티션에만 정상적으로 적재되었는지 확인한다. 만약 리더 파티션에 정상적으로 적재되지 않았다면, 리더 파티션에 적재될 때까지 재시도가 가능하다. 하지만 복제 개수를 2 이상으로 운영할 경우 리더 파티션에 적재가 완료되, 팔로워 파티션이 리터 파티션의 데이터를 복제하기 직전에 리더 파티션이 있는 브로커에 장애가 발생하면 동기화되지 못한 데이터가 유실될 수 있다.


acks=-1(all)

acks를 all 또는 -1로 설정하면, 프로듀서는 보낸 데이터가 리더 파티션뿐만 아니라 팔로워 파티션에 모두 정상적으로 적재되었는지 확인한다. 그로 인해 0 또는 1 옵션보다는 속도가 느리지만, 일부 브로커에 장애가 발생하더라도 프로듀서는 안전하게 데이터를 전송하고 저장할 수 있음을 보장한다.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함