티스토리 뷰
spring boot로 kafka를 간단하게 구현을 해보자. kafka가 이미 설치된 가정하에 진행하겠다. 이전글에 보면 kafka 설치 및 간단히 콘솔에서 실행하는 방법을 설명한 바 있다. 참조하도록 하자. 먼저 프로젝트를 생성하자. 인텔리j, maven기반으로 설명하겠다.Spring initializr를 선택해서 프로젝트를 생성하자.
디펜던시를 아래와 같이 추가해주자. Web -> Spring Web Starter와 Messaging -> Spring for Apache Kafka Streams, Spring for Apache Kafka
이제 kafka의 topic을 생성하는 KafkaTopicConfig 파일을 생성하고 KafkaAdmin 빈을 추가하자. KafkaAdmin 빈은 NewTopic 유형의 모든 bean에 대한 topic을 자동으로 추가해준다.
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${message.topic.name}")
private String topicName;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic(topicName, 1, (short) 1);
}
}
메시지를 생성하려면 먼저 Kafka Producer 인스턴스를 생성하기위한 전략을 설정하는 ProducerFactory를 구성해야한다.
KafkaProducerConfig 파일을 생성하자. 그런 다음 인스턴스를 래핑하고 Kafka topic에 메시지를 보내기위한 방법을 제공하는 KafkaTemplate을 빈으로 등록하자. Producer 인스턴스는 스레드로부터 안전하므로 응용 프로그램 컨텍스트 전체에서 단일 인스턴스를 사용하면 성능이 향상됩니다. 따라서 KakfaTemplate 인스턴스는 스레드로부터 안전하므로 하나의 인스턴스 만 사용하는 것이 좋다.
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
KafkaTemplate 클래스를 사용하여 메시지를 보내기 위한 KafkaProducer를 만들자.
@Value(value = "${message.topic.name}")
private String topicName;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
}
send API는 ListenableFuture 객체를 반환한다. 보낸 메시지에 대한 결과를 얻기위해 ListenableFuture 객체의 get API를 호출 한다.
결과를 비동기 적으로 처리하여 후속 메시지가 이전 메시지의 결과를 기다리지 않도록 콜백을 통해 작업을 수행 하도록 한다.
이제 메시지를 소비하는 Consumer를 설정하자. 소비하려면 ConsumerFactory 및 KafkaListenerContainerFactory를 구성해야한다.
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${kafka.groupId}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
소비자는 @KafkaListener 어노테이션을 사용하여 구성 할 수 있다.
@KafkaListener(topics = "${message.topic.name}", groupId = "${kafka.groupId}")
public void listen(String message) {
LOGGER.info("Received Messasge in group : " + message);
}
필자는 api 호출을 통해 메세지를 보내서 Producer가 메세지를 잘보내고 Cunsumer가 자 처리하는지 테스트를 하기위해 controller를 만들었다.
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/kafka")
public class KafkaController {
@Autowired
private KafkaProducer messageProducer;
@GetMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
messageProducer.sendMessage(message);
log.info("message send : {}", message);
}
}
이제 준비는 끝났다. 테스트를 하자. zookeeper와 kafka를 차례로 실행하자.
zookeeper 실행
bin/zookeeper-server-start.sh config/zookeeper.properties
kafka 실행
bin/kafka-server-start.sh config/server.properties
서버를 실행하고 localhost:8080/kafka/publish?message=kafka_test_message 요청을 하자. 아래와 같이 Producer, Cunsumer 로그가 잘 찍히는걸 보니 잘 처리가 된걸 확인 할 수 있다.
2019-06-28 17:09:26.286 INFO 73502 --- [nio-8080-exec-1] me.seungwoo.KafkaController : message send : kafka_test_message
2019-06-28 17:09:26.307 INFO 73502 --- [ad | producer-1] me.seungwoo.producer.KafkaProducer : Sent message=[kafka_test_message] with offset=[1]
2019-06-28 17:09:26.316 INFO 73502 --- [ntainer#0-0-C-1] me.seungwoo.producer.KafkaProducer : Received Messasge in group : kafka_test_message
소스는 여기에 있다.
'Spring-Boot' 카테고리의 다른 글
spring boot MessageSource 다국어처리 (0) | 2019.02.08 |
---|---|
spring boot mybatis 연동 (0) | 2019.02.06 |
spring boot Transaction(@Transactional) (0) | 2018.01.15 |
spring xml을 java config로 설정하기 (0) | 2017.08.16 |