Почему Spring Cloud Stream?
Потому что в этом случае для замены в дальнейшем Apache Kafka на другой менеджер очереди (например, RabbitMQ) понадобится всего лишь:
- добавить соответствующие зависимости в файл pom.xml;
- скорректировать настройки в файле application.properties для новой системы сообщений.
Итак, приступим:
1. Добавим в pom.xml зависимости для Apache Kafka.
Далее мы будем просто переключать профили Spring для использования RabbitMQ или Apache Kafka.
2. Добавим в файл application.properties настройки очереди Apache Kafka и свои заголовки.
3. Создадим свой обработчик каналов сообщений.
Можно использовать имеющиеся в Spring Cloud Stream интерфейсы Source (output channel), Sink (input channel) или Processor (input & output channels), но полезнее будет сделать всё "ручками". Тем более, что в свой обработчик каналов мы можем добавить любое количество каналов.
5. Создадим производителя сообщений, который будет передавать в качестве сообщения наш объект Request.
6. Создадим слушателя очереди, который будет логировать сообщения, полученные из очереди.
Аннотация @EnableBinding указывает на то, с каким обработчиком каналов будет связан наш слушатель. @StreamListener - сообщения из какого канала обрабатывает этот метод.
На этом всё.
Потому что в этом случае для замены в дальнейшем Apache Kafka на другой менеджер очереди (например, RabbitMQ) понадобится всего лишь:
- добавить соответствующие зависимости в файл pom.xml;
- скорректировать настройки в файле application.properties для новой системы сообщений.
Итак, приступим:
1. Добавим в pom.xml зависимости для Apache Kafka.
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
Далее мы будем просто переключать профили Spring для использования RabbitMQ или Apache Kafka.
2. Добавим в файл application.properties настройки очереди Apache Kafka и свои заголовки.
# Apache Kafka properties # general spring.cloud.stream.kafka.binder.brokers=localhost:9092 spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000 spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde # # outputChannel (out) spring.cloud.stream.bindings.paymentEventsChannel.producer.headerMode=headers spring.cloud.stream.bindings.paymentEventsChannel.destination=output # # custom headers spring.cloud.stream.kafka.binder.headers=SenderId,SenderName,Type,MessageId # # inputChannel (in) spring.cloud.stream.bindings.inputChannel.consumer.headerMode=headers spring.cloud.stream.bindings.inputChannel.destination=input spring.cloud.stream.bindings.inputChannel.group=queue
3. Создадим свой обработчик каналов сообщений.
Можно использовать имеющиеся в Spring Cloud Stream интерфейсы Source (output channel), Sink (input channel) или Processor (input & output channels), но полезнее будет сделать всё "ручками". Тем более, что в свой обработчик каналов мы можем добавить любое количество каналов.
@Slf4j @Component @EnableBinding(ChannelProcessor.class) public class MessageListener { @StreamListener(ChannelProcessor.INPUT) public void listen(String message) { log.info("MessageListener got message: {}.", message); } }
@Component public class MessagePublisher { private ChannelProcessor channelProcessor; @Autowired public MessagePublisher(ChannelProcessor channelProcessor) { this.channelProcessor = channelProcessor; } public void sendRequest(Request request) { channelProcessor.outputChannel().send(message(request)); } private static final <T> Message<T> message(T val) { return MessageBuilder.withPayload(val).build(); } }
6. Создадим слушателя очереди, который будет логировать сообщения, полученные из очереди.
@Slf4j @Component @EnableBinding(ChannelProcessor.class) public class MessageListener { @StreamListener(ChannelProcessor.INPUT) public void listen(Message<QueueMessage> msg) { QueueMessage message = msg.getPayload(); MessageHeaders headers = msg.getHeaders(); String messageType = (String) headers.get(MessageDefinitions.TYPE); final String token = extractToken(headers); final String clientIP = extractClientIP(headers); log.info("Got message: {}, client IP: {}, token: {}, message type: {}.", message.toString(), clientIP, token, messageType); } private String extractToken(MessageHeaders headers) { return (String) headers.get(MessageDefinitions.SENDER_ID); } private String extractClientIP(MessageHeaders headers) { return (String) headers.get(MessageDefinitions.SENDER_NAME); } }
Аннотация @EnableBinding указывает на то, с каким обработчиком каналов будет связан наш слушатель. @StreamListener - сообщения из какого канала обрабатывает этот метод.
На этом всё.
Комментариев нет:
Отправить комментарий