카프카 프로듀서/컨슈머
1. 명령어 사용
프로듀서 : kafka-console-producer.sh
경로 : ~~/kafka/bin
./kafka-console-producer.sh --broker-list localhost:9092 --topic soon
옵션 :
--broker-list
- 카프카 클러스터 내 모든 브로커 리스트를 입력
- 브로커의 호스트명:포트번호, 브로커의 호스트명:포트번호
--topic
- 메시지를 보내고자 하는 토픽 이름을 명시
출력 : 입력을 기다리는 창이 나타나며, 메시지 입력 후 엔터를 실행하면 전달됨
> hello
>
Ctrl + C 입력시 빠져나올 수 있음
컨슈머 : kafka-console-consumer.sh
경로 : ~~/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic soon --from-beginning
옵션 :
--bootstrap-server : 카프카 호스트 정보를 입력
--topic : 토픽 이름
--from-beginning : 토픽의 처음부터 메시지를 가져오도록 추가
출력 :
hello
2. 자바 사용
프로듀서 : Producer
@Test
public void test(){
Properties props = new Properties();
// 브로커 리스트 정의
props.put("bootstrap.servers", "localhost:9092");
// 메세지의 키와 값에 문자열을 사용할 예정이므로 내장된 StringSerializer를 지정
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Properties 를 전달해 새 프로듀서를 생성함
Producer<String, String> producer = new KafkaProducer<>(props);
// ProducerRecord 를 생성하고, send() 메시지를 사용하여 soon 토픽으로 메시지를 전송함
producer.send(new ProducerRecord<String, String>("soon", "kafka java test"));
producer.close();
}
컨슈머 : Consumer
@Test
public void test(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "soon-consumer");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("soon"));
try{
while(true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records){
log.info("topic : {}, value : {}", record.topic(), record.value());
}
}
}finally {
consumer.close();
}
}
3. 스프링 부트 사용
디펜던시 추가
compile 'org.apache.kafka:kafka-clients:2.2.0'
compile 'org.springframework.kafka:spring-kafka'
프로듀서 : KafkaTemplate
@EnableKafka
@Configuration
public class KafkaConfig {
@Value(value = "${kafka.bootstrap}")
private String bootstrapAddress;
@Bean
public Map<String, Object> producerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<String, String>(producerFactory());
}
}
사용 :
@Value("${kafka.topic}")
private String topic;
kafkaTemplate.send(topic, data);
컨슈머 : ConcurrentKafkaListenerContainerFactory
설정
@Bean
public Map<String, Object> consumerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
사용 : @KafkaListener 사용
@KafkaListener(topics = "${kafka.topic}")
public void listen(@Payload String data){
log.info("received data : {}", data);
}
'공부 > 기타' 카테고리의 다른 글
logrotate를 사용한 Tomcat 로그 관리 (0) | 2019.07.10 |
---|---|
[Kafka] 아파치 카프카 실행하기 (0) | 2019.04.21 |
mysql 타임존 설정 (0) | 2019.04.15 |
RabbitMQ 학습 (0) | 2019.04.09 |
MAC에서 RabbitMQ 설치 (0) | 2019.04.03 |