kafka with zookeeper

kafka 与 zookeeper 的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker.无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息.

一个Kafka集群通常包括多个代理.为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区.多个生产者和消费者能够同时生产和获取消息.

一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件.任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息.kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行"随机读写"

kafka和JMS(Java Message Service)实现(activeMQ)不同的是: 即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.

对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值.(offset将会保存在zookeeper中)

kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性.

基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定.

Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等.本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.

kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.

Guarantees
1) 发送到partitions中的消息将会按照它接收的顺序追加到日志中
2) 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.
3) 如果Topic的"replicationfactor"为N,那么允许N-1个kafka实例失效.

流量带宽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1Mbps=1024Kbps=1024/8KBps=128KB/s
我们拉宽带1M,下载速度才100多KBps(KB/s)等宽带速率大小字样

兆比特(Mbps)
时间内传输数据的平均比特数,其单位是比特每秒(bps),或千比特每秒(Kbps),或兆比特每秒(Mbps),其换算关系为:1Kbps=1024bps,1Mbps=1024*1024bps(有时为了计算简单记作1Kbps=1000bps,1Mbps=1000000bps)

数据传输 速率反映了终端设备之间的信息处理能力,它是一段时间的平均值
所以1Mbps(1兆比特每秒)=0.125M Byte/s(0.125兆字节每秒)=0.125M Byte/s*1024=128KB/s(128千字节每秒)即1Mbps(兆位每秒)=1/8 MB/s(八分之一兆字节每秒)
说明:
1B = 8b
两边同时扩大1024*1024倍,等式恒成立,即:
1 MB = 8 Mb
继而得出上面的公式,每秒传送1Mb,即:每秒传送了1/8MB = 0.125MB,即:
1Mbps = 0.125 M Byte/s = 1/8M Byte/s * 1024 = 128 KB/s

总结:
在计算机网络或者是网络运营商中,一般宽带速率的单位用bps(或b/s)表示;bps表示比特每秒即表示每秒钟传输多少位信息,是bit per second的缩写.在实际所说的1M带宽的意思是1Mbps(是兆比特每秒(Mbps),不是兆字节每秒(MBps))

20Mbps = 20 * 1024 = 20480 Kbps
1Mbps = 1024Kbps

RabbitMQ cluster

RabbitMQ 集群

  1. RabbitMQ始终会记录以下四种内部元数据

    1
    2
    3
    4
    5
    6
    7
    * 队列元数据(队列名称和它们的属性)
    * 交换器元数据(交换器名称,类型和属性)
    * 绑定元数据(一张简单的表格展示了如何将消息路由到队列)
    * vhost元数据(为vhost内部队列,交换器和绑定提供命名空间和安全属性)
    *注意:
    单一节点,RabbitMQ会将所有信息存储在内存,同时将那些标记为可持久化的队列和交换器存储到硬盘上
    引入集群,RabbitMQ需要追踪新的元数据类型:集群节点位置,以及节点与已记录的其他类型元数据的关系;集群也提供了选择,将元数据存储到磁盘或者内存中*
  2. 集群中的队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    将两个节点组成集群,如果创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据,状态,内容),只有队列的所有者知道有关队列的所有信息
    所有其他非所有者节点只知道队列的元数据和指向该队列存在的那个节点的指针.因此,当集群节点崩溃时,该节点的队列和关联绑定都消失了.
    *注意: 只有队列在最开始没有被设置可持久化时,集群节点崩溃才可以让消费者重连并重新创建队列;如果队列开始已经设置为可持久化,客户端重新声明会得到404错误,这样确保了当失败节点恢复后加入集群,该节点上的队列不会丢失.想要指定队列重回集群的唯一方法是恢复故障节点.*

    为什么默认情况下RabbitMQ不将队列内容和状态复制到所有节点:
    * 存储空间 如果每个集群节点都拥有所有队列的完全拷贝,那么新计入了节点不会给你带来更多存储空间
    * 性能 消息的发布需要将消息复制到每一个集群节点,对持久化消息来说,每一条消息都会触发磁盘IO.每次新节点添加,网络和磁盘负载都会增加

    通过设置集群中唯一节点来负责任何特定队列,只有该负责节点才会因队列消息而消耗磁盘IO.所有其他节点需要将接收到该队列的请求转发给该队列的所有者节点.因此,增加Rabbit集群节点意味着你将拥有更多的节点传播队列,这些新节点为你带来了性能提升
  3. 分布交换器

    1
    2
    3
    4
    交换器不同队列那样拥有自己的进程,交换器说到底只是一个名称和一个队列绑定列表
    当你将消息发布到交换器时,实际上是由你所连接到的信道将消息上的路由键同交换器的绑定列表进行比较,然后路由信息.正是信道按照绑定的匹配结果,将消息路由到队列
    理解信道是真正的路由器这一点很重要,这解释了为什么交换器不会像集群中的队列那样受到相同的限制
    由于交换器只不过是一张查询表,而非实际上的消息路由器,因此将交换器在整个集群中进行复制会更加简单.
  4. 是内存节点,还是磁盘节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    每个RabbitMQ节点,不管是单一节点还是集群中的一部分,要么是内存节点,要么是磁盘节点
    * 内存节点将所有的队列,交换器,绑定,用户,权限和vhost的元数据定义都仅存储在内存中
    * 磁盘节点则将元数据存储在磁盘
    *注意:单节点只允许磁盘类型节点,否则你每次重启RabbitMQ,所有关于系统的配置信息都会丢失*
    在集群中,你可以选择配置部分节点为内存节点,因为它使得向队列和交换器声明之类的操作更加快速
    *注意: 当在集群中声明队列,交换器或绑定时,这些操作直到所有集群节点都成功提交元数据变更后才返回。对内存节点来说,将变更写入内存;对磁盘节点来说,将变更写入磁盘,直到完成之后*

    RabbitMQ只要求在集群中至少有一个磁盘节点,所有其他节点都可以是内存节点.当节点加入和离开集群时,它们必须将该变更通知到至少一个磁盘节点
    如果集群只有一个磁盘节点,正好它又崩溃了,那么集群还可以路由消息,但不能操作:创建队列,创建交换器,创建绑定,添加用户,更改权限,添加或删除集群节点
    解决方法: 建议在集群中设置两台磁盘节点
    *注意: 只有一个需要所有的磁盘节点必须在线的操作是添加或者删除集群节点*

    当重启内存节点后,它们会连接到预先配置的磁盘节点,下载当前集群元数据拷贝.如果你只讲两个磁盘节点中的一个高速了内存节点,不凑巧这个磁盘节点崩溃了,内存节点重启后就无法找到集群了.所以添加内存节点时,确保告知其所有的磁盘节点.只要内存节点可以找到至少一个磁盘节点,那么它就能在重启后重新加入集群
  5. 建立集群

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    集群一: 单台集群启动三个Rabbit服务
    1. 启动三个Rabbit服务
    RABBITMQ_NODE_PORT=5671 RABBITMQ_NODENAME=rabbit_1 rabbitmq-server -detached
    RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit_2 rabbitmq-server -detached
    RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit_3 rabbitmq-server -detached
    2. 停止第二个节点上的RabbitMQ程序
    rabbitmqctl -n rabbit_2@localhost stop_app
    3. 重新设置第二个节点的元数据和状态为清空状态
    rabbitmqctl -n rabbit_2@localhost reset
    4. 将第二个节点加入到第一个集群节点
    rabbitmqctl -n rabbit_2@localhost cluster rabbit_1@localhost rabbit_2@localhost
    * -n rabbit_2@localhost 表示你想在指定的节点上执行命令
    * cluster rabbit_1@localhost rabbit_2@localhost 表示rabbit_1和将要加入的rabbit_2都是磁盘节点
    5. 重新启动第二个节点
    rabbitmqctl -n rabbit_2@localhost start_app
    6. 将第三个节点加入到集群
    rabbitmqctl -n rabbit_3@localhost stop_app
    rabbitmqctl -n rabbit_3@localhost reset
    rabbitmqctl -n rabbit_3@localhost cluster rabbit_1@localhost rabbit_2@localhost
    rabbitmqctl -n rabbit_3@localhost start_app
    *注意: 将第三个节点加入到集群时,cluster后面并没有包含rabbit_3,表示为rabbit_3指定了两个磁盘节点rabbit_2和rabbit_1,rabbit_3将会成为内存节点*
    7. 查看集群状态
    rabbitmqctl cluster_status
    查看nodes部分:
    * disc 表示磁盘节点信息
    * ram 表示内存节点信息
    * running_nodes 告诉你集群中的哪些节点正在运行
    8. 连接到RabbitMQ
    你可以连接到Running_nodes中任何一个节点,开始创建队列,发布消息,或者执行其他AMQP命令

    集群二: 将节点分布到更多机器上(三台机器为例)
    1. 在三台机器上启动RabbitMQ
    RABBITMQ_NODE_PORT=5671 RABBITMQ_NODENAME=rabbit_1 rabbitmq-server -detached
    RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit_2 rabbitmq-server -detached
    RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit_3 rabbitmq-server -detached
    2. 将其中一台的RabbitMQ的Erlang cookie复制到其他几个节点
    /var/lib/rabbitmq/.erlang.cookie
    复制cookie内的字符串,并粘贴到其他两个节点的/var/lib/rabbitmq/.erlang.cookie中
    重启其他另外两个节点
    rabbitmqctl stop
    rabbitmqctl start
    3. 停止第二个节点上的RabbitMQ程序
    rabbitmqctl -n rabbit_2@localhost stop_app
    4. 重新设置第二个节点的元数据和状态为清空状态
    rabbitmqctl -n rabbit_2@localhost reset
    5. 将第二个节点加入到第一个集群节点
    rabbitmqctl -n rabbit_2@localhost cluster rabbit_1@localhost rabbit_2@localhost
    * -n rabbit_2@localhost 表示你想在指定的节点上执行命令
    * cluster rabbit_1@localhost rabbit_2@localhost 表示rabbit_1和将要加入的rabbit_2都是磁盘节点
    6. 重新启动第二个节点
    rabbitmqctl -n rabbit_2@localhost start_app
    7. 将第三个节点加入到集群
    rabbitmqctl -n rabbit_3@localhost stop_app
    rabbitmqctl -n rabbit_3@localhost reset
    rabbitmqctl -n rabbit_3@localhost cluster rabbit_1@localhost rabbit_2@localhost
    rabbitmqctl -n rabbit_3@localhost start_app
    *注意: 将第三个节点加入到集群时,cluster后面并没有包含rabbit_3,表示为rabbit_3指定了两个磁盘节点rabbit_2和rabbit_1,rabbit_3将会成为内存节点*
    8. 查看集群状态
    rabbitmqctl cluster_status
    查看nodes部分:
    * disc 表示磁盘节点信息
    * ram 表示内存节点信息
    * running_nodes 告诉你集群中的哪些节点正在运行
    9. 连接到RabbitMQ
    你可以连接到Running_nodes中任何一个节点,开始创建队列,发布消息,或者执行其他AMQP命令

    集群三:将节点从集群中移除
    1. 找到你需要停止的集群节点
    2. 移除
    rabbitmqctl -n rabbit_3@localhost stop_app
    rabbitmqctl -n rabbit_3@localhost reset
    3. 验证
    方法一: 启动移除的节点
    rabbitmqctl -n rabbit_3@localhost start_app
    rabbitmqctl -n rabbit_3@localhost cluster_status
    方法二: 查看已有集群的状态
    rabbitmqctl -n rabbit_1@localhost cluster_status
  6. 升级版本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    在独立的系统中升级新版本RabbitMQ比较容易: 解压新版本,然后停服,替换新版本,启动服务.

    集群升级版本是半自动化的,不能简单的将新版本在集群节点上解压并重启服务:
    1. 通过RabbitMQ Management 插件备份当前配置
    2. 关闭所有生产者并等待消费者消费完队列中的所有消息
    3. 关闭节点,并解压新版本RabbitMQ到安装目录
    4. 选择其中一个磁盘节点作为升级节点,当启动时,该节点会将持久化的集群数据升级到新版本
    5. 然后启动其他集群的磁盘节点,它们会获取升级后的集群数据
    6. 最后启动集群的内存节点
  7. 镜像队列和保留消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    RabbitMQ集群默认情况下队列只存活于集群中的一个节点上.在RabbitMQ 2.6.0版本之后,Rabbit增加了内建的双活冗余选项: 镜像队列
    镜像队列: 像普通队列一样,镜像队列的主拷贝仅存在于一个节点上,但与普通队列不同的是,镜像节点在集群中的其他节点上拥有从队列拷贝.一旦队列的主节点不可用,最老的从队列将被选举为新的主队列

    例如:
    queue_args = {"x-ha-policy":"all"}
    channel.queue_declare(queue="hello-queue",arguments=queue_args)

    由于队列的镜像性质是由应用程序在允许时指定的,RabbitMQ团队决定你可以同样的方式在允许时指定队列需要镜像到哪些节点.
    指定将镜像放在哪个节点:
    queue_args = {"x-ha-policy":"nodes","x-ha-policy-params":["rabbit@localhost"]}
    channel.queue_declare(queue="hello-queue",arguments=queue_args)

    *注意: 新增的从拷贝只会包含那些其添加进来之后从镜像队列发来的消息.RabbitMQ不会将镜像队列现已经存在的内容与新添加的从拷贝进行同步. 新增的从拷贝最终会和现存的队列拷贝拥有相同的状态.*

RabbitMQ从故障中恢复

1
2
3
4
5
1.架构的规划
haproxy
RabbitMQ集群
2.业务代码的调整
错误检测和重新链接

RabbitMQ user and grant

RabbitMQ 用户管理

1
2
3
4
5
6
7
8
9
* 创建用户
rabbitmqctl add username password
* 删除用户
rabbitmqctl delete username
*注意:会删除用户和访问策略!!!*
* 修改用户密码
rabbitmqctl change_password username new-password
* 查看用户列表
rabbitmqctl list_users

RabbitMQ 权限管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
1. 权限种类
* 读 有关消费消息的任何操作,包括"清除"整个队列(绑定操作的成功)
1. queue.bind:
配置:exchange
2. basic.get
配置: queue
3. basic.consume
配置: queue
4. queue.purge
配置: queue
* 写 发布消息(绑定操作的成功)
1. queue.bind
配置: queue
2. basic.publish
配置: exchange
* 配置 队列和交换器的创建和删除
1. exchange.declare
配置: exchange
2. exchange.delete
配置: exchange
3. queue.declare
配置: queue
4. queue.delete
配置: queue
2. 每条访问规则组成
* 被授权访问的用户
* 权限控制应用的vhost
* 需要授予读/写/配置权限组合
* 权限范围(仅应用于客户端命名队列/交换器;仅用于服务端命名队列/交换器;两者兼备)
*注意: 访问控制条目是无法跨越vhost的*
3. 授权案例
* 完全访问控制
rabbitmqctl set_permissions -p vhost-name username ".\*" ".\*" ".\*"
* 第一个".\*"表示配置的权限
* 第二个".\*"表示写的权限
* 第三个".\*"表示读的权限
* 组合使用
rabbitmqctl set_permissions -p vhost-name -s all username "" "check-.\*" ".\*"
* 授权在vhost-name上
* -s all 表示任何队列或者交换器
* "" 表示不匹配队列和交换器
* "check-.\*" 表示只匹配以"check-"开头的队列和交换器
* 查看访问策略
rabbitmqctl list_permissions -p vhost-name
* 移除一个用户在vhost上的权限
rabbitmqctl clear_permissions -p vhost-name username
* 查看用户在所有vhost上的权限
rabbitmqctl list_user_permissions username

RabbitMQ 查看统计

1
2
3
4
5
6
7
8
9
10
1. 列出队列和消息数目
* rabbitmqctl list_queues
* rabbitmqctl list_queues -p vhost-name
* rabbitmqctl list_queues -p vhost-name name messages consumers memory
* rabbitmqctl list_queues -p vhost-name name durable auto_delete
2. 查看交换器和绑定
* rabbitmqctl list_exchanges
* rabbitmqctl list_exchanges name type durable auto_delete

* rabbitmqctl list_bindings

RabbitMQ 日志

1
2
3
4
5
6
7
8
1. 两个日志文件
* RABBITMQ_NODENAME-sasl.log
sasl是库的集合,作为Erlang-OPT发行版一部分,帮助开发者在开发应用程序时,将日志写入rabbit-sasl.log文件
可以找到Erlang的崩溃报告
* RABBITMQ_NODENAME.log
配置系统日志
2. 轮询日志
rabbitmqctl rotate_logs .1

RabbitMQ info

RabbitMQ 简介

  1. AMQP是什么?

    1
    2
    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.
    基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制.
  2. AMQP的范围:

    1
    2
    3
    4
    为了完全实现消息中间件的互操作性,需要充分定义*网络协议*和*消息代理服务*的功能语义
    因此,AMQP定义网络协议和代理服务如下:
    * 一套确定的消息交换功能,也就是"高级消息交换协议模型".AMQP模型包括一套用于路由和存储消息的功能模块,以及一套在这些模块之间交换消息的规则.
    * 一个网络线级协议(数据传输格式),客户端应用可以通过这个协议与消息代理和它实现的AMQP模型进行交互通信.
  3. AMQP模型

    1
    2
    3
    4
    5
    6
    在服务器中,三个主要功能模块连接成一个处理链完成预期的功能:
    * "exchange"接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到"消息队列"
    * "message queue"存储消息,直到这些消息被消费者安全处理完为止
    * "binding"定义了exchange 和 message queue 之间的关联,提供路由规则

    一个AMQP服务器类似于邮件服务器,exchange类似于消息传输代理(email里的概念),message queue类似于邮箱.Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息.
  4. AMQP 消息交换体系结构

    1
    2
    3
    4
    5
    6
    * 存储转发(多个消息发送者,单个消息接收者)
    * 分布式事务(多个消息发送者,多个消息接收者)
    * 发布订阅(多个消息发送者,多个消息接收者)
    * 基于内容的路由(多个消息发送者,多个消息接收者)
    * 文件传输队列(多个消息发送者,多个消息接收者)
    * 点对点连接(单个消息发送者,单个消息接收者)
  5. 理解消息通信

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    1. 生产者和消费者
    RabbitMQ在应用程序和服务器之间扮演者路由器的角色.所以当应用程序连接RabbitMQ时,它会决定是发送还是接收
    1. 生产者生产消息,然后发布到代理服务器(RabbitMQ).
    消息分为两部分:
    * 有效载荷:你要传输的数据
    * 标签:它描述了有效载荷,并且RabbitMQ用它来决定谁将获得消息的拷贝
    2. 消费者消费消息,先连接到代理服务器(RabbitMQ),并订阅到队列上
    当消费者接收到消息时,它只得到消息的一部分:有效载荷.在消息的路由过程中,消息的标签并没有随有效载荷一起传递
    生产者创建消息,消费者接收消息.在发布消息或接收消息之前必须先连接RabbitMQ,建立一条信道(channel)
    2. 队列
    AMPQ消息路由必须包含三部分:
    * 路由器: 生产者把消息发送到路由器上
    * 队列: 消息最终到达队列,并被消费者接收
    * 绑定: 决定了消息如何从路由器路由到特定的队列上

    消费者通过两种方式从特定的队列中接收消息:
    * 通过AMQP的base.consume命令订阅(持续订阅)
    * 通过AMQP的base.get命令(单条订阅)
    当至少一个消费者订阅了队列, 消息会立即发送给这些订阅的消费者;如果消息到达了无人订阅的队列,消息会在队列中等待,一旦有消费者订阅该队列,那么队列上的消息会立即发送给消费者.当有多个消费者订阅到同一个队列时,队列收到的消息将以循环的方式发送给消费者,**每条消息只会发送给一个订阅消费者**.
    例如:消费者A和B都订阅了队列send
    1. Message_A发送到队列send
    2. RabbitMQ将消息Message_A发送给消费者A
    3. 消费者A确认收到了消息Message_A
    4. RabbitMQ将消息Message_A从队列send中删除
    5. Message_B发送到队列send
    6. RabbitMQ将消息Message_B发送给消费者B
    7. 消费者B确认收到了消息Message_B
    8. RabbitMQ将消息Message_B从队列send中删除
    注意:
    * 消费者接收到的每一条消息都必须进行确认(通过AMQP的base.ack命令或在订阅队列的时候将auto_ack设置为true).
    * 消费者对消息的确认和告诉生产者消息已经被接收了这两件事情毫无相关.因此,消费者通过确认命令告诉RabbitMQ它已经正确的接收到了消息,同时RabbitMQ才能安全的把消息从队列中移除
    * 如果消费者接收一条消息,然后确认之前从Rabbit处断开连接,RabbitMQ会认为这条消息没有分发,它会从新分发给下一个订阅的消费者
    * 如果你的程序崩溃了,这样做一方面保证消息会被发送给下一个消费者处理;另一方面,如果应用程序有Bug而忘记确认消息,RabbitMQ不会给该消费者发送更多消息,这是因为在上一条消息被确认之前,RabbitMQ会认为这个消费者还没有准备好接收下一条消息

    在收到消息后,如果你想明确拒绝而不是确认收到该消息,你有两个选择:
    * 把消费者从RabbitMQ服务器断开连接,这会导致RabbitMQ会重新将消息发送给另一个消费者
    优点:所有RabbitMQ都支持
    缺点:这样连接/断开的连接方式会增加RabbitMQ的负担
    * RabbitMQ >= 2.0.0版本,使用AMPQ的base.reject命令
    如果把reject的requeue参数设置为true,RabbitMQ会把消息发送给下一个消费者
    如果把reject的requeue参数设置为false,RabbitMQ会立即把消息从队列中移除,而不会把它发送给新的消费者

    创建队列:
    生产者和消费者都能使用AMPQ的queue.declare命令创建队列
    如果消费者在同一条信道上订阅了另一个队列,就无法声明队列了.必须首先取消订阅,将信道设置为"传输"模式
    创建队列时需要指定队列名称,消费者订阅时需要队列名称,在创建绑定时也需要队列名称.如果在创建队列时没有指定队列名称,RabbitMQ会分配一个随机名称并在queue.declare命令的响应中返回
    创建队列有用参数:
    * exclusive
    设置true,队列将变为私有的,此时只有你的应用程序才能够消费队列消息.当你限制一个队列只有一个消费者时有用
    * auto-delete
    当最后一个消费者取消订阅时,队列会自动移除
    * 如果你只想检测队列是否存在,可设置queue.declare的passive选项为true.在该设置下,如果队列存在返回成功;如果队列不存在返回错误

    队列是AMPQ的基础模块:
    * 为消息提供了处所,消息在此等待消费
    * 对负载均衡来说,队列是绝佳方案.只需附加一堆消费者,并让RabbitMQ循环的方式均匀分发消息
    * 队列是RabbitMQ消息的最终点(除非消息进入了黑洞)

    3. 交换器和绑定
    将你想将消息投递给队列时,你通过把消息发送给交换器来完成.然后,根据确定的规则,RabbitMQ会决定消息该投递到哪个队列.这些规则成为路由键
    **队列通过路由键来绑定到交换器.**
    当你把消息发送到代理服务器时,消息将有一个路由键(即便是空的),RabbitMQ也会将其和绑定的路由键进行匹配.如果匹配成功,那么消息将投递到该队列;如果匹配不成功,消息将进入黑洞

    服务器会根据路由键将消息从交换器路由到队列,如何处理投递多个队列呢?
    协议中定义的不通类型交换器发挥了作用:
    * direct
    * fanout
    * topic
    * headers
    每一类实现了不通的路由算法.

    1. headers交换器允许你匹配AMQP消息的header而非路由键,除此之外,headers交换器和direct交换器完全一致,但性能会差很多
    2. direct
    **如果路由键匹配,消息就会被投递到对应的队列**
    服务器必须实现direct类型交换器,包含一个空白字符串名称的默认交换器
    当声明一个队列时,它会自动绑定到默认交换器,并以队列名称作为路由键.
    例如: $channel->baseic_publish($msg,'','queue-name')
    第一个参数表示你想发送的内容
    第二个参数为空,表示指定了默认的交换器
    第三个参数就是路由键,即队列名字
    当默认的direct交换器无法满足应用程序的需求时,你可以声明自己的交换器.只需发送exchange.declare命令并设置合适的参数
    3. fanout
    **会将收到的消息广播到绑定的队列上**
    当你发送一条消息到fanout交换器时,它会把消息投递给所有附加在此交换机的队列
    4. topic
    **允许来自不同源头的消息能够到达同一个队列**

    声明队列msg-inbox-errors,并将其绑定到交换器上
    $channel->queue_bind('msg-inbox-errors','logs-exchange','error.msg-inbox')
    error.msg-inbox表示绑定规则
    4. 虚拟主机和隔离
    每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称为虚拟主机vhost.
    每一个vhost本质上都一个mini版的RabbitMQ服务器,拥有自己的队列,交换器和绑定,更重要的是拥有自己的权限机制
    vhost之于RabbitMQ就像虚拟机之于物理服务器一样:它们通过在各个实例间提供逻辑上的分离,允许你为不同应用程序安全保密的运行数据
    vhost是AMQP概念的基础,你必须在连接时进行制定.默认的vhost:"/"

    AMQP它并没有指定权限控制是在vhost级别还是在服务器端级别实现
    当你在RabbitMQ里面创建一个用户时,用户通常会被指派至少一个vhost,并且只能访问被指派vhost内的队列,交换器和绑定
    注意:vhost之间是绝对隔离的
    vhost和权限控制非常独特,它们是AMQP中唯一无法通过AMQP协议创建的基元.需要通过rabbitmqctl工具创建
    例如:
    * rabbitmqctl add_vhost vhost-name
    * rabbitmqctl delete_vhost vhost-name
    * rabbitmqctl list_vhost
    5. 持久化和你的策略
    默认情况下,重启RabbitMQ服务器后,那些队列和交换器就消失了.原因在于每个队列和交换器的durable属性,该属性为false,它决定了RabbitMQ是否在崩溃或者重启之后重建队列.该属性为true,你就不需要在服务器断电重启后重新创建队列和交换器了

    在发布消息之前,通过把它的"投递模式"(delivery mode)选项设置为2来把消息标记成持久化.到目前为止,消息还只是被表示为持久化的,但是它还必须被发布到持久化交换器中并到达持久化队列中才行.
    如果消息想从RabbitMQ崩溃中恢复,那么消息必须:
    * 把它的投递模式设置为2(持久)
    * 发送到持久化交换器
    * 到达持久化队列

    RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件.当发布一条持久性消息到交换器时,RabbitMQ会在消息提交到日志文件后才发送响应.记住,之后这条消息如果路由到了非持久队列,它会自动从持久性日志中移除,并且无法从服务器重启中恢复.如果你使用持久性消息,则确保之前提到的持久性消息的那三点必须做到位.一旦你从持久化队列中消费了一条持久性消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集.在你消费持久化消息之前,如果RabbitMQ重启的话,服务器会自动重建交换器和队列,绑定,重播持久性日志文件中消息到合适的队列或者交换器上

    持久化消息能保证RabbitMQ重启后数据的恢复,同时你也要为此付出性能代价.

    和持久化相关的一个概念是AMQP事务.
    在AMQP中,在把信道设置成事务模式后,你通过信道发送那些想要确认的消息,之后还有多个其他AMQP命令.这些命令是执行还是忽略,取决于第一条消息发送是否成功.一旦你发送完所有命令,就可以提交事务了.
    事务填补了生产者发布消息以及RabbitMQ将它们提交到磁盘上这两者之间"最有1英里"的差距

    RabbitMQ事务消耗RabbitMQ性能,RabbitMQ团队拿出最好的方案来保证消息投递:发送方确认模式
    你需要告诉RabbitMQ将信道设置成confirm模式,而且你只能通过重新创建信道来关闭该设置.一旦信道进入confirm模式,所有在信道上发布的消息都会被指派一个唯一的ID号.一旦消息被投递给所有匹配的队列后,信道会发送一个发送方确认模式给生产者应用程序.这使得生产者知晓消息已经安全到达队列了.如果消息和队列是可持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出.发送方确认模式的最大好处是它们是异步的.一旦发布了一条消息,生产者应用程序就可以在等待确认的同时继续发送下一条.当确认消息最终收到的时候,生产者应用的回调方法就会被触发来处理该确认消息.如果RabbitMQ内部错误从而导致了消息的丢失,RabbitMQ会发送一条nack消息.就像发送方确认消息那样,只不过这次说明的消息已经丢失了.同时,由于没有消息回滚的概念,因此发送方确认模式更加轻量级,同时对RabbitMQ代理服务器的性能影响几乎可以忽略不计
    6. 一条消息的一生
    发布者:
    * 连接到RabbitMQ
    * 获取信道
    * 声明交换器
    * 创建消息
    * 发布消息
    * 关闭信道
    * 关闭连接

    脚本:
    esay_install pika
    # -*- coding:utf-8 -*-
    import pika,sys
    # 建立连接
    credentials = pika.PlainCredentials("guest","guest")
    conn_params = pika.ConnectionParameters("localhost",credentials = credentials)
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道
    channel = conn_broker.channel()
    # 声明交换器
    channel.exchange_declare(exchange="hello-exchange",type="direct",passive=False,durable=True,auto_delete=False)
    # 创建消息
    msg = sys.argv[1]
    msg_props = pika.baseicProperties()
    msg_props.content_type = "text/plain"
    # 发布消息
    channel.basic_publish(body=msg,exchange="hello-exchange",properties=msg_props,routing_key="hola")

    消费者:
    * 连接到RabbitMQ
    * 获取信道
    * 声明交换器
    * 声明队列
    * 把队列和交换器绑定起来
    * 消费消息
    * 关闭信道
    * 关闭连接

    脚本:
    esay_install pika
    # -*- coding:utf-8 -*-
    import pika,sys
    # 建立连接
    credentials = pika.PlainCredentials("guest","guest")
    conn_params = pika.ConnectionParameters("localhost",credentials = credentials)
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道
    channel = conn_broker.channel()
    # 声明交换器
    channel.exchange_declare(exchange="hello-exchange",type="direct",passive=False,durable=True,auto_delete=False)
    # 声明队列
    channel.queue_declare(queue="hello-queue")
    # 通过routing键将队列和交换器绑定起来
    channel.queue_bind(queue="hello-queue",exchange="hello-exchange",routing_key="hola")
    # 定义消息处理函数
    def msg_consumer(channel,method,header,body):
    # 消息确认
    channel.basic_ack(delivery_tag=method.delivery_tag)
    # 停止消费并退出
    if body == "quit":
    channel.basic_cancel(consumer_tag="hello-consumer")
    channel.stop_consuming()
    else:
    print body
    return
    # 订阅消费者
    channel.basic_consume(msg_consumer,queue="hello-queue",consumer_tag="hello-consumer")
    channel.start_consuming()

    7. 使用发送方确认模式来确认投递
    带有确认功能的生产者

    # -*- coding:utf-8 -*-
    import pika,sys
    from pika import spec

    credentials = pika.PlainCredentials("guest","guest")
    conn_params = pika.ConnectionParameters("localhost",credentials = credentials)
    conn_broker = pika.BlockingConnection(conn_params)

    channel = conn_broker.channel()
    #发送方确认模式处理器
    def confirm_handler(frame):
    if type(frame.method) == spec.Confirm.SelectOK:
    print "Channel in 'confirm' mode."
    elif type(frame.method) == spec.Basic.Nack:
    if frame.method.delivery_tag in msg_ids:
    print "Message lost!"
    elif type(frame.method) == spec.Basic.Ack:
    if frame.method.delivery_tag in msg_ids:
    print "Confirm received!"
    msg_ids.remove(frame.method.delivery_tag)
    # 将信道设置为confirm模式
    channel.confirm_delivery(callback=confirm_handler)
    # 重设消息ID追踪器
    msg = sys.argv[1]
    msg_props = pika.baseicProperties()
    msg_props.content_type = "text/plain"
    msg_ids = []
    # 发布消息
    channel.basic_publish(body=msg,exchange="hello-exchange",properties=msg_props,routing_key="hola")
    msg_ids.append(len(msg_ids) + 1)
    channel.close()

flume info

flume 简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Flume NG是什么?
Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.
轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.

Flume流程:
Events(Client => Agent(source => channel => sink))

Flume核心概念:
1.Event:一个数据单元,带有一个可选的消息头
2.Flow:Event从源点到达目的点的迁移的抽象
3.Client:操作位于源点处的Event,将其发送到Flume Agent
4.Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
5.Source:用来消费传递到该组件的Event
6.Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
7.Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)

Flume原理:
Flume的核心是把数据从数据源收集过来,再送到目的地.为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据
Flume传输的数据的基本单位是Event,如果是文本文件,通常是一行记录,这也是事务的基本单位.Event从Source流向Channel,再到Sink,本身为一个byte 数组,并可携带headers信息.Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去
Flume运行的核心是Agent.它是一个完整的数据收集工具,含有三个核心组件,分别是source,channel,sink.通过这些组件,Event 可以从一个地方流向另一个地方.

Flume Agent三大组件:
1.source
可以接收外部源发送过来的数据.不同的source,可以接受不同的数据格式.
比如:有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容
2.channel
是一个存储地,接收source的输出,直到有sink消费掉channel中的数据
channel中的数据直到进入到下一个channel中或者进入终端才会被删除.当sink写入失败后,可以自动重启,不会造成数据丢失,因此很可靠.
3.sink会消费channel中的数据,然后送给外部源或者其他source.如数据可以写入到HDFS或者HBase中

  1. flume 配置文件示例一

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    penn@ubuntu:/mnt/app/flume$ cat conf/test.conf
    # name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4444

    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # sink
    a1.sinks.k1.type = logger

    # bind source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    penn@ubuntu:/mnt/app/flume$ ./bin/flume-ng agent --conf ./conf --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console
  2. flume 配置文件示例二

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    1.source instance can specify multiple channels, but a sink instance can only specify one channel. The format is as follows:
    # list the sources, sinks and channels for the agent
    <Agent>.sources = <Source>
    <Agent>.sinks = <Sink>
    <Agent>.channels = <Channel1> <Channel2>

    # set channel for source
    <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

    # set channel for sink
    <Agent>.sinks.<Sink>.channel = <Channel1>

    2.Adding multiple flows in an agent
    <Agent>.sources = <Source1> <Source2>
    <Agent>.sinks = <Sink1> <Sink2>
    <Agent>.channels = <Channel1> <Channel2>

    3.In the replicating flow, the event is sent to all the configured channels. In case of multiplexing, the event is sent to only a subset of qualifying channels.Once all required channels have consumed the events, then the selector will attempt to write to the optional channels.
    Note that if a header does not have any required channels, then the event will be written to the default channels and will be attempted to be written to the optional channels for that header. Specifying optional channels will still cause the event to be written to the default channels, if no required channels are specified. If no channels are designated as default and there are no required, the selector will attempt to write the events to the optional channels. Any failures are simply ignored in that case.

    # list the sources, sinks and channels in the agent
    agent_foo.sources = avro-AppSrv-source1
    agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
    agent_foo.channels = mem-channel-1 file-channel-2

    # set channels for source
    agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

    # set channel for sinks
    agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
    agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

    # channel selector configuration
    agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
    agent_foo.sources.avro-AppSrv-source1.selector.header = State
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

beats install

beats

1
2
3
4
5
6
7
8
9
10
11
12
13
Beats:
Lightweight Data Shippers
Beats is the platform for single-purpose data shippers. They install as lightweight agents and send data from hundreds or thousands of machines to Logstash or Elasticsearch.

The Beats Family:
1.Filebeat
Forget using SSH when you have tens, hundreds, or even thousands of servers, virtual machines, and containers generating logs. Filebeat helps you keep the simple things simple by offering a lightweight way to forward and centralize logs and files.
2.Metricsbeat
Collect metrics from your systems and services. From CPU to memory, Redis to Nginx, and much more, Metricbeat is a lightweight way to send system statistics.
3.Packetbeat
Know what’s going on across your applications by tapping into data traveling over the wire. Packetbeat is a lightweight network packet analyzer that sends data to Logstash or Elasticsearch.
4.Winlogbeat
Keep a pulse on what’s happening across your Windows-based infrastructure. Winlogbeat live streams Windows event logs to Elasticsearch and Logstash in a lightweight way.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
例子:
filebeat.yml:
filebeat.prospectors:
- input_type: log
paths:
- /var/log/apache/httpd-*.log
document_type: apache

- input_type: log
paths:
- /var/log/messages
- /var/log/*.log

output.logstash:
hosts: ["localhost:5044"]
index: filebeat

./filebeat -c filebeat.yml -configtest
./filebeat -c filebeat.yml


Logstash loadbalance: https://www.elastic.co/guide/en/beats/filebeat/current/load-balancing.html
output.logstash:
hosts: ["localhost:5044", "localhost:5045"]
loadbalance: true
worker: 2

influxdb install

influxdb 简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
InfluxDB是用Go语言编写的一个开源分布式时序,事件和指标数据库,无需外部依赖.类似的数据库有Elasticsearch,Graphite等.

特色:
1.基于时间序列,支持与时间有关的相关函数(如最大,最小,求和等)
2.可度量性:你可以实时对大量数据进行计算
3.基于事件:它支持任意的事件数据

特点:
1.无结构(无模式):可以是任意数量的列
2.可拓展的
3.支持min, max, sum, count, mean, median 等一系列函数,方便统计
4.原生的HTTP支持,内置HTTP API
5.强大的类SQL语法
6.自带管理界面,方便使用

By default, InfluxDB uses the following network ports:
TCP port 8083 is used for InfluxDB’s Admin panel
TCP port 8086 is used for client-server communication over InfluxDB’s HTTP API
  1. influxdb 安装

    1
    2
    3
    4
    [root@xxx ~]# wget https://dl.influxdata.com/influxdb/releases/influxdb-1.1.1_linux_amd64.tar.gz
    [root@xxx ~]# tar xzf influxdb-1.1.1_linux_amd64.tar.gz
    [root@xxx ~]# mv influxdb-1.1.1-1 /mnt/app/influxdb
    [root@xxx ~]# mkdir -p /mnt/data/influxdb/{meta,data,hh,wal}
  2. influxdb 环境变量设置

    1
    2
    3
    4
    [root@xxx ~]# echo 'INFLUX_PATH=/mnt/app/influxdb'     |tee /etc/profile.d/influx.sh
    [root@xxx ~]# echo 'INFLUX_BIN=${INFLUX_PATH}/usr/bin' |tee -a /etc/profile.d/influx.sh
    [root@xxx ~]# echo 'export PATH=${INFLUX_BIN}:$PATH' |tee -a /etc/profile.d/influx.sh
    [root@xxx ~]# source /etc/profile
  3. influxdb 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    [root@xxx ~]# vim /mnt/app/influxdb/etc/influxdb/influxdb.conf
    [admin]
    enabled = true
    bind-address = ":8083"
    [http]
    bind-address = ":8086"
    [meta]
    dir = "/mnt/data/influxdb/meta"
    [data]
    dir = "/mnt/data/influxdb/data"
    wal-dir = "/mnt/data/influxdb/wal"
    [[udp]]
    bind-address = ":8089"
  4. influxdb 启动

    1
    2
    3
    [root@ip-172-31-90-45 ~]# /mnt/app/influxdb/usr/bin/influxd -config /mnt/app/influxdb/etc/influxdb/influxdb.conf
    or:
    [root@ip-172-31-90-45 ~]# /mnt/app/influxdb/usr/bin/influxd run -config /mnt/app/influxdb/etc/influxdb/influxdb.conf -pidfile /mnt/app/influxdb/run/influxdb.pid -cpuprofile /mnt/app/influxdb/run/influxdb.cpu -memprofile /mnt/app/influxdb/run/influxdb.memory &
  5. influxdb通过浏览器访问

    1
    http://{IP}:8083
  6. grafana install

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    [root@xxx ~]# tar xzf grafana-4.1.1-1484211277.linux-x64.tar.gz
    [root@xxx ~]# mv grafana-4.1.1-1484211277 /mnt/app/grafana
    [root@xxx ~]# cp /mnt/app/grafana/conf/{sample.ini,custom.ini}

    [root@xxx srv]# vim /mnt/app/grafana/conf/custom.ini
    [root@xxx srv]# mkdir -p /mnt/data/grafana
    [root@xxx srv]# mkdir -p /mnt/log/grafana
    [root@xxx srv]# mkdir -p /mnt/app/grafana/plugins

    [root@xxx srv]# /mnt/app/grafana/bin/grafana-server -homepath /mnt/app/grafana/ -config /mnt/app/grafana/conf/custom.ini &

    用户和密码默认都是admin
    [root@xxx conf]# /mnt/app/grafana/bin/grafana-server -homepath /mnt/app/grafana -config /mnt/app/grafana/conf/custom.ini -pidfile /mnt/app/grafana/run/grafana.pid &

influxdb 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
[root@xxx ~]# cat /mnt/app/influxdb/etc/influxdb/influxdb.conf
reporting-disabled = false
hostname = "172.31.90.45"
[meta]
dir = "/mnt/data/influxdb/meta"
retention-autocreate = true
logging-enabled = true
[data]
dir = "/mnt/data/influxdb/data"
wal-dir = "/mnt/data/influxdb/wal"
trace-logging-enabled = false
query-log-enabled = true
cache-max-memory-size = 1048576000
cache-snapshot-memory-size = 26214400
cache-snapshot-write-cold-duration = "10m"
compact-full-write-cold-duration = "4h"
max-series-per-database = 1000000
max-values-per-tag = 100000
[coordinator]
write-timeout = "10s"
max-concurrent-queries = 0
query-timeout = "0s"
log-queries-after = "0s"
max-select-point = 0
max-select-series = 0
max-select-buckets = 0
[retention]
enabled = true
check-interval = "30m"
[shard-precreation]
enabled = true
check-interval = "10m"
advance-period = "30m"
[monitor]
store-enabled = true
store-database = "_internal"
store-interval = "10s"
[admin]
enabled = true
bind-address = ":8083"
https-enabled = false
[http]
enabled = true
bind-address = ":8086"
auth-enabled = false
log-enabled = true
write-tracing = false
pprof-enabled = true
https-enabled = false
max-row-limit = 10000
max-connection-limit = 0
unix-socket-enabled = false
[subscriber]
enabled = true
http-timeout = "30s"
insecure-skip-verify = false
write-concurrency = 40
write-buffer-size = 1000
[[collectd]]
enabled = true
bind-address = ":25826"
database = "collectd"
retention-policy = ""
typesdb = "/mnt/data/collectd/types.db"
batch-size = 5000
batch-pending = 10
batch-timeout = "10s"
read-buffer = 0
[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1s"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
[root@xxx ~]# cat /mnt/app/influxdb/etc/influxdb/influxdb.conf |grep -v \#|grep -v ^$
reporting-disabled = false
hostname = "172.31.90.45"
[meta]
dir = "/mnt/data/influxdb/meta"
retention-autocreate = true
logging-enabled = true
[data]
dir = "/mnt/data/influxdb/data"
wal-dir = "/mnt/data/influxdb/wal"
trace-logging-enabled = false
query-log-enabled = true
cache-max-memory-size = 1048576000
cache-snapshot-memory-size = 26214400
cache-snapshot-write-cold-duration = "10m"
compact-full-write-cold-duration = "4h"
max-series-per-database = 1000000
max-values-per-tag = 100000
[coordinator]
write-timeout = "10s"
max-concurrent-queries = 0
query-timeout = "0s"
log-queries-after = "0s"
max-select-point = 0
max-select-series = 0
max-select-buckets = 0
[retention]
enabled = true
check-interval = "30m"
[shard-precreation]
enabled = true
check-interval = "10m"
advance-period = "30m"
[monitor]
store-enabled = true
store-database = "_internal"
store-interval = "10s"
[admin]
enabled = true
bind-address = ":8083"
https-enabled = false
[http]
enabled = true
bind-address = ":8086"
auth-enabled = false
log-enabled = true
write-tracing = false
pprof-enabled = true
https-enabled = false
max-row-limit = 10000
max-connection-limit = 0
unix-socket-enabled = false
[subscriber]
enabled = true
http-timeout = "30s"
insecure-skip-verify = false
write-concurrency = 40
write-buffer-size = 1000
[[collectd]]
enabled = true
bind-address = ":25826"
database = "collectd"
retention-policy = ""
typesdb = "/mnt/data/collectd/types.db"
batch-size = 5000
batch-pending = 10
batch-timeout = "10s"
read-buffer = 0
[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1s"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
[root@ip-172-31-90-45 ~]# cat /mnt/data/collectd/types.db
absolute value:ABSOLUTE:0:U
apache_bytes value:DERIVE:0:U
apache_connections value:GAUGE:0:65535
apache_idle_workers value:GAUGE:0:65535
apache_requests value:DERIVE:0:U
apache_scoreboard value:GAUGE:0:65535
ath_nodes value:GAUGE:0:65535
ath_stat value:DERIVE:0:U
backends value:GAUGE:0:65535
bitrate value:GAUGE:0:4294967295
blocked_clients value:GAUGE:0:U
bucket value:GAUGE:0:U
bytes value:GAUGE:0:U
cache_eviction value:DERIVE:0:U
cache_operation value:DERIVE:0:U
cache_ratio value:GAUGE:0:100
cache_result value:DERIVE:0:U
cache_size value:GAUGE:0:1125899906842623
capacity value:GAUGE:0:U
ceph_bytes value:GAUGE:U:U
ceph_latency value:GAUGE:U:U
ceph_rate value:DERIVE:0:U
changes_since_last_save value:GAUGE:0:U
charge value:GAUGE:0:U
clock_last_meas value:GAUGE:0:U
clock_last_update value:GAUGE:U:U
clock_mode value:GAUGE:0:U
clock_reachability value:GAUGE:0:U
clock_skew_ppm value:GAUGE:-2:2
clock_state value:GAUGE:0:U
clock_stratum value:GAUGE:0:U
compression uncompressed:DERIVE:0:U, compressed:DERIVE:0:U
compression_ratio value:GAUGE:0:2
connections value:DERIVE:0:U
conntrack value:GAUGE:0:4294967295
contextswitch value:DERIVE:0:U
count value:GAUGE:0:U
counter value:COUNTER:U:U
cpu value:DERIVE:0:U
cpufreq value:GAUGE:0:U
current value:GAUGE:U:U
current_connections value:GAUGE:0:U
current_sessions value:GAUGE:0:U
delay value:GAUGE:-1000000:1000000
derive value:DERIVE:0:U
df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623
df_complex value:GAUGE:0:U
df_inodes value:GAUGE:0:U
dilution_of_precision value:GAUGE:0:U
disk_io_time io_time:DERIVE:0:U, weighted_io_time:DERIVE:0:U
disk_latency read:GAUGE:0:U, write:GAUGE:0:U
disk_merged read:DERIVE:0:U, write:DERIVE:0:U
disk_octets read:DERIVE:0:U, write:DERIVE:0:U
disk_ops read:DERIVE:0:U, write:DERIVE:0:U
disk_ops_complex value:DERIVE:0:U
disk_time read:DERIVE:0:U, write:DERIVE:0:U
dns_answer value:DERIVE:0:U
dns_notify value:DERIVE:0:U
dns_octets queries:DERIVE:0:U, responses:DERIVE:0:U
dns_opcode value:DERIVE:0:U
dns_qtype value:DERIVE:0:U
dns_qtype_cached value:GAUGE:0:4294967295
dns_query value:DERIVE:0:U
dns_question value:DERIVE:0:U
dns_rcode value:DERIVE:0:U
dns_reject value:DERIVE:0:U
dns_request value:DERIVE:0:U
dns_resolver value:DERIVE:0:U
dns_response value:DERIVE:0:U
dns_transfer value:DERIVE:0:U
dns_update value:DERIVE:0:U
dns_zops value:DERIVE:0:U
drbd_resource value:DERIVE:0:U
duration seconds:GAUGE:0:U
email_check value:GAUGE:0:U
email_count value:GAUGE:0:U
email_size value:GAUGE:0:U
energy value:GAUGE:U:U
energy_wh value:GAUGE:U:U
entropy value:GAUGE:0:4294967295
errors value:DERIVE:0:U
evicted_keys value:DERIVE:0:U
expired_keys value:DERIVE:0:U
fanspeed value:GAUGE:0:U
file_handles value:GAUGE:0:U
file_size value:GAUGE:0:U
files value:GAUGE:0:U
filter_result value:DERIVE:0:U
flow value:GAUGE:0:U
fork_rate value:DERIVE:0:U
frequency value:GAUGE:0:U
frequency_error value:GAUGE:-2:2
frequency_offset value:GAUGE:-1000000:1000000
fscache_stat value:DERIVE:0:U
gauge value:GAUGE:U:U
hash_collisions value:DERIVE:0:U
http_request_methods value:DERIVE:0:U
http_requests value:DERIVE:0:U
http_response_codes value:DERIVE:0:U
humidity value:GAUGE:0:100
if_collisions value:DERIVE:0:U
if_dropped rx:DERIVE:0:U, tx:DERIVE:0:U
if_errors rx:DERIVE:0:U, tx:DERIVE:0:U
if_multicast value:DERIVE:0:U
if_octets rx:DERIVE:0:U, tx:DERIVE:0:U
if_packets rx:DERIVE:0:U, tx:DERIVE:0:U
if_rx_dropped value:DERIVE:0:U
if_rx_errors value:DERIVE:0:U
if_rx_octets value:DERIVE:0:U
if_rx_packets value:DERIVE:0:U
if_tx_dropped value:DERIVE:0:U
if_tx_errors value:DERIVE:0:U
if_tx_octets value:DERIVE:0:U
if_tx_packets value:DERIVE:0:U
invocations value:DERIVE:0:U
io_octets rx:DERIVE:0:U, tx:DERIVE:0:U
io_packets rx:DERIVE:0:U, tx:DERIVE:0:U
ipc value:GAUGE:0:U
ipt_bytes value:DERIVE:0:U
ipt_packets value:DERIVE:0:U
irq value:DERIVE:0:U
latency value:GAUGE:0:U
links value:GAUGE:0:U
load shortterm:GAUGE:0:5000, midterm:GAUGE:0:5000, longterm:GAUGE:0:5000
memory_bandwidth value:DERIVE:0:U
md_disks value:GAUGE:0:U
memcached_command value:DERIVE:0:U
memcached_connections value:GAUGE:0:U
memcached_items value:GAUGE:0:U
memcached_octets rx:DERIVE:0:U, tx:DERIVE:0:U
memcached_ops value:DERIVE:0:U
memory value:GAUGE:0:281474976710656
memory_lua value:GAUGE:0:281474976710656
memory_throttle_count value:DERIVE:0:U
multimeter value:GAUGE:U:U
mutex_operations value:DERIVE:0:U
mysql_bpool_bytes value:GAUGE:0:U
mysql_bpool_counters value:DERIVE:0:U
mysql_bpool_pages value:GAUGE:0:U
mysql_commands value:DERIVE:0:U
mysql_handler value:DERIVE:0:U
mysql_innodb_data value:DERIVE:0:U
mysql_innodb_dblwr value:DERIVE:0:U
mysql_innodb_log value:DERIVE:0:U
mysql_innodb_pages value:DERIVE:0:U
mysql_innodb_row_lock value:DERIVE:0:U
mysql_innodb_rows value:DERIVE:0:U
mysql_locks value:DERIVE:0:U
mysql_log_position value:DERIVE:0:U
mysql_octets rx:DERIVE:0:U, tx:DERIVE:0:U
mysql_select value:DERIVE:0:U
mysql_sort value:DERIVE:0:U
mysql_sort_merge_passes value:DERIVE:0:U
mysql_sort_rows value:DERIVE:0:U
mysql_slow_queries value:DERIVE:0:U
nfs_procedure value:DERIVE:0:U
nginx_connections value:GAUGE:0:U
nginx_requests value:DERIVE:0:U
node_octets rx:DERIVE:0:U, tx:DERIVE:0:U
node_rssi value:GAUGE:0:255
node_stat value:DERIVE:0:U
node_tx_rate value:GAUGE:0:127
objects value:GAUGE:0:U
operations value:DERIVE:0:U
operations_per_second value:GAUGE:0:U
packets value:DERIVE:0:U
pending_operations value:GAUGE:0:U
percent value:GAUGE:0:100.1
percent_bytes value:GAUGE:0:100.1
percent_inodes value:GAUGE:0:100.1
pf_counters value:DERIVE:0:U
pf_limits value:DERIVE:0:U
pf_source value:DERIVE:0:U
pf_state value:DERIVE:0:U
pf_states value:GAUGE:0:U
pg_blks value:DERIVE:0:U
pg_db_size value:GAUGE:0:U
pg_n_tup_c value:DERIVE:0:U
pg_n_tup_g value:GAUGE:0:U
pg_numbackends value:GAUGE:0:U
pg_scan value:DERIVE:0:U
pg_xact value:DERIVE:0:U
ping value:GAUGE:0:65535
ping_droprate value:GAUGE:0:100
ping_stddev value:GAUGE:0:65535
players value:GAUGE:0:1000000
power value:GAUGE:U:U
pressure value:GAUGE:0:U
protocol_counter value:DERIVE:0:U
ps_code value:GAUGE:0:9223372036854775807
ps_count processes:GAUGE:0:1000000, threads:GAUGE:0:1000000
ps_cputime user:DERIVE:0:U, syst:DERIVE:0:U
ps_data value:GAUGE:0:9223372036854775807
ps_disk_octets read:DERIVE:0:U, write:DERIVE:0:U
ps_disk_ops read:DERIVE:0:U, write:DERIVE:0:U
ps_pagefaults minflt:DERIVE:0:U, majflt:DERIVE:0:U
ps_rss value:GAUGE:0:9223372036854775807
ps_stacksize value:GAUGE:0:9223372036854775807
ps_state value:GAUGE:0:65535
ps_vm value:GAUGE:0:9223372036854775807
pubsub value:GAUGE:0:U
queue_length value:GAUGE:0:U
records value:GAUGE:0:U
requests value:GAUGE:0:U
response_code value:GAUGE:0:U
response_time value:GAUGE:0:U
root_delay value:GAUGE:U:U
root_dispersion value:GAUGE:U:U
route_etx value:GAUGE:0:U
route_metric value:GAUGE:0:U
routes value:GAUGE:0:U
satellites value:GAUGE:0:U
segments value:GAUGE:0:65535
serial_octets rx:DERIVE:0:U, tx:DERIVE:0:U
signal_noise value:GAUGE:U:0
signal_power value:GAUGE:U:0
signal_quality value:GAUGE:0:U
smart_attribute current:GAUGE:0:255, worst:GAUGE:0:255, threshold:GAUGE:0:255, pretty:GAUGE:0:U
smart_badsectors value:GAUGE:0:U
smart_powercycles value:GAUGE:0:U
smart_poweron value:GAUGE:0:U
smart_temperature value:GAUGE:-300:300
snr value:GAUGE:0:U
spam_check value:GAUGE:0:U
spam_score value:GAUGE:U:U
spl value:GAUGE:U:U
swap value:GAUGE:0:1099511627776
swap_io value:DERIVE:0:U
tcp_connections value:GAUGE:0:4294967295
temperature value:GAUGE:U:U
threads value:GAUGE:0:U
time_dispersion value:GAUGE:-1000000:1000000
time_offset value:GAUGE:-1000000:1000000
time_offset_ntp value:GAUGE:-1000000:1000000
time_offset_rms value:GAUGE:-1000000:1000000
time_ref value:GAUGE:0:U
timeleft value:GAUGE:0:U
total_bytes value:DERIVE:0:U
total_connections value:DERIVE:0:U
total_objects value:DERIVE:0:U
total_operations value:DERIVE:0:U
total_requests value:DERIVE:0:U
total_sessions value:DERIVE:0:U
total_threads value:DERIVE:0:U
total_time_in_ms value:DERIVE:0:U
total_values value:DERIVE:0:U
uptime value:GAUGE:0:4294967295
users value:GAUGE:0:65535
vcl value:GAUGE:0:65535
vcpu value:GAUGE:0:U
virt_cpu_total value:DERIVE:0:U
virt_vcpu value:DERIVE:0:U
vmpage_action value:DERIVE:0:U
vmpage_faults minflt:DERIVE:0:U, majflt:DERIVE:0:U
vmpage_io in:DERIVE:0:U, out:DERIVE:0:U
vmpage_number value:GAUGE:0:4294967295
volatile_changes value:GAUGE:0:U
voltage value:GAUGE:U:U
voltage_threshold value:GAUGE:U:U, threshold:GAUGE:U:U
vs_memory value:GAUGE:0:9223372036854775807
vs_processes value:GAUGE:0:65535
vs_threads value:GAUGE:0:65535

#
# Legacy types
# (required for the v5 upgrade target)
#
arc_counts demand_data:COUNTER:0:U, demand_metadata:COUNTER:0:U, prefetch_data:COUNTER:0:U, prefetch_metadata:COUNTER:0:U
arc_l2_bytes read:COUNTER:0:U, write:COUNTER:0:U
arc_l2_size value:GAUGE:0:U
arc_ratio value:GAUGE:0:U
arc_size current:GAUGE:0:U, target:GAUGE:0:U, minlimit:GAUGE:0:U, maxlimit:GAUGE:0:U
mysql_qcache hits:COUNTER:0:U, inserts:COUNTER:0:U, not_cached:COUNTER:0:U, lowmem_prunes:COUNTER:0:U, queries_in_cache:GAUGE:0:U
mysql_threads running:GAUGE:0:U, connected:GAUGE:0:U, cached:GAUGE:0:U, created:COUNTER:0:U

java log4j

log4j 简介

1
Apache的开源项目log4j是一个功能强大的日志组件,提供方便的日志记录
  1. Log4j基本使用方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    Log4j由三个重要的组件构成:日志信息的优先级,日志信息的输出目的地,日志信息的输出格式
    1.日志信息的优先级从高到低有ERROR,WARN,INFO,DEBUG,分别用来指定这条日志信息的重要程度
    2.日志信息的输出目的地指定了日志将打印到控制台还是文件中
    3.输出格式则控制了日志信息的显示内容

    1.定义配置文件
    Log4j支持两种配置文件格式:一种是XML格式的文件;一种是Java特性文件(键=值)

    Java特性文件做为配置文件的方法:
    //配置根Logger
    log4j.rootLogger = [ level ] , appenderName, appenderName, …

    level是日志记录的优先级,分为OFF,FATAL,ERROR,WARN,INFO,DEBUG,ALL或者您定义的级别.Log4j建议只使用四个级别,优先级从高到低分别是ERROR,WARN,INFO,DEBUG.通过在这里定义的级别,您可以控制到应用程序中相应级别的日志信息的开关.比如在这里定义了INFO级别,则应用程序中所有DEBUG级别的日志信息将不被打印出来.
    appenderName就是指B日志信息输出到哪个地方.您可以同时指定多个输出目的地.

    //配置日志信息输出目的地Appender
    log4j.appender.appenderName = fully.qualified.name.of.appender.class
    log4j.appender.appenderName.option1 = value1
    ... ...
    log4j.appender.appenderName.option = valueN

    Log4j提供的appender有以下几种:
    1.org.apache.log4j.ConsoleAppender(控制台)
    2.org.apache.log4j.FileAppender(文件)
    3.org.apache.log4j.DailyRollingFileAppender(每天产生一个日志文件)
    4.org.apache.log4j.RollingFileAppender(文件大小到达指定尺寸的时候产生一个新的文件)
    5.org.apache.log4j.WriterAppender(将日志信息以流格式发送到任意指定的地方)

    //配置日志信息的格式(布局)
    log4j.appender.appenderName.layout = fully.qualified.name.of.layout.class
    log4j.appender.appenderName.layout.option1 = value1

    log4j.appender.appenderName.layout.option = valueN

    Log4j提供的layout有以e几种:
    1.org.apache.log4j.HTMLLayout(以HTML表格形式布局)
    2.org.apache.log4j.PatternLayout(可以灵活地指定布局模式)
    3.org.apache.log4j.SimpleLayout(包含日志信息的级别和信息字符串)
    4.org.apache.log4j.TTCCLayout(包含日志产生的时间、线程、类别等等信息)


    Log4J采用类似C语言中的printf函数的打印格式格式化日志信息,打印参数如下:
    %m 输出代码中指定的消息
    %p 输出优先级,即DEBUG,INFO,WARN,ERROR,FATAL
    %r 输出自应用启动到输出该log信息耗费的毫秒数
    %c 输出所属的类目,通常就是所在类的全名
    %t 输出产生该日志事件的线程名
    %n 输出一个回车换行符,Windows平台为"rn",Unix平台为"n"
    %d 输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,比如: %d{yyy MMM dd HH:mm:ss,SSS},输出类似:2002年10月18日 22:10:28,921
    %l 输出日志事件的发生位置,包括类目名,发生的线程,以及在代码中的行数.举例:Testlog4.main(TestLog4.java:10)
  2. 在代码中使用log4j

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    1.得到记录器,这个记录器将负责控制日志信息
    public static Logger getLogger( String name)

    Name一般取本类的名字,比如:
    static Logger logger = Logger.getLogger(ServerWithLog4j.class.getName())

    2.读取配置文件
    BasicConfigurator.configure () 自动快速地使用缺省Log4j环境
    PropertyConfigurator.configure ( String configFilename) 读取使用Java的特性文件编写的配置文件
    DOMConfigurator.configure ( String filename ) 读取XML形式的配置文件

    3.插入记录信息(格式化日志信息)
    Logger.debug ( Object message ) ;
    Logger.info ( Object message ) ;
    Logger.warn ( Object message ) ;
    Logger.error ( Object message ) ;

    日志级别:
    每个Logger都被了一个日志级别(log level)用来控制日志信息的输出.
    日志级别从高到低分为:
    A:off 最高等级,用于关闭所有日志记录
    B:fatal 指出每个严重的错误事件将会导致应用程序的退出
    C:error 指出虽然发生错误事件,但仍然不影响系统的继续运行
    D:warm 表明会出现潜在的错误情形
    E:info 一般和在粗粒度级别上,强调应用程序的运行全程
    F:debug 一般用于细粒度级别上,对调试应用程序非常有帮助
    G:all 最低等级,用于打开所有日志记录

    上面这些级别是定义在org.apache.log4j.Level类中.
    Log4j只建议使用4个级别,优先级从高到低分别是error,warn,info和debug.
    通过使用日志级别,可以控制应用程序中相应级别日志信息的输出.例如:如果使用b了info级别,则应用程序中所有低于info级别的日志信息(如debug)将不会被打印出来

salt install

saltstack 安装

  1. 基础软件安装

    1
    2
    3
    4
    5
    6
    [root@saltstack ~]# yum -y install epel-release
    [root@saltstack ~]# yum -y install gcc gcc-c++ make cmake bison libtool autoconf automake zip unzip bzip2 zlib zlib-devel openssl openssl-devel openssl-static pcre pcre-devel bison-devel ncurses-devel tcl tcl-devel perl-Digest-SHA1 GeoIP GeoIP-devel gperftools gperftools-devel libatomic_ops-devel gtest gtest-devel glibc-devel unixODBC-devel fop libperl libpython readline readline-devel python2-pip readline readline-devel readline-static sqlite-devel bzip2-devel bzip2-libs openldap-devel
    [root@saltstack ~]# yum -y install git lftp ntpdate vim wget telnet dstat tree lrzsz net-tools nmap-ncat nmap sysstat

    [root@saltstack ~]# rpm -e --nodeps python2-pycryptodomex
    [root@saltstack ~]# yum -y install python-crypto
  2. salt yum repo

    1
    2
    3
    4
    5
    [root@saltstack ~]# yum -y install https://repo.saltstack.com/yum/redhat/salt-repo-latest-2.el7.noarch.rpm
    [root@saltstack ~]# yum -y install https://repo.saltstack.com/yum/redhat/salt-repo-latest-2.el6.noarch.rpm

    [root@saltstack ~]# yum clean expire-cache
    [root@saltstack ~]# yum makecache
  3. salt master install

    1
    2
    3
    [root@saltstack ~]# yum -y install salt-master
    [root@saltstack ~]# yum -y install salt-cloud salt-api salt-repo salt-ssh salt-syndic
    [root@saltstack ~]# pip install raet
  4. salt minion install

    1
    [root@saltstack ~]# yum -y install salt-minion
  5. salt-master 配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [root@localhost ~]# ls -1d /etc/salt/master*
    /etc/salt/master
    /etc/salt/master.d

    [root@saltstack salt]# vim /etc/salt/master
    default_include: master.d/*.conf
    interface: 0.0.0.0
    publish_port: 4505
    user: root
    ret_port: 4506
  6. salt-minion 配置文件

    1
    2
    3
    4
    5
    [root@localhost ~]# ls -1d /etc/salt/minion*
    /etc/salt/minion
    /etc/salt/minion.d

    [root@localhost ~]# vim /etc/salt/minion
  7. salt-master 启动

    1
    2
    [root@saltstack ~]# systemctl start salt-master
    [root@saltstack ~]# systemctl enable salt-master
  8. salt-minion 启动

    1
    2
    [root@localhost ~]# systemctl start salt-minion
    [root@localhost ~]# systemctl enable salt-minion
  9. 错误信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    [root@localhost ~]# yum -y install salt-minion
    ... ...
    Retrieving key from file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7
    Importing GPG key 0x352C64E5:
    Userid : "Fedora EPEL (7) <epel@fedoraproject.org>"
    Fingerprint: 91e9 7d7c 4a5e 96f1 7f3e 888f 6a2f aea2 352c 64e5
    Package : epel-release-7-9.noarch (@wisdom_CentOS_7_extras)
    From : /etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-7
    Retrieving key from file:///etc/pki/rpm-gpg/saltstack-signing-key

    The GPG keys listed for the "SaltStack Latest Release Channel for RHEL/Centos 7" repository are already installed but they are not correct for this package.
    Check that the correct key URLs are configured for this repository.

    Failing package is: python-jinja2-2.7.2-2.el7.noarch
    GPG Keys are configured as: file:///etc/pki/rpm-gpg/saltstack-signing-key


    解决方法:
    [root@localhost ~]# wget https://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7
    [root@localhost ~]# rpm --import RPM-GPG-KEY-CentOS-7
  10. salt minion config

    1
    2
    3
    echo 'default_include: minion.d/*.conf'|tee -a /etc/salt/minion
    echo 'master: 10.0.1.90'|tee /etc/salt/minion.d/minion.conf
    echo 'id: 10.0.3.39'|tee -a /etc/salt/minion.d/minion.conf

systemctl start salt-minion

systemctl enable salt-master.service
systemctl start salt-master.service

systemctl enable salt-minion.service
systemctl start salt-minion.service

slat-key -L
slat-key -A
salt-key -a key

salt “*” test.ping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
---

---
### salt install(多master + syndic架构)
1. 基础环境初始化
```text
192.168.13.217 saltserver1
192.168.13.218 saltserver2
192.168.13.212 saltsyndic

//修改主机名
hostname saltserver1 && echo saltserver1 |tee /etc/hostname
hostname saltserver2 && echo saltserver1 |tee /etc/hostname
hostname saltsyndic && echo saltsyndic |tee /etc/hostname

//修改hosts文件
echo '192.168.13.217 saltserver1' |tee -a /etc/hosts
echo '192.168.13.218 saltserver2' |tee -a /etc/hosts
echo '192.168.13.212 saltsyndic' |tee -a /etc/hosts

//格式化磁盘
mkfs.xfs /dev/vdb
echo '/dev/vdb /mnt xfs defaults 0 0' |tee -a /etc/fstab
mount -a

//修改最大文件描述符
echo '* - nproc 65535' | tee -a /etc/security/limits.conf
echo '* - nofile 65535' | tee -a /etc/security/limits.conf
ls /etc/security/limits.d/|xargs rm -f

//修改yum源
mkdir /etc/yum.repos.d/backup && mv /etc/yum.repos.d/{*,backup}
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
rpm --import http://yum.jwops.cn/wisdom/centos/RPM-GPG-KEY-CentOS-7
yum -y install epel-release
yum clean all && yum makecache

yum -y install gcc gcc-c++ make cmake bison libtool autoconf automake zip unzip bzip2 zlib zlib-devel openssl openssl-devel openssl-static pcre pcre-devel bison-devel ncurses-devel tcl tcl-devel perl-Digest-SHA1 GeoIP GeoIP-devel gperftools gperftools-devel libatomic_ops-devel gtest gtest-devel glibc-devel unixODBC-devel fop libperl libpython readline readline-devel python-devel python-pip python-crypto readline readline-devel readline-static sqlite-devel bzip2-devel bzip2-libs openldap-devel gdk-pixbuf2 gdk-pixbuf2-devel libffi libffi-devel libcurl libcurl-devel http-parser http-parser-devel libssh2 libssh2-devel git lftp ntp ntpdate vim wget telnet dstat tree lrzsz net-tools nmap-ncat nmap sysstat

//关闭selinux
setenforce 0
sed -i s/'SELINUX=enforcing'/'SELINUX=disabled'/g /etc/selinux/config

//关闭防火墙
systemctl stop firewalld && systemctl disable firewalld

//时间设置
[ -f /etc/localtime ] && cp -f /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
[ -f /etc/sysconfig/clock ] && echo 'ZONE="Asia/Shanghai"' | tee /etc/sysconfig/clock
[ -f /etc/timezone ] && echo 'Asia/Shanghai' | tee /etc/timezone
[ -f /etc/sysconfig/ntpd ] && echo 'SYNC_HWCLOCK=yes' | tee -a /etc/sysconfig/ntpd

ntpdate cn.pool.ntp.org

cp -f /etc/{ntp.conf,ntp.conf.bak}
cat > /etc/ntp.conf <<EOF
driftfile /var/lib/ntp/drift
restrict default nomodify notrap nopeer noquery
restrict 127.0.0.1
restrict ::1
server cn.pool.ntp.org prefer
server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst
includefile /etc/ntp/crypto/pw
keys /etc/ntp/keys
disable monitor
EOF

cp -f /etc/ntp/{step-tickers,step-tickers.bak}
cat > /etc/ntp/step-tickers <<EOF
cn.pool.ntp.org
0.centos.pool.ntp.org
1.centos.pool.ntp.org
2.centos.pool.ntp.org
3.centos.pool.ntp.org
EOF

systemctl start ntpd && systemctl enable ntpd

//创建基础目录
mkdir -p /mnt/{app,data,log,web,ops/{app,data,cron}}

//挂载一块共享存储(nfs)
mkdir /mnt/data/salt
yum -y install nfs-utils
echo '192.168.13.201:/mnt/data/nfs /mnt/data/salt nfs nfsvers=3 0 0' | tee -a /etc/fstab
mount -a

//pip更新源和升级
mkdir ~/.pip
cat > ~/.pip/pip.conf <<EOF
[global]
trusted-host=mirrors.aliyun.com
index-url=http://mirrors.aliyun.com/pypi/simple/
[list]
format=columns
EOF
pip install --upgrade pip
pip install urllib urllib3

  1. salt-master install

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    //salt源安装
    CentOS 7:
    yum -y install https://repo.saltstack.com/yum/redhat/salt-repo-latest-2.el7.noarch.rpm
    CentoS 6:
    yum -y install https://repo.saltstack.com/yum/redhat/salt-repo-latest-2.el6.noarch.rpm

    yum clean expire-cache
    yum makecache

    //安装pygit2
    wget https://codeload.github.com/libgit2/libgit2/tar.gz/v0.26.0
    tar xzf v0.26.0
    cd libgit2-0.26.0/
    cmake .
    make
    make install
    echo '/usr/local/lib' | tee /etc/ld.so.conf.d/libgit2.conf
    ldconfig
    pip install pygit2

    //安装raet
    pip install raet

    //salt-master 和 salt-syndic 安装
    yum -y install salt-master salt salt-repo
    yum -y install salt-api salt-ssh salt-syndic salt-cloud

    [root@saltserver1 ~]# salt --versions-report
    Salt Version:
    Salt: 2017.7.0

    Dependency Versions:
    cffi: 1.6.0
    cherrypy: unknown
    dateutil: Not Installed
    docker-py: Not Installed
    gitdb: 0.6.4
    gitpython: Not Installed
    ioflo: 1.6.9
    Jinja2: 2.7.2
    libgit2: 0.26.0
    libnacl: 1.5.2
    M2Crypto: Not Installed
    Mako: Not Installed
    msgpack-pure: Not Installed
    msgpack-python: 0.4.8
    mysql-python: Not Installed
    pycparser: 2.14
    pycrypto: 2.6.1
    pycryptodome: Not Installed
    pygit2: 0.26.0
    Python: 2.7.5 (default, Nov 6 2016, 00:28:07)
    python-gnupg: Not Installed
    PyYAML: 3.11
    PyZMQ: 15.3.0
    RAET: 0.6.8
    smmap: 0.9.0
    timelib: Not Installed
    Tornado: 4.2.1
    ZMQ: 4.1.4

    System Versions:
    dist: centos 7.0.1406 Core
    locale: UTF-8
    machine: x86_64
    release: 3.10.0-123.el7.x86_64
    system: Linux
    version: CentOS Linux 7.0.1406 Core

    //saltserver1配置启动
    [root@saltserver1 ~]# mkdir -p /mnt/data/salt/conf
    [root@saltserver1 ~]# mkdir -p /mnt/data/salt/pki/master
    [root@saltserver1 ~]# mkdir -p /mnt/data/salt/file/{base,dev,prod}
    [root@saltserver1 ~]# mkdir -p /mnt/data/salt/pillar/{base,dev,prod}
    [root@saltserver1 ~]# mkdir -p /mnt/data/salt-master
    [root@saltserver1 ~]# mkdir -p /mnt/log/salt

    [root@saltserver1 ~]# cp /etc/salt/master /mnt/data/salt/conf/master.conf
    [root@saltserver1 ~]# ln -sf /mnt/data/salt/conf/master.conf /etc/salt/master.d/master.conf

    [root@saltserver1 ~]# vim /etc/salt/master.d/master.conf
    conf_file: /etc/salt/master
    pki_dir: /mnt/data/salt/pki/master
    file_roots:
    base:
    - /mnt/data/salt/file/base
    dev:
    - /mnt/data/salt/file/dev
    prod:
    - /mnt/data/salt/file/prod
    pillar_roots:
    base:
    - /mnt/data/salt/pillar/base
    dev:
    - /mnt/data/salt/pillar/dev
    prod:
    - /mnt/data/salt/pillar/prod
    [root@saltserver1 ~]# systemctl start salt-master
    [root@saltserver1 ~]# systemctl enable salt-master

    //saltserver2配置启动(将saltserver1配置文件同步到saltserver2上)
    [root@saltserver2 ~]# systemctl start salt-master
    [root@saltserver2 ~]# systemctl enable salt-master
  2. saltsyndic安装

    1
    2