pulsar在java中的使用_parcel java

SpringBoot整合分布式消息平台Pulsar

?作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。


部署 Pulsar


Pulsar 的部署方式主要有 3 种,本地安装二进制文件、docker 部署、在 Kubernetes 上部署。


本文采用 docker 部署一个单节点的 Pulsar 集群。实验环境是 2 核 CPU 和 4G 内存。


部署命令如下:


docker run -it -p :  -p : --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar: bin/pulsar standalone



安装过程可能会出现下面的错误:


unknown flag: --mount
See 'docker run --help'.



这是因为 docker 版本低,不支持 mount 参数,把 docker 版本升级到 以上就可以了。


部署过程中可能会因为网络的原因失败,多试几次就可以成功了。如果看到下面的日志,就说明启动成功了。


-08T22::, [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = , broker url= pulsar://localhost:, cluster=standalone



本地单节点集群启动后,会创建一个 namespace,名字叫 public/default


Pulsar 客户端


目前 Pulsar 支持多种语言的客户端,包括:


Java 客户端


Go 客户端


Python 客户端


C++ 客户端


Node.js 客户端


WebSocket 客户端


C# 客户端


SpringBoot 配置


使用 SpringBoot 整合 Pulsar 客户端,首先引入 Pulsar 客户端依赖,代码如下:



    org.apache.pulsar
    pulsar-client
    



然后在 properties 文件中添加配置:


# Pulsar 地址
pulsar.url=pulsar://:
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup



创建 Client


创建客户端非常简单,代码如下:


client = PulsarClient.builder()
                .serviceUrl(url)
                .build();



上面的 url 就是 properties 文件中定义的 pulsar.url 。


创建 Client 时,即使集群没有启成功,程序也不会报错,因为这时还没有真正地去连接集群。


创建 Producer


producer = client.newProducer()
                .topic(topic)
                .compressionType(CompressionType.LZ4)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(true)
                .batchingMaxPublishDelay(, TimeUnit.MILLISECONDS)
                .batchingMaxMessages)
                .maxPendingMessages)
                .blockIfQueueFull(true)
                .roundRobinRouterBatchingPartitionSwitchFrequency()
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();



创建 Producer,会真正的连接集群,这时如果集群有问题,就会报连接错误。


下面解释一下创建 Producer 的参数:


topic:Producer 要写入的 topic。


compressionType:压缩策略,目前支持 4 种策略 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 开始,只有 Consumer 的版本在 以上,这个策略才会生效。


sendTimeout:超时时间,如果 Producer 在超时时间为收到 ACK,会进行重新发送。


enableBatching:是否开启消息批量处理,这里默认 true,这个参数只有在异步发送 (sendAsync) 时才能生效,选择同步发送会失效。


batchingMaxPublishDelay:批量发送消息的时间段,这里定义的是 10ms,需要注意的是,设置了批量时间,就不会受消息数量的影响。批量发送会把要发送的批量消息放在一个网络包里发送出去,减少网络 IO 次数,大大提高网卡的发送效率。


batchingMaxMessages:批量发送消息的最大数量。


maxPendingMessages:等待从 broker 接收 ACK 的消息队列最大长度。如果这个队列满了,producer 所有的 sendAsync 和 send 都会失败,除非设置了 blockIfQueueFull 值是 true。


blockIfQueueFull:Producer 发送消息时会把消息先放入本地 Queue 缓存,如果缓存满了,就会阻塞消息发送。



roundRobinRouterBatchingPartition-SwitchFrequency
:如果发送消息时没有指定 key,那默认采用 round robin 的方式发送消息,使用 round robin 的方式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay)。


创建 Consumer


Pulsar 的消费模型如下图:


?



从图中可以看到,Consumer 要绑定一个 subscription 才能进行消费。


consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryDelay(, TimeUnit.SECONDS)
        .receiverQueueSize)
        .subscribe();



下面解释一下创建 Consumer 的参数:


topic:Consumer 要订阅的 topic。


subscriptionName:consumer 要关联的 subscription 名字。


subscriptionType:订阅类型,Pulsar 支持四种类型订阅:


Exclusive:独占模式,同一个 Topic 只能有一个消费者,如果多个消费者,就会出错。


Failover:灾备模式,同一个 Topic 可以有多个消费者,但是只能有一个消费者消费,其他消费者作为故障转移备用,如果当前消费者出了故障,就从备用消费者中选择一个进行消费。如下图:


?



Shared:共享模式,同一个 Topic 可以由多个消费者订阅和消费。消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开,如果发送给它消息没有被消费,这些消息会被重新分发给其它存活的消费者。如下图:


?



Key_Shared:消息和消费者都会绑定一个key,消息只会发送给绑定同一个key的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序。如下图:


?




subscriptionInitialPosition
:创建新的 subscription 时从哪里开始消费,有两个选项:


Latest:从最新的消息开始消费


Earliest:从最早的消息开始消费




negativeAckRedeliveryDelay
:消费失败后间隔多久 broker 重新发送。


receiverQueueSize:在调用 receive 方法之前,最多能累积多少条消息。可以设置为 0,这样每次只从 broker 拉取一条消息。在 Shared 模式下,receiverQueueSize 设置为 0,可以防止批量消息多发给一个 Consumer 而导致其他 Consumer 空闲。


Consumer 接收消息有四种方式:同步单条、同步批量、异步单条和异步批量,代码如下:


Message message = consumer.receive()
CompletableFuture message = consumer.receiveAsync();
Messages message = consumer.batchReceive();
CompletableFuture message = consumer.batchReceiveAsync();



对于批量接收,也可以设置批量接收的策略,代码如下:


consumer = client.newConsumer()
    .topic(topic)
    .subscriptionName(subscription)
        .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages)
        .maxNumBytes * )
        .timeout, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();



代码中的参数说明如下:


maxNumMessages:批量接收的最大消息数量。


maxNumBytes:批量接收消息的大小,这里是 1MB。


测试


首先编写 Producer 发送消息的代码,如下:


public void sendMsg(String key, String data) {
    CompletableFuture future = producer.newMessage()
        .key(key)
        .value(data.getBytes()).sendAsync();
    future.handle((v, ex) -> {
        if (ex == null) {
            logger.info("发送消息成功, key:{}, msg: {}", key, data);
        } else {
            logger.error("发送消息失败, key:{}, msg: {}", key, data);
        }
        return null;
    });
    future.join();
    logger.info("发送消息完成, key:{}, msg: {}", key, data);
}



然后编写一个 Consumer 消费消息的代码,如下:


public void start() throws Exception{
    while (true) {
        Message message = consumer.receive();
        String key = message.getKey();
        String data = new String(message.getData());
        String topic = message.getTopicName();
        if (StringUtils.isNotEmpty(data)) {
            try{
                logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data);
            }catch(Exception e){
                logger.error("接收消息异常,topic:{}, key:{}, data:{}", topic, key, data, e);
            }
        }
        consumer.acknowledge(message);
    }
}



最后编写一个 Controller 类,调用 Producer 发送消息,代码如下:


@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String key, @RequestParam String data) {
    logger.info("收到消息发送请求, key:{}, value:{}", key, data);
    pulsarProducer.sendMsg(key, data);
    return "success";
}



调用 Producer 发送一条消息,key=key1,data=data1,具体操作为在浏览器中输入下面的 url 后回车:


http://:/pulsar/send?key=key1&data=data1



可以看到控制台输出下面日志:


 ::, [pulsar-client-io-] [INFO] boot.pulsar.PulsarProducer - 发送消息成功, key:key1, msg: data1
 ::, [http-nio--exec-1] [INFO] boot.pulsar.PulsarProducer - 发送消息完成, key:key1, msg: data1
 ::, [Thread-] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1
 ::, [pulsar-timer-] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received:  msgs/s ---  Mbit/s --- Ack sent rate:  ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks:  ::, [pulsar-timer-] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-] Pending messages: 0 --- Publish throughput:  msg/s ---  Mbit/s --- Latency: med:  ms - 95pct:  ms - 99pct:  ms - .9pct:  ms - max:  ms --- Ack received rate:  ack/s --- Failed messages: 0



从日志中看到,这里使用的 namespace 就是创建集群时生成的public/default。


总结


从 SpringBoot 整合 Java 客户端使用来看,Pulsar 的 api 是非常友好的,使用起来方便简洁。Consumer 的使用需要考虑多一些,需要考虑到批量、异步以及订阅类型。


?




?

原文链接:,转发请注明来源!