Kafka集群
Kafka作为新一代的消息系统,最初是由LinkedIn公司开发,之后开源成为Apache的顶级项目.
它与传统的消息系统相比,有以下不同:
- 它被设计为一个分布式系统,易于向外扩展;
- 它同时为发布和订阅提供高吞吐量;
- 它支持多订阅者,当失败时能自动平衡消费者;
- 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
同时.Kafka提供了多种客户端的Driver.这点比阿里的RocketMQ
要好.而阿里的RocketMQ在性能和使用方面优于Kafka,不过只提供JAVA的客户端.
而我们系统中不光只有JAVA应用,因此,在我们的系统中,决定使用kafka作为我们的消息服务.
集群部署
- 启动Zookeeper服务
集群部署也是需要启用Zookeeper。 启动Kafka broker服务
在不同的机器上拷贝kafka.然后修改它的$KAFKA_HOME/config/server.properties
文件.
主要是修改以下几个地方:123456#服务的序号,这个需要在集群中保证唯一brokerid = 0#服务的端口,如果是一台机器上只启动一个kafka,可以不修改port = 1234#服务的数据存放位置,默认可以不修改log.dir = /tmp/kafka_log
然后调用 `./bin/kafka-server-start.sh conf/server.properties` 即可启动
然后就可以创建
Topic
了.bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 2 --topic testTopic
其中
replication-factor
表示要在几个节点上存放数据.一般有几个节点就配置成几个就可以了.
partitions
就表示分片.一般配置成1就可以了.到此,kafka的集群就搭建完成了.然后就可以使用我们的代码来进行试验了.
Spring配置:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><bean id="jmsTemplate" class="com.sobey.jcg.kafka.util.KafkaTemplate"><constructor-arg value="127.0.0.1:8092,127.0.0.1:8093"/></bean><bean id="defaultThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><!-- 核心线程数,默认为1 --><property name="corePoolSize" value="100"/><!-- 最大线程数,默认为Integer.MAX_VALUE --><property name="maxPoolSize" value="900"/><!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --><property name="queueCapacity" value="50"/><!-- 线程池维护线程所允许的空闲时间,默认为60s --><property name="keepAliveSeconds" value="60"/><!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --><property name="rejectedExecutionHandler"><!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --><!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 --><!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 --><!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 --><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property></bean><bean id="customConsumerMessageListener" class="com.xxx.jcg.kafka.util.KafkaMessageListenerAdapter" scope="prototype"><constructor-arg><bean class="com.xxx.jcg.kafka.test.CustomConsumer"/></constructor-arg></bean><!--ZK的链接 --><bean id="zookeeperConnect" class="com.sobey.jcg.kafka.util.ZookeeperConnect"><property name="zkConnect" value="127.0.0.1:2181"/><property name="zkConnectionTimeout" value="6000"/><property name="zkSessionTimeout" value="400" /><property name="zkSyncTime" value="200"/></bean><bean class="com.sobey.jcg.kafka.util.KafkaMessageListenerContainer"><constructor-arg ref="zookeeperConnect"/><constructor-arg ref="defaultThreadPool"/><property name="listener" ref="customConsumerMessageListener"/><property name="topic" value="kafkatopic"/></bean><bean class="com.sobey.jcg.kafka.util.KafkaMessageListenerContainer"><constructor-arg ref="zookeeperConnect"/><constructor-arg ref="defaultThreadPool"/><property name="listener" ref="customConsumerMessageListener"/><property name="topic" value="clusterTest"/></bean></beans>```测试代码:```javapublic class JMSSenderTest {public static void main(String[] args) throws Exception {CustomSpringContextUtil.getContext("applicationContext.xml").getBean("jmsTemplate");UserInfo userInfo = new UserInfo("aaa",17,true);JMSSender.getInstance().sendMessage(userInfo,"clusterTest");JMSSender.getInstance().sendMessage("这个是String","clusterTest");HashMap<Object,Object> objectObjectMap = new HashMap<Object, Object>();objectObjectMap.put("name","bbbb");objectObjectMap.put("age",20);objectObjectMap.put("gender",true);JMSSender.getInstance().sendMessage(objectObjectMap,"clusterTest");}}
如果正常的话,就会发送消息到kafka中. 并且程序能有消费的记录.