kafka add topic partitions

kafka 修改topic的partition数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//修改前检查
penn@ubuntu:~$ /mnt/app/kafka.1/bin/kafka-topics.sh --zookeeper 10.0.2.15:2181/kafka --describe --topic test-2
Topic:test-2 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test-2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test-2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

//增加topic的partition
penn@ubuntu:~$ /mnt/app/kafka.1/bin/kafka-topics.sh --zookeeper 10.0.2.15:2181/kafka --alter --topic test-2 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!


//修改后检查
penn@ubuntu:~$ /mnt/app/kafka.1/bin/kafka-topics.sh --zookeeper 10.0.2.15:2181/kafka --describe --topic test-2
Topic:test-2 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test-2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test-2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test-2 Partition: 2 Leader: 4 Replicas: 4,2 Isr: 4,2

kafka use

kafka 常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//查看topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list

//创建3个副本的topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic my-replicated-topic

//查看topic状态
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-replicated-topic

//删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-replicated-topic

//发送数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

//消费数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning
1
2
3
4
5
6
7
//查看当前消费者是否已经读到最新的数据
[wisdom@10 ~]$ /mnt/app/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group Booking --zookeeper 10.0.3.65:2181 --topic Booking
[2017-03-17 14:35:48,094] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
Booking Booking 0 101 166 65 Booking_470_636253289830976972-0
Booking Booking 1 97 167 70
Booking_470_636253289830976972-0

kafka info

kafka 关键词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Kafka部分名词解释如下:
1.Broker
消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
2.Topic
一类消息,例如page view日志,click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发
3.Partition
topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
4.Segment
partition物理上由多个segment组成
5.offset
每个partition都由一系列有序的,不可变的消息组成,这些消息被连续的追加到partition中.partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

分析过程分为以下4个步骤:
1.topic中partition存储分布
2.partiton中文件存储方式
3.partiton中segment文件存储结构
4.在partition中如何通过offset查找message

kafka副本分配算法

1
2
3
4
副本分配算法如下:(mod表示余数,例如: 3 mod 2 = 1)
1.将所有N Broker和待分配的i个Partition排序.
2.将第i个Partition分配到第(i mod n)个Broker上.
3.将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.

kafka存储过程

1
2
3
4
5
6
7
8
Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效.这跟Kafka文件存储中读写message的设计是息息相关的.Kafka中读写message有如下特点:
写message:
1.消息从java堆转入page cache(即物理内存)
2.由异步线程刷盘,消息从page cache刷入磁盘

读message:
1.消息直接从page cache转入socket发送出去
2.当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁盘Load消息到page cache,然后直接从socket发出去

kafka Partition Recovery机制

1
2
3
4
5
6
每个Partition会在磁盘记录一个RecoveryPoint, 记录已经flush到磁盘的最大offset。当broker fail 重启时,会进行loadLogs。 首先会读取该Partition的RecoveryPoint,找到包含RecoveryPoint的segment及以后的segment, 这些segment就是可能没有 完全flush到磁盘segments。然后调用segment的recover,重新读取各个segment的msg,并重建索引

优点
1.以segment为单位管理Partition数据,方便数据生命周期的管理,删除过期数据简单
2.在程序崩溃重启时,加快recovery速度,只需恢复未完全flush到磁盘的segment
3.通过index中offset与物理偏移映射,用二分查找能快速定位msg,并且通过分多个Segment,每个index文件很小,查找速度更快。

kafka Partition Replica同步机制

1
2
3
4
5
6
1.Partition的多个replica中一个为Leader,其余为follower
2.Producer只与Leader交互,把数据写入到Leader中
3.Followers从Leader中拉取数据进行数据同步
4.Consumer只从Leader拉取数据

ISR:所有不落后的replica集合,不落后有两层含义:距离上次FetchRequest的时间不大于某一个值或落后的消息数不大于某一个值, Leader失败后会从ISR中选取一个Follower做Leader

数据可靠性保证

1
2
3
4
5
6
7
8
当Producer向Leader发送数据时,可以通过acks参数设置数据可靠性的级别
1.0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
2.1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
3.-1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证
仅设置acks=-1也不能保证数据不丢失,当Isr列表中只有Leader时,同样有可能造成数据丢失.要保证数据不丢除了设置acks=-1, 还要保证ISR的大小大于等于2,具体参数设置:
1.request.required.acks:设置为-1等待所有ISR列表中的Replica接收到消息后采算写成功;
2.min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica
Producer要在吞吐率和数据可靠性之间做一个权衡

数据一致性保证

1
2
3
4
5
6
一致性定义:若某条消息对Consumer可见,那么即使Leader宕机了,在新Leader上数据依然可以被读到
1.HighWaterMark简称HW:
Partition的高水位,取一个partition对应的ISR中最小的LEO作为HW,消费者最多只能消费到HW所在的位置,另外每个replica都有highWatermark,leader和follower各自负责更新自己的highWatermark状态,highWatermark <= leader.LogEndOffset
2.对于Leader新写入的msg,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被Consumer消费,即Consumer最多只能消费到HW位置

这样就保证了如果Leader Broker失效,该消息仍然可以从新选举的Leader中获取.对于来自内部Broker的读取请求,没有HW的限制.同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW,Follower.offset)

kafka optimize

主要优化原理和思路

  1. 磁盘优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    kafka是一个高吞吐量分布式消息系统,并且提供了持久化.
    其高性能的有两个重要特点:
    * 利用了磁盘连续读写性能远远高于随机读写的特点
    * 并发,将一个topic拆分多个partition
    要充分发挥kafka的性能,就需要满足这两个条件

    kafka读写的单位是partition.因此,将一个topic拆分为多个partition可以提高吞吐量.但是,这里有个前提,就是不同partition需要位于不同的磁盘(可以在同一个机器).如果多个partition位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性.

    具体配置上,是将不同磁盘的多个目录配置到broker的log.dirs,例如:
    log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,/disk3/kafka-logs

    kafka会在新建partition的时候,将新partition分布在partition最少的目录上,因此,一般不能将同一个磁盘的多个目录设置到log.dirs.
    同一个ConsumerGroup内的Consumer和Partition在同一时间内必须保证是一对一的消费关系.任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition)
  2. JVM参数配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    推荐使用最新的G1来代替CMS作为垃圾回收器,推荐JDK最低版本为1.7u51

    -Xms30g -Xmx30g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35

    G1相比较于CMS的优势:
    * G1是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力
    * 对于内存的划分方法不同,Eden,Survivor,Old区域不再固定,使用内存会更高效.G1通过对内存进行Region的划分,有效避免了内存碎片问题
    * G1可以指定GC时可用于暂停线程的时间(不保证严格遵守)而CMS并不提供可控选项
    * CMS只有在FullGC之后会重新合并压缩内存,而G1把回收和合并集合在一起
    * CMS只能使用在Old区,在清理Young时一般是配合使用ParNew,而G1可以统一两类分区的回收算法

    G1的适用场景:
    * JVM占用内存较大(At least 4G)
    * 应用本身频繁申请,释放内存,进而产生大量内存碎片时。
    * 对于GC时间较为敏感的应用
  3. Broker参数配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //网络和io操作线程配置优化
    # broker处理消息的最大线程数
    num.network.threads=xxx

    # broker处理磁盘IO的线程数
    num.io.threads=xxx

    说明:
    num.network.threads用于接收并处理网络请求的线程数,默认为3.其内部实现是采用Selector模型.启动一个线程作为Acceptor来负责建立连接,再配合启动num.network.threads个线程来轮流负责从Sockets里读取请求,一般无需改动,除非上下游并发请求量过大.一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

    num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些.配置线程数量为cpu核数2倍,最大不超过3倍.
    1
    2
    3
    4
    5
    //log数据文件刷盘策略
    为了大幅度提高producer写入吞吐量,需要定期批量写文件:
    # 每当producer写入10000条消息时,刷数据到磁盘 log.flush.interval.messages=10000
    # 每间隔1秒钟时间,刷数据到磁盘
    log.flush.interval.ms=1000
    1
    2
    3
    4
    5
    6
    7
    8
    //日志保留策略配置
    当kafka server被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天.
    建议配置:
    # 保留三天,也可以更短
    log.retention.hours=72

    # 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
    log.segment.bytes=1073741824
    1
    2
    3
    4
    5
    6
    7
    Kafka官方并不建议通过Broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响.

    可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能:
    * 脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache
    * 脏页率超过第二个指标会阻塞所有的写操作来进行Flush
    * 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio
    * 如果topic的数据量较小可以考虑减少log.flush.interval.ms和log.flush.interval.messages来强制刷写数据,减少可能由于缓存数据未写盘带来的不一致
  4. 配置jmx

    1
    2
    3
    [penn@root kafka_2.10-0.8.1]$ vim bin/kafka-run-class.sh
    #最前面添加一行
    JMX_PORT=8060
  5. Replica相关配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    replica.lag.time.max.ms:10000
    replica.lag.max.messages:4000

    num.replica.fetchers:1
    #在Replica上会启动若干Fetch线程把对应的数据同步到本地,而num.replica.fetchers这个参数是用来控制Fetch线程的数量.
    #每个Partition启动的多个Fetcher,通过共享offset既保证了同一时间内Consumer和Partition之间的一对一关系,又允许我们通过增多Fetch线程来提高效率

    default.replication.factor:1
    #这个参数指新创建一个topic时,默认的Replica数量
    #Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜
  6. purgatory

    1
    2
    fetch.purgatory.purge.interval.requests:1000
    producer.purgatory.purge.interval.requests:1000

http://blog.csdn.net/vegetable_bird_001/article/details/51858915

kafka install

java install

参考java 环境安装

zookeeper install

参考zookeeper install

kafka standard install

  1. kafka安装

    1
    2
    3
    4
    5
    6
    7
    [root@10 ~]# cd /mnt/ops/app/
    [root@10 app]# tar xzf kafka_2.11-0.9.0.0.tgz
    [root@10 app]# mv kafka_2.11-0.9.0.0 /mnt/app/kafka
    [root@10 app]# chown -R wisdom.wisdom /mnt/app/kafka

    [root@10 app]# mkdir -p /mnt/{data,log}/kafka
    [root@10 app]# chown -R wisdom.wisdom /mnt/{data,log}/kafka
  2. kafka配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    [root@10 app]# cp /mnt/app/kafka/config/{server.properties,server.properties.bak}
    [root@10 app]# cat > /mnt/app/kafka/config/server.properties <<EOF
    > broker.id=113
    > advertised.host.name=10.0.2.113
    > advertised.port=9092
    > delete.topic.enable=true
    > listeners=PLAINTEXT://:9092
    > num.network.threads=9
    > num.io.threads=16
    > socket.send.buffer.bytes=102400
    > socket.receive.buffer.bytes=102400
    > socket.request.max.bytes=104857600
    > log.dirs=/mnt/data/kafka
    > num.partitions=3
    > num.recovery.threads.per.data.dir=2
    > default.replication.factor = 1
    > replica.fetch.max.bytes=20000000
    > num.replica.fetchers=2
    > message.max.bytes=10000000
    > log.flush.interval.messages=10000
    > log.flush.interval.ms=1000
    > log.retention.hours=48
    > log.segment.bytes=1073741824
    > log.retention.check.interval.ms=300000
    > zookeeper.connect=10.0.2.113:2181
    > zookeeper.connection.timeout.ms=6000
    > EOF
  3. kafka log存放位置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [root@10 app]# vim /mnt/app/kafka/bin/kafka-run-class.sh
    # Log directory to use
    if [ "x$LOG_DIR" = "x" ]; then
    LOG_DIR="/mnt/log/kafka"
    JMX_PORT=8092
    else
    LOG_DIR="/mnt/log/kafka"
    JMX_PORT=8092
    fi
  4. kafka jvm修改

    1
    2
    3
    4
    [root@10 app]# vim /mnt/app/kafka/bin/kafka-server-start.sh
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -Xmn2G -XX:PermSize=64m -XX:MaxPermSize=128m -XX:SurvivorRatio=6 -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly"
    fi
  5. kafka 启动

    1
    2
    [root@10 app]# su - wisdom
    [wisdom@10 ~]$ /mnt/app/kafka/bin/kafka-server-start.sh -daemon /mnt/app/kafka/config/server.properties

kafka cluster install

  1. kafka安装

    1
    2
    3
    4
    5
    6
    7
    [root@10 ~]# cd /mnt/ops/app/
    [root@10 app]# tar xzf kafka_2.11-0.9.0.0.tgz
    [root@10 app]# mv kafka_2.11-0.9.0.0 /mnt/app/kafka
    [root@10 app]# chown -R wisdom.wisdom /mnt/app/kafka

    [root@10 app]# mkdir -p /mnt/{data,log}/kafka
    [root@10 app]# chown -R wisdom.wisdom /mnt/{data,log}/kafka
  2. kafka配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    [root@10 app]# cp /mnt/app/kafka/config/{server.properties,server.properties.bak}

    kafka cluster-1:
    [root@10 app]# cat > /mnt/app/kafka/config/server.properties <<EOF
    > broker.id=113
    > advertised.host.name=10.0.2.113
    > advertised.port=9092
    > delete.topic.enable=true
    > listeners=PLAINTEXT://:9092
    > num.network.threads=4
    > num.io.threads=8
    > socket.send.buffer.bytes=102400
    > socket.receive.buffer.bytes=102400
    > socket.request.max.bytes=104857600
    > log.dirs=/mnt/data/kafka
    > num.partitions=3
    > num.recovery.threads.per.data.dir=2
    > default.replication.factor = 1
    > replica.fetch.max.bytes=20000000
    > num.replica.fetchers=2
    > message.max.bytes=10000000
    > log.flush.interval.messages=10000
    > log.flush.interval.ms=1000
    > log.retention.hours=48
    > log.segment.bytes=1073741824
    > log.retention.check.interval.ms=300000
    > zookeeper.connect=10.0.2.113:2181,10.0.2.114:2181,10.0.2.115:2181
    > zookeeper.connection.timeout.ms=6000
    > EOF

    kafka cluster-2:
    [root@10 app]# cat > /mnt/app/kafka/config/server.properties <<EOF
    > broker.id=114
    > advertised.host.name=10.0.2.114
    > advertised.port=9092
    > delete.topic.enable=true
    > listeners=PLAINTEXT://:9092
    > num.network.threads=4
    > num.io.threads=8
    > socket.send.buffer.bytes=102400
    > socket.receive.buffer.bytes=102400
    > socket.request.max.bytes=104857600
    > log.dirs=/mnt/data/kafka
    > num.partitions=5
    > num.recovery.threads.per.data.dir=2
    > default.replication.factor = 1
    > replica.fetch.max.bytes=20000000
    > num.replica.fetchers=2
    > message.max.bytes=10000000
    > log.flush.interval.messages=10000
    > log.flush.interval.ms=1000
    > log.retention.hours=48
    > log.segment.bytes=1073741824
    > log.retention.check.interval.ms=300000
    > zookeeper.connect=10.0.2.113:2181,10.0.2.114:2181,10.0.2.115:2181
    > zookeeper.connection.timeout.ms=6000
    > EOF

    kafka cluster-3:
    [root@10 app]# cat > /mnt/app/kafka/config/server.properties <<EOF
    > broker.id=115
    > advertised.host.name=10.0.2.115
    > advertised.port=9092
    > delete.topic.enable=true
    > listeners=PLAINTEXT://:9092
    > num.network.threads=4
    > num.io.threads=8
    > socket.send.buffer.bytes=102400
    > socket.receive.buffer.bytes=102400
    > socket.request.max.bytes=104857600
    > log.dirs=/mnt/data/kafka
    > num.partitions=5
    > num.recovery.threads.per.data.dir=2
    > default.replication.factor = 1
    > replica.fetch.max.bytes=20000000
    > num.replica.fetchers=2
    > message.max.bytes=10000000
    > log.flush.interval.messages=10000
    > log.flush.interval.ms=1000
    > log.retention.hours=48
    > log.segment.bytes=1073741824
    > log.retention.check.interval.ms=300000
    > zookeeper.connect=10.0.2.113:2181,10.0.2.114:2181,10.0.2.115:2181
    > zookeeper.connection.timeout.ms=6000
    > EOF
  3. kafka log存放位置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    [root@10 app]# vim /mnt/app/kafka/bin/kafka-run-class.sh
    # Log directory to use
    if [ "x$LOG_DIR" = "x" ]; then
    LOG_DIR="/mnt/log/kafka"
    JMX_PORT=8092
    else
    LOG_DIR="/mnt/log/kafka"
    JMX_PORT=8092
    fi
  4. kafka jvm修改

    1
    2
    3
    4
    [root@10 app]# vim /mnt/app/kafka/bin/kafka-server-start.sh
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -Xmn2G -XX:PermSize=64m -XX:MaxPermSize=128m -XX:SurvivorRatio=6 -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly"
    fi
  5. kafka 启动

    1
    2
    [root@10 app]# su - wisdom
    [wisdom@10 ~]$ /mnt/app/kafka/bin/kafka-server-start.sh -daemon /mnt/app/kafka/config/server.properties

java performance analysis

jps 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
* jps位于jdk的bin目录下,其作用是显示当前系统的java进程情况,及其id号
* jps并不使用应用程序名来查找JVM实例,因此,它查找所有的Java应用程序(包括即使没有使用java执行体的那种(例如:定制的启动器))
* jps仅查找当前用户的Java进程,而不是当前系统中的所有进程


JDK中的jps命令可以显示当前运行的java进程以及相关参数,它的实现机制如下:
java程序在启动以后,会在"java.io.tmpdir"指定的目录下,就是临时文件夹里,生成一个类似于"hsperfdata_User"的文件夹,在这个文件夹里(Linux中为/tmp/hsperfdata_{userName}/),有几个文件,名字就是java进程的pid,因此列出当前运行的java进程,只是把这个目录里的文件名列一下而已.
例如:
[wisdom@10 ~]$ ls -ld /tmp/hsperfdata_*
drwxr-xr-x 2 root root 4096 Mar 7 14:47 /tmp/hsperfdata_root
drwxr-xr-x 2 wisdom wisdom 4096 Mar 7 14:49 /tmp/hsperfdata_wisdom


例如:
[wisdom@10 ~]$ jps
1347 Elasticsearch
12662 Jps
16731 QuorumPeerMain
17084 Kafka
21790 Main

# -q 只显示pid,不显示class名称,jar文件名和传递给main 方法的参数
[wisdom@10 ~]$ jps -q
1347
16731
17084
14093
21790

# -m 输出传递给main方法的参数,在嵌入式jvm上可能是null,在这里,在启动main方法的时候,我给String[] args传递两个参数
[wisdom@10 ~]$ jps -m
1347 Elasticsearch start -d -p /mnt/app/elasticsearch/elasticsearch.pid
16731 QuorumPeerMain /mnt/app/zookeeper/bin/../conf/zoo.cfg
17084 Kafka /mnt/app/kafka/config/server.properties
21790 Main --1.9 /mnt/app/logstash/lib/bootstrap/environment.rb logstash/runner.rb agent -f /mnt/app/logstash/config/logstash-ssp-4502.conf -l /mnt/log/logstash/logstash-4502.log
14287 Jps -m

# -l 输出应用程序main class的完整package名 或者 应用程序的jar文件完整路径名
[wisdom@10 ~]$ jps -l
1347 org.elasticsearch.bootstrap.Elasticsearch
16731 org.apache.zookeeper.server.quorum.QuorumPeerMain
17084 kafka.Kafka
16094 sun.tools.jps.Jps
21790 org.jruby.Main

# -v 输出传递给JVM的参数
[wisdom@10 ~]$ jps -v
1347 Elasticsearch -Xms5g -Xmx5g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC -Dfile.encoding=UTF-8 -Djna.nosys=true -Des.path.home=/mnt/app/elasticsearch
16731 QuorumPeerMain -Dzookeeper.log.dir=/mnt/log/zookeeper -Dzookeeper.root.logger=INFO,CONSOLE -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false
17084 Kafka -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/mnt/log/kafka/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/mnt/log/kafka -Dlog4j.configuration=file:/mnt/app/kafka/bin/../config/log4j.properties
16637 Jps -Denv.class.path=.:/mnt/app/java/lib:/mnt/app/java/jre/lib -Dapplication.home=/mnt/app/java -Xms8m
21790 Main -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -Xmx1g -Xss2048k -Djffi.boot.library.path=/mnt/app/logstash/vendor/jruby/lib/jni -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Djava.awt.headless=true -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/app/logstash/heapdump.hprof -Xbootclasspath/a:/mnt/app/logstash/vendor/jruby/lib/jruby.jar -Djruby.home=/mnt/app/logstash/vendor/jruby -Djruby.lib=/mnt/app/logstash/vendor/jruby/lib -Djruby.script=jruby -Djruby.shell=/bin/sh


拓展:JPS失效处理
现象: 用"ps -ef|grep java"能看到启动的java进程,但是用jps查看却不存在该进程的id.jconsole,jvisualvm可能无法监控该进程,其他java自带工具也可能无法使用
分析: jps,jconsole,jvisualvm等工具的数据来源就是这个文件(/tmp/hsperfdata_{username}/pid).所以当该文件不存在或是无法读取时就会出现jps无法查看该进程号,jconsole无法监控等问题
原因:
1.磁盘读写,目录权限问题
若该用户没有权限写/tmp目录或是磁盘已满,则无法创建/tmp/hsperfdata_{username}/pid文件.或该文件已经生成,但用户没有读权限
2.临时文件丢失,被删除或是定期清理
对于linux机器,一般都会存在定时任务对临时文件夹进行清理,导致/tmp目录被清空.常用的可能定时删除临时目录的工具为crontab,redhat的tmpwatch,ubuntu的tmpreaper等.这个导致的现象可能会是这样,用jconsole监控进程,发现在某一时段后进程仍然存在,但是却没有监控信息了
3.java进程信息文件存储地址被设置不在/tmp目录下
java默认会在/tmp/hsperfdata_{username}目录保存进程信息,但由于以上1,2所述原因,可能导致该文件无法生成或是丢失,所以java启动时提供了参数(-Djava.io.tmpdir),可以对这个文件的位置进行设置,而jps,jconsole都只会从"/tmp"目录读取,而无法从设置后的目录读物信息

zookeeper optimize

zookeeper参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1.tickTime:Client-Server通信心跳时间
Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳.tickTime以毫秒为单位
tickTime=2000

2.initLimit:Leader-Follower初始通信时限
集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)
initLimit=5

3.syncLimit:Leader-Follower同步通信时限
集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)
syncLimit=2

4.dataDir:数据文件目录
Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里
dataDir=/mnt/data/zookeeper

5.dataLogDir: 日志存放位置
Zookeeper保存日志存放位置,默认情况下,Zookeeper将写数据的日志文件放在dataDir目录里
dataLogDir=/mnt/log/zookeeper

6.clientPort:客户端连接端口
客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求
clientPort=2181

7.globalOutstandingLimit: 最大请求堆积数.默认是1000.ZK运行的时候,尽管server已经没有空闲来处理更多的客户端请求了,但是还是允许客户端将请求提交到服务器上来,以提高吞吐性能.
globalOutstandingLimit=1000

8.preAllocSize:预先开辟磁盘空间,用于后续写入事务日志.默认是64M,每个事务日志大小就是64M.如果ZK的快照频率较大的话,建议适当减小这个参数

9.snapCount:每进行snapCount次事务日志输出后,触发一次快照(snapshot),此时,ZK会生成一个snapshot.*文件,同时创建一个新的事务日志文件log.*.默认是100000.

10.autopurge.snapRetainCount:指定了需要保留的文件数目
autopurge.snapRetainCount=3

7.服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)
这个配置项的书写格式比较特殊,规则如下:
server.N=YYYY:A:B

N 表示服务器编号,需要在dataDir目录下创建myid文件,将N写入到myid文件内
YYYY 表示服务器地址
A 表示数据同步端口
B 表示选举端口

8.zookeeper集群为什么设置为奇数?
zookeeper有这样一个特性: 集群中只要有过半的机器是正常工作的,那么整个集群对外就是可用的.也就是说如果有2个zookeeper,那么只要有一个死了zookeeper就不能用了,因为一没有过半,所以2个zookeeper的死亡容忍度为0;同理,要是有3个zookeeper,一个死了,还剩下两个正常的,过半了,所以三个zookeeper的容忍度为1;同理你多列举几个(2 -> 0; 3 -> 1; 4 - >1; 5 -> 2; 6 -> 2):你会发现一个规律,2n和2n-1的容忍度是一样的,都是n-1,所以为了更加高效,何必增加那一个不必要的zookeeper呢.

zookeeper 参数调优

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1.zookeeper默认jvm没有配置Xmx,Xms等信息,可以在conf目录下创建java.env文件
[root@10 ~]# cat > /mnt/app/zookeeper/conf/java.env <<EOF
> export JAVA_HOME=/mnt/app/java
> export JVMFLAGS="-Xms1024m -Xmx1024m $JVMFLAGS"
> EOF


2.log4j配置,由于zookeeper是通过nohup启动的,会有一个zookeeper.out日志文件,该文件中记录的是输出到console的日志.log4j中只要配置输出到console即可,zookeeper.out会日积月累不断变大,要放在容量大的磁盘上.

3.zoo.cfg配置,dataDir是存放快照数据的,dataLogDir是存放写前日志的.这两个目录不要配置成一个路径,要配置到不同的磁盘上.如果磁盘是使用了raid,系统就一块磁盘,那配置到一块磁盘上也可以.写前日志的部分对写请求的性能影响很大,保证dataLogDir所在磁盘性能良好

4.zoo.cfg配置中skipACL=yes,忽略ACL验证,可以减少权限验证的相关操作,提升一点性能.

5.zoo.cfg配置中forceSync=no,这个对写请求的性能提升很有帮助,是指每次写请求的数据都要从pagecache中固化到磁盘上,才算是写成功返回.当写请求数量到达一定程度的时候,后续写请求会等待前面写请求的forceSync操作,造成一定延时.如果追求低延时的写请求,配置forceSync=no,数据写到pagecache后就返回.但是机器断电的时候,pagecache中的数据有可能丢失.

6.zookeeper的dataDir和dataLogDir路径下,如果没有配置zk自动清理,会不断的新增数据文件.可配置成zk系统自动清理数据文件,但是要求系统最高性能的话,建议人工手动清理文件:zkCleanup.sh -n 3

7.查看zk节点状态.重新启动zk节点前后,一定要查看状态

8.配置fsync.warningthresholdms=20,单位是毫秒,在forceSync=yes的时候,如果数据固化到磁盘的操作fsync超过20ms的时候,将会在zookeeper.out中输出一条warn日志.这个目前zk的3.4.5和3.5版本有bug,在zoo.cfg中配置不生效.我的做法是在conf/java.env中添加java系统属性:、
export JVMFLAGS="-Dfsync.warningthresholdms=20 $JVMFLAGS"

command split

  1. split 分割文件
    1
    2
    3
    4
    5
    6
    7
    8
    [root@localhost ~]# split -l 10000 ccc -d -a 2 num_

    说明:
    -l 表示按照多少行进行分割
    -d 表示分割后的文件名以数字结尾
    -a 2 表示结尾是2个字符,文件后缀
    -d -a 2 表示分割后的文件以2个数字为后缀
    num_ 表示分割后的文件名

postgresql plugin install

1. pg 安装本地插件库(all)

1
2
3
[root@localhost app]# tar xzf postgresql-9.6.1.tar.gz
[root@localhost app]# cd postgresql-9.6.1/contrib/
[root@localhost contrib]# make install

2. pg_pathman download(https://pgxn.org/dist/pg_pathman/)

  1. pg_pathman download

    1
    2
    3
    [root@localhost ~]# git clone https://github.com/postgrespro/pg_pathman /mnt/ops/app/postgresql-9.6.1/contrib/pg_pathman

    说明: 我将pg_pathman下载到postgresql源码的插件目录contrib目录下
  2. pg_pathman install

    1
    2
    3
    [root@localhost ~]# cd /mnt/ops/app/postgresql-9.6.1/contrib/pg_pathman
    [root@localhost pg_pathman]# make
    [root@localhost pg_pathman]# make install
  3. 在配置文件中添加pg_pathman参数

    1
    2
    3
    4
    [root@localhost pg_pathman]# su - wisdom
    [wisdom@localhost ~]$ vim /mnt/data/pgsql/postgresql.conf
    shared_preload_libraries = 'pg_pathman,pg_stat_statements'
    [wisdom@localhost ~]$ /mnt/app/pgsql/bin/pg_ctl -D /mnt/data/pgsql/ -l /mnt/log/pgsql/pgsql.log restart
  4. pg_pathman use

    1
    2
    3
    4
    [root@localhost pg_pathman]# su - wisdom
    [wisdom@localhost ~]$ /mnt/app/pgsql/bin/psql -d postgres
    postgres=# create extension pg_pathman;
    CREATE EXTENSION

command sed

  1. 删除行首空格

    1
    2
    3
    4
    5
    6
    7
    sed 's/^[ \t]*//g'

    说明:
    第一个/的左边是s表示替换,即将空格替换为空
    第一个/的右边是表示后面的以xx开头中括号表示“或”,空格或tab中的任意一种。
    这是正则表达式的规范。 中括号右边是*,表示一个或多个。
    第二个和第三个\中间没有东西,表示空 g表示替换原来buffer中的,sed在处理字符串的时候并不对源文件进行直接处理,先创建一个buffer,但是加g表示对原buffer进行替换 整体的意思是:用空字符去替换一个或多个用空格或tab开头的本体字符串
  2. 删除行末空格

    1
    2
    3
    4
    sed 's/[ \t]*$//g'

    和上面稍微有些不同是前面删除了^符,在后面加上了美元符,这表示以xx结尾的字符串为对象。
    但是要注意在KSH中,Tab并不是\t而是直接打入一个Tab就可以了。