kafka optimize

主要优化原理和思路

  1. 磁盘优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    kafka是一个高吞吐量分布式消息系统,并且提供了持久化.
    其高性能的有两个重要特点:
    * 利用了磁盘连续读写性能远远高于随机读写的特点
    * 并发,将一个topic拆分多个partition
    要充分发挥kafka的性能,就需要满足这两个条件

    kafka读写的单位是partition.因此,将一个topic拆分为多个partition可以提高吞吐量.但是,这里有个前提,就是不同partition需要位于不同的磁盘(可以在同一个机器).如果多个partition位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性.

    具体配置上,是将不同磁盘的多个目录配置到broker的log.dirs,例如:
    log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,/disk3/kafka-logs

    kafka会在新建partition的时候,将新partition分布在partition最少的目录上,因此,一般不能将同一个磁盘的多个目录设置到log.dirs.
    同一个ConsumerGroup内的Consumer和Partition在同一时间内必须保证是一对一的消费关系.任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition)
  2. JVM参数配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    推荐使用最新的G1来代替CMS作为垃圾回收器,推荐JDK最低版本为1.7u51

    -Xms30g -Xmx30g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35

    G1相比较于CMS的优势:
    * G1是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力
    * 对于内存的划分方法不同,Eden,Survivor,Old区域不再固定,使用内存会更高效.G1通过对内存进行Region的划分,有效避免了内存碎片问题
    * G1可以指定GC时可用于暂停线程的时间(不保证严格遵守)而CMS并不提供可控选项
    * CMS只有在FullGC之后会重新合并压缩内存,而G1把回收和合并集合在一起
    * CMS只能使用在Old区,在清理Young时一般是配合使用ParNew,而G1可以统一两类分区的回收算法

    G1的适用场景:
    * JVM占用内存较大(At least 4G)
    * 应用本身频繁申请,释放内存,进而产生大量内存碎片时。
    * 对于GC时间较为敏感的应用
  3. Broker参数配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //网络和io操作线程配置优化
    # broker处理消息的最大线程数
    num.network.threads=xxx

    # broker处理磁盘IO的线程数
    num.io.threads=xxx

    说明:
    num.network.threads用于接收并处理网络请求的线程数,默认为3.其内部实现是采用Selector模型.启动一个线程作为Acceptor来负责建立连接,再配合启动num.network.threads个线程来轮流负责从Sockets里读取请求,一般无需改动,除非上下游并发请求量过大.一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

    num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些.配置线程数量为cpu核数2倍,最大不超过3倍.
    1
    2
    3
    4
    5
    //log数据文件刷盘策略
    为了大幅度提高producer写入吞吐量,需要定期批量写文件:
    # 每当producer写入10000条消息时,刷数据到磁盘 log.flush.interval.messages=10000
    # 每间隔1秒钟时间,刷数据到磁盘
    log.flush.interval.ms=1000
    1
    2
    3
    4
    5
    6
    7
    8
    //日志保留策略配置
    当kafka server被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天.
    建议配置:
    # 保留三天,也可以更短
    log.retention.hours=72

    # 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
    log.segment.bytes=1073741824
    1
    2
    3
    4
    5
    6
    7
    Kafka官方并不建议通过Broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响.

    可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能:
    * 脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache
    * 脏页率超过第二个指标会阻塞所有的写操作来进行Flush
    * 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio
    * 如果topic的数据量较小可以考虑减少log.flush.interval.ms和log.flush.interval.messages来强制刷写数据,减少可能由于缓存数据未写盘带来的不一致
  4. 配置jmx

    1
    2
    3
    [penn@root kafka_2.10-0.8.1]$ vim bin/kafka-run-class.sh
    #最前面添加一行
    JMX_PORT=8060
  5. Replica相关配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    replica.lag.time.max.ms:10000
    replica.lag.max.messages:4000

    num.replica.fetchers:1
    #在Replica上会启动若干Fetch线程把对应的数据同步到本地,而num.replica.fetchers这个参数是用来控制Fetch线程的数量.
    #每个Partition启动的多个Fetcher,通过共享offset既保证了同一时间内Consumer和Partition之间的一对一关系,又允许我们通过增多Fetch线程来提高效率

    default.replication.factor:1
    #这个参数指新创建一个topic时,默认的Replica数量
    #Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜
  6. purgatory

    1
    2
    fetch.purgatory.purge.interval.requests:1000
    producer.purgatory.purge.interval.requests:1000

http://blog.csdn.net/vegetable_bird_001/article/details/51858915