본문 바로가기

공부/기타

[Kafka] 프로듀서 / 컨슈터 사용하기

카프카 프로듀서/컨슈머

1. 명령어 사용

프로듀서 : kafka-console-producer.sh

경로 : ~~/kafka/bin

./kafka-console-producer.sh  --broker-list localhost:9092 --topic soon

옵션 :

--broker-list

  • 카프카 클러스터 내 모든 브로커 리스트를 입력
  • 브로커의 호스트명:포트번호, 브로커의 호스트명:포트번호

--topic

  • 메시지를 보내고자 하는 토픽 이름을 명시

출력 : 입력을 기다리는 창이 나타나며, 메시지 입력 후 엔터를 실행하면 전달됨

> hello
>

Ctrl + C 입력시 빠져나올 수 있음

컨슈머 : kafka-console-consumer.sh

경로 : ~~/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic soon --from-beginning

옵션 :

--bootstrap-server : 카프카 호스트 정보를 입력

--topic : 토픽 이름

--from-beginning : 토픽의 처음부터 메시지를 가져오도록 추가

출력 :

hello

2. 자바 사용

프로듀서 : Producer

@Test
public void test(){
    Properties props = new Properties();

    // 브로커 리스트 정의        
    props.put("bootstrap.servers", "localhost:9092");

    // 메세지의 키와 값에 문자열을 사용할 예정이므로 내장된 StringSerializer를 지정 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Properties 를 전달해 새 프로듀서를 생성함
    Producer<String, String> producer = new KafkaProducer<>(props);

    // ProducerRecord 를 생성하고, send() 메시지를 사용하여 soon 토픽으로 메시지를 전송함 
    producer.send(new ProducerRecord<String, String>("soon", "kafka java test"));
    producer.close();
}

컨슈머 : Consumer

@Test
public void test(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "soon-consumer");
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("soon"));

    try{
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for(ConsumerRecord<String, String> record : records){
                log.info("topic : {}, value : {}", record.topic(), record.value());
            }
        }
    }finally {
        consumer.close();
    }
}

3. 스프링 부트 사용

디펜던시 추가

compile 'org.apache.kafka:kafka-clients:2.2.0'
compile 'org.springframework.kafka:spring-kafka'

프로듀서 : KafkaTemplate

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value(value = "${kafka.bootstrap}")
    private String bootstrapAddress;

    @Bean
    public Map<String, Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<String, String>(producerFactory());
    }

}

사용 :

@Value("${kafka.topic}")
private String topic;

kafkaTemplate.send(topic, data);

컨슈머 : ConcurrentKafkaListenerContainerFactory

설정

@Bean
public Map<String, Object> consumerConfig(){
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory(){
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

사용 : @KafkaListener 사용

@KafkaListener(topics = "${kafka.topic}")
public void listen(@Payload String data){
    log.info("received data : {}", data);
}

'공부 > 기타' 카테고리의 다른 글

logrotate를 사용한 Tomcat 로그 관리  (0) 2019.07.10
[Kafka] 아파치 카프카 실행하기  (0) 2019.04.21
mysql 타임존 설정  (0) 2019.04.15
RabbitMQ 학습  (0) 2019.04.09
MAC에서 RabbitMQ 설치  (0) 2019.04.03