-
[Kafka] 카프카 개념 및 예시코드프로그래밍/기타 2021. 8. 19. 16:20반응형
아파치 Kafka 따라잡기, 니샨트 가르그
카프카의 기본 개념 및 자바를 기반으로 한 예시코드에 대해 알아보겠습니다~!
위에 인용한 책과 구글링을 바탕으로 정리하였습니다.
개념
카프카 소개
메세지 퍼블리싱?
다양한 애플리케이션의 메시지를 서로 전달할 수 있도록 연결하는 구조를 의미한다.
카프카는 대표적인 메시지 브로커이다. 카프카는 실시간으로 대량의 정보를 다루고 여러 정보 소비자에게 빠르게 전달하는 과정에서 생기는 문제점을 해결한 솔루션이다.
카프카를 통해 정보 소비자는 정보 생성자에 대해 알 필요 없고 생산자는 누가 최종소비자인지 알 필요 없이 서로 연동시켜준다.카프카 특징
- 비휘발성 메시징: 정보 유실 X
- 높은 처리량: 초당 수백만 건의 메시지 처리
- 분산: 카프카 서버들을 대상으로 메시지 파티셔닝 지원
- 다양한 클라이언트 지원: 자바, 파이썬 등 다양한 플랫폼과 연동 가능
- 실시간: 생산자 스레드에 의해 생성된 메시지들은 즉시 소비자 스레드에서 확인 가능생산자와 소비자
생산자
프론트엔드 웹 애플리케이션
웹 분석 로그 생산하는 생산하는 생산자 프록시
변환 로그를 생상하는 생산자 어댑터
호출 추적 로그를 생산하는 생산자 서비스
소비자
데이터 웨어하우스 또는 하둡에 저장하는 오프라인 소비자
HBase나 카산드라같은 근 실시간 분석을 위한 저장소에 저장하는 근 실시간 소비자
인메모리 데이터베이스에서 메시지를 필터링하고 관련 그룹들을 위해 경고 이벤트 등을 발생시키는 실시간 소비자카프카 설치
리눅스 & JDK 1.8 버전이 설치가 되어있어야 함
(다운로드 받고 압축만 풀면 됨)
https://kafka.apache.org/documentation/#quickstart주키퍼 실행 (카프카는 주키퍼 위에서 돌아가므로 먼저 구동시켜야함)
터미널) bin/zookeeper-server-start.sh config/zookeeper.properties
카프카 브로커 실행
터미널) bin/kafka-server-start.sh config/server.properties
토픽 생성
터미널) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic kafkatopic # kafkatopic이라는 토픽 생성 / 파티션은 5개
생성자 (메세지 전송하기)
터미널) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
소비자 (메세지 받기)
터미널) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatopic --from-beginning카프카 디자인
기본구조
생산자는 카프카 브로커에 생성된 카프카 토픽으로 메시지를 보낸다.
카프카 브로커는 카프카 서버로 동작한다.
소비자는 메시지를 얻기 위해 카프카 토픽을 구독한다.
실습
기본 환경설정
<!-- pom.xml에 다음과 같은 dependency 추가 --!>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>생산자
package com.test.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; // 생산자 생성 public class SimpleProducer { private static Producer<Integer, String> producer; private final Properties props = new Properties(); public SimpleProducer() { // 카프카 브로커에 접속하기 위한 속성값 props.put("metadata.broker.list", "localhost:9092"); // 생산자가 접속할 필요가 있는 브로커 props.put("serializer.class", "kafka.serializer.StringEncoder"); // 생산자로부터 소비자로 메시지를 전송하는 것을 준비하는 데 사용하는 직렬화 클래스 props.put("request.required.acks", "1"); // 카프카 브로커가 생산자로부터 메시지를 받았을 때 확인 응답을 보내도록 지시 producer = new Producer<Integer, String>(new ProducerConfig(props)); // 해당 config 정보를 프로듀서에게 전달 } public static void main(String[] args) { SimpleProducer sp = new SimpleProducer(); String topic = "kafkatopic"; // 토픽 String messageStr = "kafa topic test입니다!!!"; // 토픽에 보낼 메세지 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr); producer.send(data); producer.close(); } }
소비자
package com.test.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class SimpleHLConsumer { private final ConsumerConnector consumer; private final String topic; // 주키퍼 접속을 만드는 속성을 정의 public SimpleHLConsumer(String zookeeper, String groupId, String topic) { Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); // 주키퍼 접속 상세 기술 props.put("group.id", groupId); // 소비자 그룹 이름 기술 props.put("zookeeper.session.timeout.ms", "500"); // 주키퍼 타임아웃 시간 기술 props.put("zookeeper.sync.time.ms", "250"); // 주키퍼 리더와 동기화 시간 기술 props.put("auto.commit.interval.ms", "1000"); // 소비자 오프셋이 주키퍼에 커밋되는 주기 기술 consumer = Consumer.createJavaConsumerConnector( new ConsumerConfig(props)); this.topic = topic; } // 메시지를 읽는 부분 public void testConsumer() { Map <String, Integer> topicCount = new HashMap<String, Integer>(); topicCount.put(topic, new Integer(1)); Map<String, List<KafkaStream <byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); List < KafkaStream <byte[], byte[]>> streams = consumerStreams.get(topic); for (final KafkaStream stream:streams) { ConsumerIterator< byte[], byte[] > consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from SIngle TOpic:" + new String(consumerIte.next().message())); } if (consumer!= null) { consumer.shutdown(); } } public static void main(String[] args) { String topic="kafkatopic"; // 받을 토픽 설정 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic); simpleHLConsumer.testConsumer(); } }
결과
반응형'프로그래밍 > 기타' 카테고리의 다른 글