четверг, 1 августа 2019 г.

Учёба

Пару недель назад начал проходить курс React + Redux - Профессиональная Разработка .
Прошёл уже 120 уроков из 149, выполнил 3 учебных проекта. Осталось немного и можно будет пилить какое-нибудь своё приложение на React, чтоб не потерять приобретённые навыки.
Прошёл курс по тестированию web UI: End to End Testing with Google's Puppeteer and Jest
Учебные проекты можно посмотреть здесь, здесь и здесь.
Скриншоты того, что получилось:





Строим очередь сообщений на Apache Kafka с помощью Spring Cloud Stream

Почему Spring Cloud Stream?
Потому что в этом случае для замены в дальнейшем Apache Kafka на другой менеджер очереди (например, RabbitMQ) понадобится всего лишь:
- добавить соответствующие зависимости в файл pom.xml;
- скорректировать настройки в файле application.properties для новой системы сообщений.

Итак, приступим:

1. Добавим в pom.xml зависимости для Apache Kafka.

<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Далее мы будем просто переключать профили Spring для использования RabbitMQ или Apache Kafka.

2. Добавим в файл application.properties настройки очереди Apache Kafka и свои заголовки.

# Apache Kafka properties
# general
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
#
# outputChannel (out)
spring.cloud.stream.bindings.paymentEventsChannel.producer.headerMode=headers
spring.cloud.stream.bindings.paymentEventsChannel.destination=output
#
# custom headers
spring.cloud.stream.kafka.binder.headers=SenderId,SenderName,Type,MessageId
#
# inputChannel (in)
spring.cloud.stream.bindings.inputChannel.consumer.headerMode=headers
spring.cloud.stream.bindings.inputChannel.destination=input
spring.cloud.stream.bindings.inputChannel.group=queue

3. Создадим свой обработчик каналов сообщений.
Можно использовать имеющиеся в Spring Cloud Stream интерфейсы Source (output channel), Sink (input channel) или Processor (input & output channels), но полезнее будет сделать всё "ручками". Тем более, что в свой обработчик каналов мы можем добавить любое количество каналов.

@Slf4j
@Component
@EnableBinding(ChannelProcessor.class)
public class MessageListener {

    @StreamListener(ChannelProcessor.INPUT)
    public void listen(String message) {
        log.info("MessageListener got message: {}.", message);
    }
}
5. Создадим производителя сообщений, который будет передавать в качестве сообщения наш объект Request.

@Component
public class MessagePublisher {

    private ChannelProcessor channelProcessor;

    @Autowired
    public MessagePublisher(ChannelProcessor channelProcessor) {
        this.channelProcessor = channelProcessor;
    }

    public void sendRequest(Request request) {
        channelProcessor.outputChannel().send(message(request));
    }

    private static final <T> Message<T> message(T val) {
        return MessageBuilder.withPayload(val).build();
    }
}

6. Создадим слушателя очереди, который будет логировать сообщения, полученные из очереди.

@Slf4j
@Component
@EnableBinding(ChannelProcessor.class)
public class MessageListener {

    @StreamListener(ChannelProcessor.INPUT)
    public void listen(Message<QueueMessage> msg) {

        QueueMessage message = msg.getPayload();
        MessageHeaders headers = msg.getHeaders();
        String messageType = (String) headers.get(MessageDefinitions.TYPE);
        final String token = extractToken(headers);
        final String clientIP = extractClientIP(headers);
        log.info("Got message: {}, client IP: {}, token: {}, message type: {}.",
                message.toString(), clientIP, token, messageType);
    }

    private String extractToken(MessageHeaders headers) {
        return (String) headers.get(MessageDefinitions.SENDER_ID);
    }

    private String extractClientIP(MessageHeaders headers) {
        return (String) headers.get(MessageDefinitions.SENDER_NAME);
    }
}

Аннотация @EnableBinding указывает на то, с каким обработчиком каналов будет связан наш слушатель. @StreamListener - сообщения из какого канала обрабатывает этот метод.

На этом всё.