ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] 카프카 개념 및 예시코드
    프로그래밍/기타 2021. 8. 19. 16:20
    반응형
    아파치 Kafka 따라잡기, 니샨트 가르그

     

    카프카의 기본 개념 및 자바를 기반으로 한 예시코드에 대해 알아보겠습니다~!

    위에 인용한 책과 구글링을 바탕으로 정리하였습니다.

     

    개념


     

    카프카 소개 

    메세지 퍼블리싱?
    다양한 애플리케이션의 메시지를 서로 전달할 수 있도록 연결하는 구조를 의미한다. 
    카프카는 대표적인 메시지 브로커이다. 카프카는 실시간으로 대량의 정보를 다루고 여러 정보 소비자에게 빠르게 전달하는 과정에서 생기는 문제점을 해결한 솔루션이다.
    카프카를 통해 정보 소비자는 정보 생성자에 대해 알 필요 없고 생산자는 누가 최종소비자인지 알 필요 없이 서로 연동시켜준다.

     

    카프카 특징

    - 비휘발성 메시징: 정보 유실 X
    - 높은 처리량: 초당 수백만 건의 메시지 처리
    - 분산: 카프카 서버들을 대상으로 메시지 파티셔닝 지원
    - 다양한 클라이언트 지원: 자바, 파이썬 등 다양한 플랫폼과 연동 가능
    - 실시간: 생산자 스레드에 의해 생성된 메시지들은 즉시 소비자 스레드에서 확인 가능

     

    생산자와 소비자

    생산자
    프론트엔드 웹 애플리케이션
    웹 분석 로그 생산하는 생산하는 생산자 프록시
    변환 로그를 생상하는 생산자 어댑터
    호출 추적 로그를 생산하는 생산자 서비스
    소비자
    데이터 웨어하우스 또는 하둡에 저장하는 오프라인 소비자
    HBase나 카산드라같은 근 실시간 분석을 위한 저장소에 저장하는 근 실시간 소비자
    인메모리 데이터베이스에서 메시지를 필터링하고 관련 그룹들을 위해 경고 이벤트 등을 발생시키는 실시간 소비자

     

    카프카 설치

    리눅스 & JDK 1.8 버전이 설치가 되어있어야 함
    (다운로드 받고 압축만 풀면 됨)
    https://kafka.apache.org/documentation/#quickstart
    주키퍼 실행 (카프카는 주키퍼 위에서 돌아가므로 먼저 구동시켜야함)
    터미널) bin/zookeeper-server-start.sh config/zookeeper.properties
    카프카 브로커 실행
    터미널) bin/kafka-server-start.sh config/server.properties
    토픽 생성
    터미널) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic kafkatopic # kafkatopic이라는 토픽 생성 / 파티션은 5개
    생성자 (메세지 전송하기)
    터미널) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
    소비자 (메세지 받기)
    터미널) bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatopic --from-beginning

     

    카프카 디자인

    카프카 구조

    기본구조 

    생산자카프카 브로커에 생성된 카프카 토픽으로 메시지를 보낸다.

    카프카 브로커는 카프카 서버로 동작한다.

    소비자는 메시지를 얻기 위해 카프카 토픽을 구독한다.

     

     

     

    실습


     

    기본 환경설정

    <!-- pom.xml에 다음과 같은 dependency 추가 --!>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>1.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.8.2.1</version>
    </dependency>

     

    생산자

    package com.test.kafka;
    
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    
    // 생산자 생성
    public class SimpleProducer {
    	private static Producer<Integer, String> producer; 
    	
    	private final Properties props = new Properties();
    	public SimpleProducer() {
    		// 카프카 브로커에 접속하기 위한 속성값 
    		props.put("metadata.broker.list", "localhost:9092"); // 생산자가 접속할 필요가 있는 브로커
    		props.put("serializer.class", "kafka.serializer.StringEncoder"); // 생산자로부터 소비자로 메시지를 전송하는 것을 준비하는 데 사용하는 직렬화 클래스
    		props.put("request.required.acks", "1"); // 카프카 브로커가 생산자로부터 메시지를 받았을 때 확인 응답을 보내도록 지시 
    		producer = new Producer<Integer, String>(new ProducerConfig(props)); // 해당 config 정보를 프로듀서에게 전달
    	}
    	public static void main(String[] args) {
    		SimpleProducer sp = new SimpleProducer();
    
    		String topic = "kafkatopic"; // 토픽
    		String messageStr = "kafa topic test입니다!!!"; // 토픽에 보낼 메세지
    
    		KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
    		producer.send(data);
    		producer.close();
    	}
    }

     

    소비자

    package com.test.kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    public class SimpleHLConsumer {
    	private final ConsumerConnector consumer;
    	private final String topic;
    	
    	// 주키퍼 접속을 만드는 속성을 정의
    	public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
    		Properties props = new Properties();
    		props.put("zookeeper.connect", zookeeper); // 주키퍼 접속 상세 기술
    		props.put("group.id", groupId); // 소비자 그룹 이름 기술
    		props.put("zookeeper.session.timeout.ms", "500"); // 주키퍼 타임아웃 시간 기술 
    		props.put("zookeeper.sync.time.ms", "250"); // 주키퍼 리더와 동기화 시간 기술
    		props.put("auto.commit.interval.ms", "1000"); // 소비자 오프셋이 주키퍼에 커밋되는 주기 기술
    		
    		consumer = Consumer.createJavaConsumerConnector(
    				new ConsumerConfig(props));
    		this.topic = topic;
    	}
    	
    	// 메시지를 읽는 부분 
    	public void testConsumer() {
    		Map <String, Integer> topicCount = new HashMap<String, Integer>();
    		topicCount.put(topic, new Integer(1));
    		Map<String, List<KafkaStream <byte[], byte[]>>>
    		consumerStreams = consumer.createMessageStreams(topicCount);
    		List < KafkaStream <byte[], byte[]>> streams = consumerStreams.get(topic);
    		
    		for (final KafkaStream stream:streams) {
    			ConsumerIterator< byte[], byte[] > consumerIte = stream.iterator();
    			while (consumerIte.hasNext())
    				System.out.println("Message from SIngle TOpic:" + new String(consumerIte.next().message()));
    		}
    		if (consumer!= null) {
    			consumer.shutdown();
    		}
    	}
    	
    	public static void main(String[] args) {
    		String topic="kafkatopic"; // 받을 토픽 설정 
    		SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
    		simpleHLConsumer.testConsumer();
    		
    	}
    }

     

    결과

    반응형

    댓글

Designed by Tistory.