kafka配置信息

  1. 介绍
    1. Kafka的特性:
    2. Kafka的使用场景:
  2. 集群配置
    1. 单机多breoker
    2. 多机多broker
    3. 配置文件
    4. Kafka的特性:
  3. springboot整合

介绍

  Kafka是分布式发布-订阅消息系统,最初由LinkedIn公司开发,之后成为之后成为Apache基金会的一部分,由Scala和Java编写。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Kafka的特性:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

Kafka的使用场景:

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源

集群配置

Kafka支持两种模式的集群搭建:

  1. 单机多broker集群配置;
  2. 多机多broker集群配置。

单机多breoker

利用单节点部署多个broker。不同的broker不同的id,监听端口以及日志目录,如:

将配置文件复制两份

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties 

修改配置文件信息

vim config/server-1.properties
#修改内容
broker.id=2
listeners=PLAINTEXT://your.host.name:9093
log.dirs=/data/kafka-logs-1


vim config/server-2.properties
#修改内容
broker.id=3
listeners=PLAINTEXT://your.host.name:9094
log.dirs=/data/kafka-logs-2

启动多个kafka服务

in/kafka-server-start.sh config/server-1.properties 

bin/kafka-server-start.sh config/server-2.properties 

最后按照上面方法产生和消费信息。

多机多broker

分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。如:192.168.18.130、192.168.18.131、192.168.18.132三台机器

分别配置多个机器上的Kafka服务 设置不同的broke id,zookeeper.connect设置如下:

zookeeper.connect=192.168.18.130:2181,192.168.18.131:2181,192.168.18.132:2181

配置文件

broker.id=0     #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092      #当前kafka对外提供服务的端口默认是9092
host.name=10.181.65.180       #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3         #这个是borker进行网络处理的线程数
num.io.threads=8              #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/       #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400      #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400   #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600   #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1                     #默认的分区数,一个topic默认1个分区数
log.retention.hours=168              #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880             #消息保存的最大值5M
default.replication.factor=2         #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880      #取消息的最大直接数
log.segment.bytes=1073741824         #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000         #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false                       #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=10.181.65.180:2181,0.181.65.181:2181,0.181.65.182:2181             #设置zookeeper的连接端口

Kafka的特性:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

springboot整合

https://blog.csdn.net/qq_26869339/article/details/88324980

#kafka配置信息
kafka:
  producer:
    bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
    batch-size: 16785                                   #一次最多发送数据量
    retries: 1                                          #发送失败后的重复发送次数
    buffer-memory: 33554432                             #32M批处理缓冲区
    linger: 1
  consumer:
    bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
    auto-offset-reset: earliest                         #最早未被消费的offset
    group-id: log-hs-grou20
    max-poll-records: 4639                              #批量消费一次最大拉取的数据量
    enable-auto-commit: false                           #是否开启自动提交
    auto-commit-interval: 1000                          #自动提交的间隔时间
    session-timeout: 6000                               #连接超时时间
    max-poll-interval: 2000                             #手动提交设置与poll的心跳数
  listener:
    batch-listener: true                                #是否开启批量消费,true表示批量消费
    concurrency: 3                                      #设置消费的线程数
    poll-timeout: 1500                                  #自动提交设置,如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
    topics: hs-test,hs-test1