博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka的安装和使用(详细版)
阅读量:2444 次
发布时间:2019-05-10

本文共 5950 字,大约阅读时间需要 19 分钟。

原创地址: https://www.cnblogs.com/lilixin/p/5775877.html

Kafka安装与使用

下载地址:

安装以及启动kafka

步骤1:安装kafka

$ tar -xzf kafka_2.10-0.8.1.1.tgz$ cd kafka_2.10-0.8.1.1.tgz

 

步骤2:配置server.properties

 配置zookeeper(假设您已经安装了zookeeper,如果没有安装,请再网上搜索安装方法)

进入kafka安装工程根目录编辑 

vim config/server.properties

修改属性 zookeeper.connect=ip:2181,ip2: 2181

 

步骤3:server.properties配置说明

kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

kafka server端config/server.properties参数说明和解释如下:

(参考配置说明地址:http://blog.csdn.net/lizhitao/article/details/25667831)

 

#实际使用案例 这里211上面的kafka 配置文件
broker.id=1port=9092host.name=192.168.1.211num.network.threads=2num.io.threads=8socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=2log.retention.hours=168log.segment.bytes=536870912log.retention.check.interval.ms=60000log.cleaner.enable=falsezookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181zookeeper.connection.timeout.ms=1000000#kafka实际使用案例 210服务器kafka配置broker.id=2port=9092host.name=192.168.1.210num.network.threads=2num.io.threads=8socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=2log.retention.hours=168log.segment.bytes=536870912log.retention.check.interval.ms=60000log.cleaner.enable=falsezookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181zookeeper.connection.timeout.ms=1000000

 

步骤4: 启动kafka

(先启动zookeeper $:  bin/zkServer.sh start config/zookeeper.properties &)

cd kafka-0.8.1

$ bin/kafka-server-start.sh -daemon config/server.properties &

 (实验时,需要启动至少两个broker   bin/kafka-server-start.sh -daemon config/server-1.properties &) 

步骤5:创建topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

步骤6:验证topic是否创建成功

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

localhost为zookeeper地址 

topic描述:

bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test

 

//启动报错Unrecognized VM option '+UseCompressedOops'查看 bin/kafka-run-class.sh找到if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then  KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"fi去掉-XX:+UseCompressedOops

 

启动报错 Could not reserve enough space for object heap原因及解决办法:查看kafka-server-start.sh配置文件,发现有heap设置信息:KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"    更改这里的内存为256(因为测试机内存总共才1G ,所以报错)

 

 

 

步骤7:发送消息

发送一些消息验证,在console模式下,启动producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(此处localhost改为本机ip,否则报错,I don’t  know why)

 消息案例: 

{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"会员"}

 

 步骤8:启动一个consumer

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 


 


 


 


 

删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

    


 


 


 


 

配置kafka集群模式,需要由多个broker组成

 

和单机环境一样,只是需要修改下broker 的配置文件而已。

1、将单机版的kafka 目录复制到其他几台电脑上。

2、修改每台电脑上的kafka 目录下的server.properties 文件。

broker.id=1//这个参数在kafka 的broker 集群中必须唯一,且为正整数。

3、启动每台电脑上的kafka 即可。

 

本机配置伪分布式

 

首先为每个节点编写配置文件:
> cp config/server.properties config/server-1.properties> cp config/server.properties config/server-2.properties

 

在拷贝出的新文件中添加以下参数:
config/server-1.properties:
broker.id=1port=9093log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2port=9094log.dir=/tmp/kafka-logs-2

 

现在启动另外两个节点:
> bin/kafka-server-start.sh config/server-1.properties &...> bin/kafka-server-start.sh config/server-2.properties &...

 

创建一个拥有3个副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

 

运行“"describe topics”命令知道每个节点的信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0leader:负责处理消息的读和写,leader是从所有节点中随机选择的.replicas:列出了所有的副本节点,不管节点是否在服务中.isr:是正在服务中的节点.

 

  




搭建Kafka开发环境

1 在pom.xml中引入kafka依赖jar包
org.apache.kafka
kafka_2.9.2
${kafka.version}
zookeeper
org.apache.zookeeper
zkclient
com.101tec
slf4j-api
org.slf4j

 

2.属性文件 kafka.properties
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181zookeeper.connect=192.168.1.179:2181metadata.broker.list=192.168.1.179:9092#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092 #zookeeper.connect.timeout=15000#zookeeper.session.timeout.ms=15000#zookeeper.sync.time.ms=20000#auto.commit.interval.ms=20000#auto.offset.reset=smallest#serializer.class=kafka.serializer.StringEncoder#producer.type=async#queue.buffering.max.ms=6000 group.id=llxkafka.sellstat.topics=llx

 

在spring配置文件中引入此properties文件
classpath:kafka.properties
classpath:kafka.properties

 

 
3.定义收信人

 

4. spring中定义一个消息处理器(需要实现vkoConsumer)

 

5.消息生产者项目引入producer

 

你可能感兴趣的文章
如何格式化您的WhatsApp消息
查看>>
pixel2pixel_Pixel 2的视觉核心是什么?
查看>>
更改用户账户设置自动更改_您应该更改的5个SimpliSafe设置
查看>>
excel中转换为数值_如何在Microsoft Excel中转换货币
查看>>
netflix怎么上_如何在Netflix上隐藏电视节目和电影
查看>>
opera_从Opera快速拨号页上删除混乱
查看>>
apple pencil_如何在iPad Pro的Apple Pencil上双击动作
查看>>
如何在PowerPoint中将自定义模板设置为默认模板
查看>>
linux使用命令重命名_如何在Linux上使用重命名命令
查看>>
xcloud下载_Project xCloud是Microsoft在流Xbox游戏上的赌博
查看>>
gpu驱动程序_如何从错误的GPU驱动程序更新中恢复
查看>>
esp now_Google带回Google Now(内部)排序助手
查看>>
如何防止视频在Chrome中自动播放
查看>>
如何使用Synology NAS下载文件(并避免在夜间开启计算机)
查看>>
使用批处理脚本上传ftp_通过批处理脚本将文件上传到FTP站点
查看>>
linux下如何更改主机名_如何在不重新启动的情况下更改Linux主机名
查看>>
pxe网络启动引导程序_如何使用PXE设置网络可引导实用程序光盘
查看>>
凌乱的yyy_如何清理凌乱的Internet Explorer上下文菜单
查看>>
Laravel Eloquent:API资源
查看>>
在React中使用Font Awesome 5
查看>>