salt jinja2 grain pillar module

salt通过Jinja2模板以及grain和pillar扩展主机状态

  1. 简介

    1
    我们学习了简单状态文件的编写,实际情况我们会遇到更复杂的情况,比如:对不同操作系统安装软件,根据主机CPU梳理,内存动态生成软件配置文件等,这一切都需要Jinja2以及grain和pillar的辅助.
  2. Jinja2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    Jinja2是一个强大的python模板引擎,可以使用代码动态生成创建文件的内容

    //Jinja2 变量
    Jinja2包含变量和表达式: 变量用"{{}}"包围,表达式用"{%%}"包围
    * 字符串类型: {% set var = 'good' %} {{var}}
    * 列表类型: {% list = [1,2,3] %} {{list[0]}}
    * 字典类型: {% dict = {'a':1,'b':2} %} {{dict['a']}}

    例如:
    [root@saltserver ~]# vim /srv/salt/var.sls
    {% set var = 'hello world' %}
    test_var:
    cmd.run:
    - name: echo "var is {{var}}"
    [root@saltserver ~]# salt '*' state.sls var
    192.168.13.187:
    ----------
    ID: test_var
    Function: cmd.run
    Name: echo "var is hello world"
    Result: True
    Comment: Command "echo "var is hello world"" run
    Started: 11:07:33.190097
    Duration: 19.801 ms
    Changes:
    ----------
    pid:
    29983
    retcode:
    0
    stderr:
    stdout:
    var is hello world

    Summary for 192.168.13.187
    ------------
    Succeeded: 1 (changed=1)
    Failed: 0
    ------------
    Total states run: 1
    Total run time: 19.801 ms

    //Jinja2 流程控制
    * for
    例一:
    {% for user in users %}
    {{user}}
    {% endfor %}
    例二:
    {% for key,value in my_dict.iteritems() %}
    {{key}}
    {{value}}
    {% endfor %}

    注意: 模板中循环不能有break和continue.但你可以在迭代中过滤序列来跳过项目
    {% for user in users if not user.hidden %}
    {{user.username}}
    {% endfor %}

    * if
    例一:
    {% if users %}
    {% for user in users %}
    {{user.username}}
    {% endfor %}
    {% endif %}

    例二:
    {% if kenny.sick %}
    kenny is sick.
    {% elif kenny.dead %}
    You killed Kennny! You bastard!!!
    {% else %}
    Kenny looks okay --- so far
    {% endif %}
  3. grain 和 pillar

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    grain和pillar本质上都是key,value型数据库.
    grain存储在minion上的数据,minion启动后就进行grains计算.grain是一种静态数据,包括操作系统类型,版本,CPU数量,内存大小等.这些数据不经常变,即时有所变化重启minion也会重新计算生成
    pillar数据存储在master上,指定的minion只能看到自己的pillar数据,其他的minion看不到任何pillar数据,这一点与状态文件正好相反.所有通过认证的minion都可以获取状态文件,但是每个minion却都有自己的一套pillar数据,而且每台minion的pillar都进行了加密,所以很适用于敏感数据


    //grain相关命令
    * 列出minion上的grains项
    [root@saltserver ~]# salt '*' grains.ls
    * 查看minion上具体某个grain项
    [root@saltserver ~]# salt '*' grains.item os
    * 列出所有grain详细信息
    [root@saltserver ~]# salt '*' grains.items
    * 设置grain数据
    [root@saltserver ~]# salt '192.168.13.187' grains.setval my_grain bar
    * 设置grain多个值
    [root@saltserver ~]# salt '192.168.13.187' grains.setvals "{'k1':'v1','k2':'v2'}"
    * 设置列表
    [root@saltserver ~]# salt '192.168.13.187' grains.setval my_dict '[1,2,3]'
    * 查看item
    [root@saltserver ~]# salt '192.168.13.187' grains.item my_grain
    [root@saltserver ~]# salt '192.168.13.187' grains.item k1 k2
    [root@saltserver ~]# salt '192.168.13.187' grains.item my_dict
    * 在saltminion上查看
    [root@saltminion ~]# cat /etc/salt/grains
    k1: v1
    k2: v2
    my_dict:
    - 1
    - 2
    - 3
    my_grain: bar

    * grains_module方式设置
    方式一: 在salt-master端设置,下发到minion
    创建模块目录:
    [root@saltserver ~]# mkdir -p /srv/salt/_grains
    编写模块:
    [root@saltserver ~]# vim /srv/salt/_grains/my_grain.py
    import time
    def now():
    grains = {}
    grains['now'] = time.time()
    return grains
    同步模块到minion:
    [root@saltserver ~]# salt '*' saltutil.sync_all
    192.168.13.187:
    ----------
    beacons:
    clouds:
    engines:
    grains:
    - grains.my_grain
    log_handlers:
    modules:
    output:
    proxymodules:
    renderers:
    returners:
    sdb:
    states:
    utils:
    重新加载一次模块:
    [root@saltserver ~]# salt '*' sys.reload_modules
    192.168.13.187:
    True
    查看设置的grains:
    [root@saltserver ~]# salt '*' grains.item now
    192.168.13.187:
    ----------
    now:
    1502076872.27
    删除自定义granins:
    [root@saltserver ~]# salt '*' grains.delval my_grain
    192.168.13.187:
    None
    方式二: 在minion端设置
    在minion上修改grain.conf配置文件:
    [root@saltminion ~]# vim /etc/salt/minion.d/grain.conf
    grains:
    new_grain: bar
    new_grain_dict:
    - one
    - two
    - three
    重启salt-minion服务:
    [root@saltminion ~]# pkill salt-minion
    [root@saltminion ~]# salt-minion -c /etc/salt -d
    在master端进行验证:
    [root@saltserver ~]# salt '*' grains.item new_grain_dict
    192.168.13.187:
    ----------
    new_grain_dict:
    - one
    - two
    - three


    //pillar相关命令
    * 列出minion上所有pillar详细信息
    [root@saltserver ~]# salt '*' pillar.items
    * 查询minion上某一个具体grain值
    [root@saltserver ~]# salt '*' pillar.item foo

    * 设置pillar数据
    建立pillar目录:
    [root@saltserver ~]# mkdir -p /srv/pillar
    创建pillar文件(sls文件):
    [root@saltserver ~]# vim /srv/pillar/minion_one_key.sls
    private_key: minion_one_key
    建立入口文件:
    [root@saltserver ~]# vim /srv/pillar/top.sls
    base:
    '192.168.13.187':
    - minion_one_key
    刷新pillar数据:
    [root@saltserver ~]# salt '*' saltutil.refresh_pillar
    192.168.13.187:
    True
    查看下发的pillar数据:
    [root@saltserver ~]# salt '*' pillar.items
    192.168.13.187:
    ----------
    private_key:
    minion_one_key
  4. 通过Jinja2配合grain和pillar扩展SLS配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    //扩展apache.sls配置文件
    * apache不同操作系统安装
    [root@saltserver ~]# cat /srv/salt/apache.sls
    install_apache:
    pkg.installed:
    {% if grains['os_family'] == 'Debian' %}
    - name: apache2
    {% elif grains['os_family'] == 'RedHat' %}
    - name: httpd
    {% endif %}

    * vim多系统安装
    [root@saltserver ~]# vim /srv/salt/vim.sls
    vim:
    pkg:
    - installed
    {% if grains['os_family'] == 'RedHat' %}
    - name: vim-enhanced
    {% elif grains['os'] == 'Debian' %}
    - name: vim-nox
    {% endif %}

    {% if grains['os'] == 'Arch' %}
    /etc/vimrc:
    file:
    - managed
    - source: salt://vim/vimrc
    - user: root
    - group: root
    - mode: 644
    - template: jinja
    - makedirs: True
    - require:
    - pkg: vim
    {% endif %}

    * epel多系统安装
    [root@saltserver ~]# vim /srv/salt/epel.sls
    epel:
    cmd:
    - run
    {% if grains['osrelease'].startswitch('5') %}
    - name: rpm -Uvh http://xx/5/xxx.rpm
    {% elif grains['osrelease'].startswitch('6') %}
    - name: rpm -Uvh http://xx/6/xxx.rpm
    {% endif %}
    - unless: test -e /etc/yum.repos.d/epel.repo

    * iptables设置
    [root@saltserver ~]# vim /srv/salt/iptables.sls
    iptables:
    pkg:
    - installed
    service:
    - running
    - watch:
    - pkg: iptables
    - file: iptables
    file:
    - managed
    - source: salt://iptables/iptables
    {% if grains['os'] == 'CentOS' or grains['os'] == 'Fedora' %}
    - name: /etc/sysconfig/iptables
    {% elif grains['os'] == 'Arch' %}
    - name: /etc/conf.d/iptables
    {% endif %}
  5. 通过Jinja2配合grain和pillar 动态下发配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    file模块的一个状态函数managed,这个模块可以从master下发配置文件到匹配的minion上,这种下发方式使所有minion得到同样的配置文件.但现实情况是不同的minion有不同的CPU核心数量,有不同大小内存值.很多软件的配置文件需要根据主机配置的不同进行相应的调整.Jinja2配合grain和pillar可以很好的解决此类问题

    * 一个简单的模板文件下发
    编写模板文件:
    [root@saltserver ~]# vim /srv/salt/templates.sls
    template_test:
    file.managed:
    - source: salt://test.j2
    - name: /tmp/test.conf
    - user: root
    - group: root
    - mode: 644
    - template: jinja
    [root@saltserver ~]# vim /srv/salt/test.j2
    cpu_num = {{ grains['num_cpus'] }}
    mem_total = {{ grains['mem_total'] }}
    hostname = {{ grains['host'] }}
    user = {{ pillar['private_key'] }}
    测试模板文件下发:
    [root@saltserver ~]# salt '*' state.sls templates
    192.168.13.187:
    ----------
    ID: template_test
    Function: file.managed
    Name: /tmp/test.conf
    Result: True
    Comment: File /tmp/test.conf updated
    Started: 15:04:28.737899
    Duration: 44.619 ms
    Changes:
    ----------
    diff:
    New file
    mode:
    0644

    Summary for 192.168.13.187
    ------------
    Succeeded: 1 (changed=1)
    Failed: 0
    ------------
    Total states run: 1
    Total run time: 44.619 ms
    在minion上查看:
    [root@saltminion ~]# cat /tmp/test.conf
    cpu_num = 4
    mem_total = 3832
    hostname = saltminion
    user = minion_one_key

    综上,我们看到配置文件内容的变量已经替换成了对应的值,在这个基础上加上Jinja2的逻辑控制功能:
    [root@saltserver ~]# vim /srv/salt/test.j2
    {% if grains['num_cpus'] >= 8 %}
    cpu_num = {{ grains['num_cpus'] }}
    {% endif %}

    {% if grains['mem_total'] <= 512 %}
    mem_total <= 512
    {% elif grains['mem_total'] >= 1024 %}
    mem_total >= 1024
    {% endif %}

    hostname = {{ grains['host'] }}

    # 下发配置文件
    [root@saltserver ~]# salt '*' state.sls templates
    192.168.13.187:
    ----------
    ID: template_test
    Function: file.managed
    Name: /tmp/test.conf
    Result: True
    Comment: File /tmp/test.conf updated
    Started: 15:10:37.611367
    Duration: 48.785 ms
    Changes:
    ----------
    diff:
    ---
    +++
    @@ -1,4 +1,7 @@
    -cpu_num = 4
    -mem_total = 3832
    +
    +
    +
    +mem_total >= 1024
    +
    +
    hostname = saltminion
    -user = minion_one_key

    Summary for 192.168.13.187
    ------------
    Succeeded: 1 (changed=1)
    Failed: 0
    ------------
    Total states run: 1
    Total run time: 48.785 ms

    在minion上查看:
    [root@saltminion ~]# cat /tmp/test.conf
    mem_total >= 1024
    hostname = saltminion

salt state module

salt通过state模块定义主机状态

  1. salt状态的概念

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    远程执行模块的执行是过程式的,类似于shell脚本或者python脚本,每次执行都会触发一次相同的功能

    在大量的minion上运行远程命令非常重要,但对于minion的环境控制,使用状态进行管理更为合适.
    状态是对minion的一种描述和定义,管理人员可以不关心具体部署任务是如何完成的,只需要描述minion需要达到什么状态,底层由salt的状态模块来完成功能
    例如: 在minion上部署apache
    [root@saltserver ~]# salt '*' pkg.install "httpd"
    192.168.13.187:
    ----------
    httpd:
    ----------
    new:
    2.2.15-60.el6.centos.4
    old:
    pkg.install 类似于 直接在系统上执行 "yum install httpd"
  2. salt状态的编写

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    * 创建state存放目录
    [root@saltserver ~]# mkdir -p /srv/salt/

    * 创建apache.sls文件(state文件)
    [root@saltserver ~]# vim /srv/salt/apache.sls
    install_httpd:
    pkg.installed:
    - name: httpd

    * 执行命令
    [root@saltserver ~]# salt '*' state.sls apache
    192.168.13.187:
    ----------
    ID: install_httpd
    Function: pkg.installed
    Name: httpd
    Result: True
    Comment: The following packages were installed/updated: httpd
    Started: 10:01:50.639510
    Duration: 28720.413 ms
    Changes:
    ----------
    httpd:
    ----------
    new:
    2.2.15-60.el6.centos.4
    old:

    Summary for 192.168.13.187
    ------------
    Succeeded: 1 (changed=1)
    Failed: 0
    ------------
    Total states run: 1
    Total run time: 28.720 s

    综上,state模块部署apache时,我们用了一个描述性配置文件,命令行调用了state模块的SLS函数,从返回结果上看是成功部署了httpd
    再次执行,查看状态:
    [root@saltserver ~]# salt '*' state.sls apache
    192.168.13.187:
    ----------
    ID: install_httpd
    Function: pkg.installed
    Name: httpd
    Result: True
    Comment: Package httpd is already installed
    Started: 10:03:52.341712
    Duration: 805.446 ms
    Changes:

    Summary for 192.168.13.187
    ------------
    Succeeded: 1
    Failed: 0
    ------------
    Total states run: 1
    Total run time: 805.446 ms

    综上,执行模块和状态模块之间的主要区别: 执行模块是过程式的,而状态模块是描述性的
    当你连续几次调用同一个执行模块时,实际上将执行相同的逻辑和指令;状态模块恰恰相反,状态模块设计为描述性的,它们只是执行必要的工作,在目标minion上创建根据描述文件指定的状态

    当我们通过pkg.install命令时,实际上只运行了yum install命令,而state模块时会首先判断httpd软件是否安装,如果没安装就进行安装,如果安装了,就什么都不做
  3. 状态配置文件各要素

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    例如:
    [root@saltserver ~]# cat /srv/salt/apache.sls
    install_httpd:
    pkg.installed:
    - name: httpd

    SLS配置文件使用YAML语言描述.salt默认的SLS文件的renderer是YAML renderer. YAML renderer的工作是将YAML数据格式的结构编译成python数据结构给salt使用.
    只需要三个简单规则即可使用YAML语法描述SLS文件:
    规则一: 缩进
    YAML使用一个固定的缩进风格来表示数据层次结构关系.缩进由两个空格组成,禁止使用Tab键
    规则二: 冒号
    字典的keys在YAML中表现形式是一个以冒号结尾的字符串;values的表现形式是冒号下面的每一行用一个空格隔开
    例如: my_key: my_value 对应python {'my_key': 'my_value'}
    字典可以被嵌套:
    例如:
    first_dict_key:
    second_dict_key: second_dict_value
    对应于python:
    {
    'first_dict_key': {
    'second_dict_key': 'second_dict_value'
    }
    }
    规则三: 短横杠
    用一个短横杠加一个空格来表示列表项,多个列表项使用同样的缩进级别作为同一列表的一部分
    例如:
    my_dictionary:
    - list_value_1
    - list_value_2
    - list_value_3
    对应python为:
    {
    'my_dictionary':[
    'list_value_1',
    'list_value_2',
    'list_value_3'
    ]
    }
  4. 常用的状态模块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    //获取所有可执行模块
    [root@saltserver ~]# salt '*' sys.list_modules

    //获取所有状态模块
    [root@saltserver ~]# salt '*' sys.list_state_modules

    //获取状态模块中的所有函数
    [root@saltserver ~]# salt '*' sys.list_state_functions pkg

    * file模块
    1.file.managed下发文件
    /etc/foo.conf:
    file.managed:
    - source:
    - salt://foo.conf
    - user: foo
    - group: users
    - mode: 644
    2.file.directory建立目录
    /etc/stuff/substuf:
    file.directory:
    - user: fred
    - group: users
    - mode: 755
    - makedirs: True
    3.file.symlink建立软连接
    /etc/grub.conf:
    file.symlink:
    - target: /boot/grub/grub.conf
    4.file.recurse下发整个目录
    /opt/code/flask:
    file.recurse:
    - source: salt://code/flask
    - include_empty: True

    * pkg模块
    1.pkg.install软件安装
    mypkgs:
    pkg.installed:
    - pkgs:
    - foo
    - bar: 1.2.3-4
    2.pkg.installed安装指定版本
    mypkgs:
    pkg.installed:
    - pkgs:
    - foo
    - bar: '>=1.2.3-4'
    3.pkg.installed指定安装的rpm来源
    mypkgs:
    pkg.installed:
    - sources:
    - foo: salt://rpms/foo.rpm
    - bar: http://somesite.com/bar.rpm
    - baz: ftp://someothersite.com/baz.rpm
    - qux: /minion/path/to/qux.rpm
    4.pkg.installed指定安装最新版本的软件
    pkg.latest:
    mypkgs:
    pkg.latest:
    - pkgs:
    - foo
    - bar

    * service模块
    1.启动服务(redis为例)
    redis:
    service.running:
    - enable: True
    - reload: True
    - watch:
    - pkg: redis

    * cron模块
    1. 每五分钟执行一次任务
    date > /tmp/crontest:
    cron.present:
    - user: root
    - minute: '*/5'

    * user模块
    1.user.present建立用户
    user.present:
    - fullname: Fred Jones
    - shell: /bin/zsh
    - home: /home/fred
    - uid: 4000
    - gid: 4000
    - groups:
    - wheel
    - storage
    - games

    * sysctl模块
    vm.swappiness:
    sysctl.present:
    - value: 20

    * pip模块
    django:
    pip.installed:
    - name: django >=1.6,<=1.7
    - require:
    - pkg: python-pip
  5. 使用requisites对状态进行排序控制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    通过state模块可以定义minion的状态,但是如果一个主机涉及多个状态,并且状态之间有相互关联,需要在执行顺序上有先后之分,就必须引入requisites来进行控制
    例一:
    install_httpd:
    pkg_installed:
    - name: httpd
    httpd_running:
    service.running:
    - name: httpd
    - enable: True
    - require:
    - pkg: install_httpd

    综上,定义了两个状态:安装httpd和启动httpd,很明显有先后顺序,先安装httpd,然后启动httpd.我们通过require指定httpd启动时必须在install_httpd安装完成后才可以执行

    例二:
    install_httpd:
    pkg_installed:
    - name: httpd
    httpd_running:
    service.running:
    - name: httpd
    - enable: True
    - require:
    - pkg: install_httpd
    - watch:
    - file: httpd_conf
    httpd_conf:
    file.managed:
    - name: /etc/httpd/conf/httpd.conf
    - source: salt://httpd.conf
    - user: root
    - group: root
    - mode: 600

    综上,首先必须安装httpd,然后去报httpd启动,最后对比需要下发的httpd.conf和minion上已有的是否相同,如果不同就下发文件,下发文件后要触发httpd进程的重载以加载新的配置

salt write module code

salt 编写代码

  1. salt远程执行底层原理

    1
    2
    3
    4
    5
    6
    7
    salt底层通信是通过zeroMQ完成的,采用了zeroMQ的订阅发布模式.在订阅发布模式中Pub将消息发送到总线,所有的Sub接收到来自总线的消息后,根据自己的订阅条件来接收特定的消息.对应于salt中就是master将事件发布到消息总线,minion订阅并监听事件,然后minion会查看事件是否和自己匹配以确定是否需要执行.

    salt master启动时会监听两个端口,默认是4505和4506
    * 4506 salt master Ret接口,支持认证,文件服务,结果收集等
    * 4505 salt master pub接口,提供远程执行命令发送功能

    salt minion 启动时从配置文件中获取master地址,如果为域名,则进行解析.解析后,会连接master的4506端口(Ret)进行key认证.认证通过,会获取master的publish_port(4505),然后连接publish_port订阅来自master pub接口任务. 当master下发操作指令时,所有的minion都能接收到,然后minion会检查本机是否匹配.如果匹配,则执行.执行完毕后,把结果发送到master的4506(ret)由master进行处理.命令发送通信完全是异步的,并且命令包很小.此外,这些命令包通过maqpack进行序列化后数据会进一步压缩,所以salt网络负载非常低
  2. 执行模块的构成结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    例如: test.sleep
    def sleep(length):
    """
    Instruct the minion to initiate a process that will sleep for a given period of timezone
    CLI Example:
    .. code-block::bash
    salt '*' test.sleep 20
    """
    time.sleep(int(length))
    return True

    综上,salt的执行模块函数其实就是python函数,但通过结合salt的远程执行功能后会变得非常强大
  3. 编写自己的执行模块函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    * 创建模块存放位置(默认)
    [root@saltserver ~]# mkdir -p /srv/salt/_modules

    * 编写模块
    [root@saltserver ~]# vim /srv/salt/_modules/hello.py
    def world():
    """
    This is my first function.
    CLI Example:
    salt '*' hello.world
    """
    return 'Hello, world!'

    * 把模块推送到所有minion上
    [root@saltserver ~]# salt '*' saltutil.sync_modules
    192.168.13.187:
    - modules.hello

    [root@saltserver ~]# salt '*' sys.list_modules|grep hello
    - hello

    * 在所有minion上执行模块
    [root@saltserver ~]# salt '*' hello.world
    192.168.13.187:
    Hello, world!
  4. 交叉调用salt自带的模块函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    * __salt__函数
    [root@saltserver ~]# vim /usr/local/python27/lib/python2.7/site-packages/salt/modules/useradd.py
    ret = __salt__['cmd.run_all'](cmd, python_shell=False)

    综上,我们可以通过"__salt__"调用所有其他执行模块,就像使用salt命令一样简单

    * __grains__和__pillar__函数
    [root@saltserver ~]# vim /usr/local/python27/lib/python2.7/site-packages/salt/modules/aptpkg.py
    if __grains__.get('os_family') in ('Kali', 'Debian', 'neon'):
    if __grains__['os'] in ('Ubuntu', 'Mint', 'neon'):
    综上,可以看出__grains__类似于grain模块,可以获取主机的信息并使用
    同样,__pillar__类似于pillar模块

    * __virtual__函数
    __virtual__函数作用很特殊.salt在加载执行模块时,__virtual__函数可以帮助salt完成以下两项工作:
    1.帮助salt决定是否要加载这个模块
    2.需要时可以重新命名该模块
    例如:
    [root@saltserver ~]# vim /usr/local/python27/lib/python2.7/site-packages/salt/modules/aptpkg.py
    # Define the module's virtual name
    __virtualname__ = 'pkg'
    def __virtual__():
    '''
    Confirm this module is on a Debian based system
    '''
    if __grains__.get('os_family') in ('Kali', 'Debian', 'neon'):
    return __virtualname__
    elif __grains__.get('os_family', False) == 'Cumulus':
    return __virtualname__
    return (False, 'The pkg module could not be loaded: unsupported OS family')

    在模块加载时执行"__virtual__"函数,"__virtual__"函数通过"__grains__"函数判断操作系统是否为'kali'或'debian','neno',并返回"__virtualname__"值,而"__virtualname__"的值将作为模块名使用.如果返回值为False,则模块中函数将不被加载;如果返回值为True,salt将文件名作为模块名
  5. 编写一个完整的模块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    * 编写模块
    [root@saltserver ~]# vim /srv/salt/_modules/prank.py
    # -*- coding:utf-8 -*-
    """
    The top nth processes which take up CPU and Memory space usage are available through this module,aditionaly;the module can get the system load information.
    """

    # import python libs
    import os

    # import salt libs
    import salt.utils

    def cpu(n):
    """
    Return the top nth processes which take up the cpu usage for this minion
    CLI Example:
    salt '*' prank.cpu <n>
    """
    cmd = "ps aux|sort -k3 -nr|head -n%s" % str(n)
    output = __salt__['cmd.run_stdout'](cmd)
    res = []
    for line in output.splitlines():
    res.append(line)
    return res

    def mem(n):
    """
    Return th top nth processes which take up the memory usage for this minion
    CLI example:
    salt '*' prink.mem <n>
    """
    cmd = "ps aux|sort -k4 -nr|head -n%s" % str(n)
    output = __salt__['cmd.run_stdout'](cmd)
    res = []
    for line in output.splitlines():
    res.append(line)
    return res

    def load():
    """
    Return the load averages for this minion
    CLI Example:
    salt '*' prink.load
    """
    load_avg = os.getloadavg()
    return {'1-min': load_avg[0],'5-min':load_avg[1],'15-min':load_avg[2]}

    * 同步模块
    [root@saltserver ~]# salt '*' saltutil.sync_modules
    192.168.13.187:
    - modules.prank

    * 查看模块帮助
    [root@saltserver ~]# salt '192.168.13.187' sys.doc prank
    prank.cpu:
    Return the top nth processes which take up the cpu usage for this minion
    CLI Example:
    salt '*' prank.cpu <n>
    prank.load:
    Return the load averages for this minion
    CLI Example:
    salt '*' prink.load <n>
    prank.mem:
    Return th top nth processes which take up the memory usage for this minion
    CLI example:
    salt '*' prink.mem <n>

    * 执行模块
    [root@saltserver ~]# salt '*' prank.load
    192.168.13.187:
    ----------
    1-min:
    0.0
    15-min:
    0.0
    5-min:
    0.0

salt install standard

salt master/minion install

  1. env init

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    //机器信息
    192.168.13.188 saltserver # CentOS 7.0
    192.168.13.187 saltminion # CentOS 6.5

    //salt-master
    [root@localhost ~]# cat /etc/redhat-release
    CentOS Linux release 7.0.1406 (Core)

    [root@localhost ~]# hostname saltserver && echo saltserver |tee /etc/hostname
    [root@localhost ~]# $SHELL
    [root@saltserver ~]# echo '192.168.13.188 saltserver' |tee -a /etc/hosts
    [root@saltserver ~]# echo '192.168.13.187 saltminion' |tee -a /etc/hosts

    [root@saltserver ~]# mkfs.xfs /dev/vdb
    [root@saltserver ~]# echo '/dev/vdb /mnt xfs defaults 0 0' | tee -a /etc/fstab
    [root@saltserver ~]# mount -a

    [root@saltserver ~]# echo '* - nproc 65535' | tee -a /etc/security/limits.conf
    [root@saltserver ~]# echo '* - nofile 65535' | tee -a /etc/security/limits.conf
    [root@saltserver ~]# ls /etc/security/limits.d/|xargs rm -f

    [root@saltserver ~]# mkdir /etc/yum.repos.d/backup && mv /etc/yum.repos.d/{*,backup}
    [root@saltserver ~]# curl -o /etc/yum.repos.d/epel-7.repo http://mirrors.aliyun.com/repo/epel-7.repo
    [root@saltserver ~]# curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
    [root@saltserver ~]# curl -O https://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7
    [root@saltserver ~]# rpm --import RPM-GPG-KEY-CentOS-7
    [root@saltserver ~]# rm -f RPM-GPG-KEY-CentOS-7
    [root@saltserver ~]# yum clean all
    [root@saltserver ~]# yum makecache

    [root@saltserver ~]# yum -y install gcc gcc-c++ make cmake bison libtool autoconf automake zip unzip bzip2 zlib zlib-devel openssl openssl-devel openssl-static pcre pcre-devel bison-devel ncurses-devel tcl tcl-devel perl-Digest-SHA1 GeoIP GeoIP-devel gperftools gperftools-devel libatomic_ops-devel gtest gtest-devel glibc-devel unixODBC-devel fop libperl libpython readline readline-devel python-devel python2-pip python-crypto readline readline-devel readline-static sqlite-devel bzip2-devel bzip2-libs openldap-devel gdk-pixbuf2 gdk-pixbuf2-devel libffi libffi-devel libcurl libcurl-devel http-parser http-parser-devel libssh2 libssh2-devel git lftp ntp ntpdate vim wget telnet dstat tree lrzsz net-tools nmap-ncat nmap sysstat

    [root@saltserver ~]# setenforce 0
    [root@saltserver ~]# sed -i s/'SELINUX=enforcing'/'SELINUX=disabled'/g /etc/selinux/config

    [root@saltserver ~]# systemctl stop firewalld && systemctl disable firewalld

    [root@saltserver ~]# [ -f /etc/localtime ] && cp -f /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
    [root@saltserver ~]# [ -f /etc/sysconfig/clock ] && echo 'ZONE="Asia/Shanghai"' | tee /etc/sysconfig/clock
    [root@saltserver ~]# [ -f /etc/timezone ] && echo 'Asia/Shanghai' | tee /etc/timezone
    [root@saltserver ~]# [ -f /etc/sysconfig/ntpd ] && echo 'SYNC_HWCLOCK=yes' | tee -a /etc/sysconfig/ntpd
    [root@saltserver ~]# ntpdate cn.pool.ntp.org
    [root@saltserver ~]# cp -f /etc/{ntp.conf,ntp.conf.bak}
    [root@saltserver ~]# cat > /etc/ntp.conf <<EOF
    > driftfile /var/lib/ntp/drift
    > restrict default nomodify notrap nopeer noquery
    > restrict 127.0.0.1
    > restrict ::1
    > server cn.pool.ntp.org prefer
    > server 0.centos.pool.ntp.org iburst
    > server 1.centos.pool.ntp.org iburst
    > server 2.centos.pool.ntp.org iburst
    > server 3.centos.pool.ntp.org iburst
    > includefile /etc/ntp/crypto/pw
    > keys /etc/ntp/keys
    > disable monitor
    > EOF
    [root@saltserver ~]# cp -f /etc/ntp/{step-tickers,step-tickers.bak}
    [root@saltserver ~]# cat > /etc/ntp/step-tickers <<EOF
    > cn.pool.ntp.org
    > 0.centos.pool.ntp.org
    > 1.centos.pool.ntp.org
    > 2.centos.pool.ntp.org
    > 3.centos.pool.ntp.org
    > EOF
    [root@saltserver ~]# systemctl start ntpd && systemctl enable ntpd

    [root@saltserver ~]# mkdir -p /mnt/{app,data,log,web,ops/{app,data,cron}}

    [root@saltserver ~]# mkdir ~/.pip
    [root@saltserver ~]# cat > ~/.pip/pip.conf <<EOF
    > [global]
    > trusted-host=mirrors.aliyun.com
    > index-url=http://mirrors.aliyun.com/pypi/simple/
    > [list]
    > format=columns
    > EOF
    [root@saltserver ~]# pip install --upgrade pip
    [root@saltserver ~]# python -V
    Python 2.7.5

    [root@saltserver app]# wget https://www.python.org/ftp/python/2.7.13/Python-2.7.13.tar.xz
    [root@saltserver app]# xz -d Python-2.7.13.tar.xz
    [root@saltserver app]# tar xf Python-2.7.13.tar
    [root@saltserver app]# cd Python-2.7.13
    [root@saltserver Python-2.7.13]# ./configure --prefix=/usr/local/python27
    [root@saltserver Python-2.7.13]# make
    [root@saltserver Python-2.7.13]# make install

    [root@saltserver Python-2.7.13]# wget https://bootstrap.pypa.io/get-pip.py
    [root@saltserver Python-2.7.13]# /usr/local/python27/bin/python get-pip.py

    [root@saltserver Python-2.7.13]# echo 'export PYTHON_PATH=/usr/local/python27' |tee /etc/profile.d/python27.sh
    [root@saltserver Python-2.7.13]# echo 'export PYTHON_BIN=$PYTHON_PATH/bin' |tee -a /etc/profile.d/python27.sh
    [root@saltserver Python-2.7.13]# echo 'export PATH=$PYTHON_BIN:$PATH' |tee -a /etc/profile.d/python27.sh
    [root@saltserver Python-2.7.13]# source /etc/profile

    [root@saltserver Python-2.7.13]# rm -f /usr/bin/{python,pip}
    [root@saltserver Python-2.7.13]# sed -i s/python/python2.7/g /usr/bin/yum
    [root@saltserver Python-2.7.13]# sed -i s/python/python2.7/g /usr/libexec/urlgrabber-ext-down

    [root@saltserver Python-2.7.13]# python -V
    Python 2.7.13
    [root@saltserver Python-2.7.13]# pip -V
    pip 9.0.1 from /usr/local/python27/lib/python2.7/site-packages (python 2.7)


    //salt-minion
    [root@localhost ~]# cat /etc/redhat-release
    CentOS release 6.5 (Final)

    [root@localhost ~]# hostname saltminion && echo saltminion |tee /etc/hostname
    [root@localhost ~]# $SHELL
    [root@saltminion ~]# echo '192.168.13.188 saltserver' |tee -a /etc/hosts
    [root@saltminion ~]# echo '192.168.13.187 saltminion' |tee -a /etc/hosts

    [root@saltminion ~]# mkfs.xfs /dev/vdb
    [root@saltminion ~]# echo '/dev/vdb /mnt xfs defaults 0 0' | tee -a /etc/fstab
    [root@saltminion ~]# mount -a

    [root@saltminion ~]# echo '* - nproc 65535' | tee -a /etc/security/limits.conf
    [root@saltminion ~]# echo '* - nofile 65535' | tee -a /etc/security/limits.conf
    [root@saltminion ~]# ls /etc/security/limits.d/|xargs rm -f

    [root@saltminion ~]# mkdir /etc/yum.repos.d/backup && mv /etc/yum.repos.d/{*,backup}
    [root@saltminion ~]# curl -o /etc/yum.repos.d/epel-6.repo http://mirrors.aliyun.com/repo/epel-6.repo
    [root@saltminion ~]# curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
    [root@saltminion ~]# curl -O https://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-6
    [root@saltminion ~]# rpm --import RPM-GPG-KEY-CentOS-6
    [root@saltminion ~]# rm -f RPM-GPG-KEY-CentOS-6
    [root@saltminion ~]# yum clean all
    [root@saltminion ~]# yum makecache

    [root@saltminion ~]# yum groupinstall "Development tools"
    [root@saltminion ~]# yum -y install gcc gcc-c++ make cmake bison libtool autoconf automake zip unzip bzip2 zlib zlib-devel openssl openssl-devel openssl-static pcre pcre-devel bison-devel ncurses-devel tcl tcl-devel perl-Digest-SHA1 GeoIP GeoIP-devel gperftools gperftools-devel libatomic_ops-devel gtest gtest-devel glibc-devel unixODBC-devel fop libperl libpython readline readline-devel python-devel python2-pip python-crypto readline readline-static sqlite-devel bzip2-devel bzip2-libs openldap-devel gdk-pixbuf2 gdk-pixbuf2-devel libffi libffi-devel libcurl libcurl-devel http-parser http-parser-devel libssh2 libssh2-devel tk-devel gdbm-devel db4-devel libpcap-devel xz xz-devel git lftp ntp ntpdate vim wget telnet dstat tree lrzsz net-tools nmap-ncat nmap sysstat

    [root@saltminion ~]# setenforce 0
    [root@saltminion ~]# sed -i s/'SELINUX=enforcing'/'SELINUX=disabled'/g /etc/selinux/config

    [root@saltminion ~]# /etc/init.d/iptables stop && chkconfig iptables off

    [root@saltminion ~]# [ -f /etc/localtime ] && cp -f /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
    [root@saltminion ~]# [ -f /etc/sysconfig/clock ] && echo 'ZONE="Asia/Shanghai"' | tee /etc/sysconfig/clock
    [root@saltminion ~]# [ -f /etc/timezone ] && echo 'Asia/Shanghai' | tee /etc/timezone
    [root@saltminion ~]# [ -f /etc/sysconfig/ntpd ] && echo 'SYNC_HWCLOCK=yes' | tee -a /etc/sysconfig/ntpd
    [root@saltminion ~]# ntpdate cn.pool.ntp.org
    [root@saltminion ~]# cp -f /etc/{ntp.conf,ntp.conf.bak}
    [root@saltminion ~]# cat > /etc/ntp.conf <<EOF
    > driftfile /var/lib/ntp/drift
    > restrict default nomodify notrap nopeer noquery
    > restrict 127.0.0.1
    > restrict ::1
    > server cn.pool.ntp.org prefer
    > server 0.centos.pool.ntp.org iburst
    > server 1.centos.pool.ntp.org iburst
    > server 2.centos.pool.ntp.org iburst
    > server 3.centos.pool.ntp.org iburst
    > includefile /etc/ntp/crypto/pw
    > keys /etc/ntp/keys
    > disable monitor
    > EOF
    [root@saltminion ~]# cp -f /etc/ntp/{step-tickers,step-tickers.bak}
    [root@saltminion ~]# cat > /etc/ntp/step-tickers <<EOF
    > cn.pool.ntp.org
    > 0.centos.pool.ntp.org
    > 1.centos.pool.ntp.org
    > 2.centos.pool.ntp.org
    > 3.centos.pool.ntp.org
    > EOF
    [root@saltminion ~]# /etc/init.d/ntpd start && chkconfig ntpd on

    [root@saltminion ~]# mkdir -p /mnt/{app,data,log,web,ops/{app,data,cron}}

    [root@saltminion ~]# mkdir ~/.pip
    [root@saltminion ~]# cat > ~/.pip/pip.conf <<EOF
    > [global]
    > trusted-host=mirrors.aliyun.com
    > index-url=http://mirrors.aliyun.com/pypi/simple/
    > [list]
    > format=columns
    > EOF
    [root@saltminion ~]# python -V
    Python 2.6.6

    [root@saltminion app]# wget https://www.python.org/ftp/python/2.7.13/Python-2.7.13.tar.xz
    [root@saltminion app]# xz -d Python-2.7.13.tar.xz
    [root@saltminion app]# tar xf Python-2.7.13.tar
    [root@saltminion app]# cd Python-2.7.13
    [root@saltminion Python-2.7.13]# ./configure --prefix=/usr/local/python27
    [root@saltminion Python-2.7.13]# make
    [root@saltminion Python-2.7.13]# make install
    [root@saltminion Python-2.7.13]# wget https://bootstrap.pypa.io/get-pip.py
    [root@saltminion Python-2.7.13]# echo 'export PYTHON_PATH=/usr/local/python27' |tee /etc/profile.d/python27.sh
    [root@saltminion Python-2.7.13]# echo 'export PYTHON_BIN=$PYTHON_PATH/bin' |tee -a /etc/profile.d/python27.sh
    [root@saltminion Python-2.7.13]# echo 'export PATH=$PYTHON_BIN:$PATH' |tee -a /etc/profile.d/python27.sh
    [root@saltminion Python-2.7.13]# source /etc/profile
    [root@saltminion Python-2.7.13]# rm -f /usr/bin/{python,pip}
    [root@saltminion Python-2.7.13]# sed -i s/python/python2.6/g /usr/bin/yum
    [root@saltminion Python-2.7.13]# python -V
    Python 2.7.13
    [root@saltminion Python-2.7.13]# pip -V
    pip 9.0.1 from /usr/local/python27/lib/python2.7/site-packages (python 2.7)
  2. salt master/minion install

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    //salt-master
    [root@saltserver ~]# pip install salt

    [root@saltserver ~]# mkdir -p /etc/salt
    [root@saltserver ~]# cat > /etc/salt/master <<EOF
    > interface: 0.0.0.0
    > ipv6: False
    > publish_port: 4505
    > ret_port: 4506
    > user: root
    > EOF
    [root@saltserver ~]# salt-master -c /etc/salt -d //启动master


    //salt-minon
    [root@saltminion ~]# pip install salt

    [root@saltminion ~]# mkdir -p /etc/salt
    [root@saltminion ~]# cat > /etc/salt/minion <<EOF
    > master: 192.168.13.188
    > master_port: 4506
    > user: root
    > id: 192.168.13.187
    > EOF

    [root@saltminion ~]# salt-minion -c /etc/salt -d //启动minion
  3. salt master与minion 建立连接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    //salt-master 查看秘钥
    [root@saltserver ~]# salt-key -L
    Accepted Keys:
    Denied Keys:
    Unaccepted Keys:
    192.168.13.187
    Rejected Keys:

    //salt-master 查看 minion 秘钥信息
    [root@saltserver ~]# salt-key -f 192.168.13.187
    Unaccepted Keys:
    192.168.13.187: 34:6d:95:cf:17:98:fc:cf:99:2d:0f:2d:e9:ec:f8:95:ac:00:ba:f7:6c:71:ab:cf:aa:4a:05:7e:8c:04:97:81
    //salt-minion 查看 minion 秘钥信息
    [root@saltminion ~]# salt-call --local key.finger
    local:
    34:6d:95:cf:17:98:fc:cf:99:2d:0f:2d:e9:ec:f8:95:ac:00:ba:f7:6c:71:ab:cf:aa:4a:05:7e:8c:04:97:81

    //salt-master 接受 minion 秘钥认证
    [root@saltserver ~]# salt-key -a 192.168.13.187
    The following keys are going to be accepted:
    Unaccepted Keys:
    192.168.13.187
    Proceed? [n/Y] Y
    Key for minion 192.168.13.187 accepted.

    [root@saltserver ~]# salt-key -L
    Accepted Keys:
    192.168.13.187
    Denied Keys:
    Unaccepted Keys:
    Rejected Keys:

    //salt-master 自动签发证书
    方法一:
    echo 'auto_accept: True' |tee -a /etc/salt/master
    方法二:
    salt-key -A -y
  4. salt-master测试指令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    //salt-master 给 所有的minion 发送指令
    [root@saltserver ~]# salt '*' test.ping
    192.168.13.187:
    True

    //salt-master 查找 minion test 函数列表
    [root@saltserver ~]# salt '192.168.13.187' sys.list_functions test
    192.168.13.187:
    - test.arg
    - test.arg_repr
    - test.arg_type
    - test.assertion
    - test.attr_call
    - test.collatz
    - test.conf_test
    - test.cross_test
    - test.echo
    - test.exception
    - test.false
    - test.fib
    - test.get_opts
    - test.kwarg
    - test.module_report
    - test.not_loaded
    - test.opts_pkg
    - test.outputter
    - test.ping
    - test.provider
    - test.providers
    - test.rand_sleep
    - test.rand_str
    - test.retcode
    - test.sleep
    - test.stack
    - test.true
    - test.try_
    - test.tty
    - test.version
    - test.versions
    - test.versions_information
    - test.versions_report

    //salt-master 查找 minion test 函数使用方法
    [root@saltserver ~]# salt '192.168.13.187' sys.doc test.echo
    test.echo:
    Return a string - used for testing the connection
    CLI Example:
    salt '*' test.echo 'foo bar baz quo qux'

    //salt-master 执行 minion test 函数
    [root@saltserver ~]# salt '192.168.13.187' test.echo 'Hello WOrld!'
    192.168.13.187:
    Hello WOrld!
  5. salt 远程执行命令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    salt [options] '<target>' <function> [arguments]
    说明:
    * 第一部分,salt命令本身
    * 第二部分,命令行选项
    * 第三部分,目标定位字符串
    * 第四部分,slat模块函数
    * 第五部分,远程执行函数参数

    例如:
    [root@saltserver ~]# salt --summary '192.168.13.187' cmd.run 'uptime'
    192.168.13.187:
    14:39:06 up 5:17, 1 user, load average: 2.00, 1.93, 1.30
    -------------------------------------------
    Summary
    -------------------------------------------
    # of minions targeted: 1
    # of minions returned: 1
    # of minions that did not return: 0
    # of minions with errors: 0
    -------------------------------------------


    //第二部分,命令行选项
    * -v,--verbose 描述命令执行后,会发生什么(命令执行过程)
    [root@saltserver ~]# salt --verbose '*' cmd.run_all "echo my salt"
    Executing job with jid 20170804144256968580
    -------------------------------------------
    192.168.13.187:
    ----------
    pid:
    26067
    retcode:
    0
    stderr:
    stdout:
    my salt

    * --summary 显示salt命令概要
    [root@saltserver ~]# salt --summary '*' cmd.run_all "echo my salt"
    192.168.13.187:
    ----------
    pid:
    26073
    retcode:
    0
    stderr:
    stdout:
    my salt
    -------------------------------------------
    Summary
    -------------------------------------------
    # of minions targeted: 1
    # of minions returned: 1
    # of minions that did not return: 0
    # of minions with errors: 0
    -------------------------------------------

    * --out 控制salt执行后的输出格式
    [root@saltserver ~]# salt --out=json '*' cmd.run_all "echo my salt"
    {
    "192.168.13.187": {
    "pid": 26079,
    "retcode": 0,
    "stderr": "",
    "stdout": "my salt"
    }
    }
    [root@saltserver ~]# salt --out=yaml '*' cmd.run_all "echo my salt"
    192.168.13.187:
    pid: 26085
    retcode: 0
    stderr: ''
    stdout: my salt
    [root@saltserver ~]# salt --out=raw '*' cmd.run_all "echo my salt"
    {'192.168.13.187': {'pid': 26091, 'retcode': 0, 'stderr': '', 'stdout': 'my salt'}}


    //第三部分,目标定位字符串
    * 全局匹配
    * 任意字符,可以是空字符串
    ? 一个字符,不可以是空字符串
    [] 字符集合

    例如:
    salt '*' test.ping
    salt '192.168.13.*' test.ping
    salt 'minion[a-z]' test.ping

    * 正则表达式匹配
    abc 匹配自身abc
    . 匹配任意字符(除换行符)
    \ 转义字符
    [..] 字符集,匹配字符集中的任意一个字符
    \d 匹配数字[0-9]
    \D 匹配非数字[^\d]
    \s 匹配空白字符[<空格>\n\t\r\v\f]
    \S 匹配非空字符[^\s]
    \w 匹配单词字符[a-zA-Z0-9]
    \W 匹配非单词字符[^\w]
    * 匹配前一个字符0次或无限次
    + 匹配前一个字符1次或无限次
    ? 匹配前一个字符0次或1次
    {m} 匹配前一个字符m次
    {m,n} 匹配前一个字符m到n次
    ^ 匹配字符串开头
    $ 匹配字符串结尾
    | 逻辑或
    (..) 表达式分组,作为一个整体
    (?P<name>...) 指定别名
    (?P<name>) 引用别名

    例如:
    salt -E 'minion' test.ping
    salt -E '.*' test.ping
    salt -E '^minion-*' test.ping
    salt -E '*-minion$' test.ping

    * 列表匹配
    salt -L 'minion1' test.ping
    salt -L 'minion1,minion2,minion3' test.ping

    配置文件中定义nodegroups:
    /etc/salt/master:
    modegroups:
    minions:
    - minion1
    - minion2
    salt -N minions test.ping

    * grain 和 pillar 匹配
    grain和pillar都是以key/value形式存储的数据库
    grain是由minion返回给master的数据;而pillar是存储在master上的数据
    每个minion可以看到自己的pillar,grain可以看做是主机的元数据(metadata)
    换言之,一个minion可以告诉master自己的grain数据,而minion需要从master索取pillar数据

    * grains
    grains可以认为是描述minion本身固有属性的静态数据
    例如:
    //检索所有属性
    [root@saltserver ~]# salt --out=yaml '192.168.13.187' grains.items

    //检索os属性
    [root@saltserver ~]# salt --out=yaml '192.168.13.187' grains.item os
    {
    "192.168.13.187": {
    "os": "CentOS"
    }
    }

    //通过grain定位主机
    [root@saltserver ~]# salt -G 'os:CentOS' test.ping
    192.168.13.187:
    True

    //自定义grain
    [root@saltserver ~]# salt '192.168.13.187' grains.setval cpu_num 8
    or:
    [root@saltserver ~]# salt '192.168.13.187' grains.setval cpu_info ['Intel','Xeon','8']

    //删除自定义grain
    [root@saltserver ~]# salt '192.168.13.187' grains.delval cpu_info
    注意: 自定义grain一般存储在"/etc/salt/grains"文件中,修改该文件,删除自定义内容,重启salt-minion服务

    * pillar
    pillars数据类似grains,不同之处在于pillars数据可以定义为更加动态的形式,并且是一个安全的数据库
    例如:
    //列出主机所有pillar数据
    [root@saltserver ~]# salt '192.168.13.187' pillar.items
    192.168.13.187:
    ----------

    //查看单个数据命令
    [root@saltserver ~]# salt '192.168.13.187' pillar.item role

    //pillar定位主机
    [root@saltserver ~]# salt -I 'role:web' test.ping

    * 复合匹配
    G G@os:CentOS
    E E@web\d+\(dev|qa|prod)\.loc
    P P@os:(redhat|centos|fedora)
    L L@minion1,minion2,minion3
    I I@pdata:foobar
    S S@192.168.1.0/24 or S@192.168.1.100
    R R@%foo.bar

    例如:
    salt -C 'minion-* and G@os:CentOS not E@.*-two$' test.ping


    //第四部分,slat模块函数 和 第五部分,远程执行函数参数
    远程执行模块构成:
    远程执行命令的最后一部分是我们需要运行的模块以及相关函数和对应的执行参数
    模块可以认为是函数的逻辑分组,一系列的函数组合在一起构成一个模块

    所有的远程执行命令格式都是"<module>.<function>"格式,例如:
    [root@saltserver ~]# salt '192.168.13.187' sys.list_modules
    [root@saltserver ~]# salt '192.168.13.187' sys.list_functions test
    [root@saltserver ~]# salt '192.168.13.187' sys.doc test.sleep

    * 远程命令执行模块
    [root@saltserver ~]# salt '*' cmd.run 'ps aux|wc -l'
    192.168.13.187:
    139
    [root@saltserver ~]# salt '*' cmd.run_all 'ps aux|wc -l'
    192.168.13.187:
    ----------
    pid:
    26257
    retcode:
    0
    stderr:
    stdout:
    139

    * 安装包管理
    [root@saltserver ~]# salt '192.168.13.187' pkg.install "httpd"
    192.168.13.187:
    ----------
    apr-util-ldap:
    ----------
    new:
    1.3.9-3.el6_0.1
    old:
    httpd:
    ----------
    new:
    2.2.15-60.el6.centos.4
    old:
    httpd-tools:
    ----------
    new:
    2.2.15-60.el6.centos.4
    old:

    [root@saltserver ~]# salt '192.168.13.187' pkg.version "httpd"
    192.168.13.187:
    2.2.15-60.el6.centos.4

    [root@saltserver ~]# salt '192.168.13.187' pkg.remove "httpd"
    192.168.13.187:
    ----------
    httpd:
    ----------
    new:
    old:
    2.2.15-60.el6.centos.4

    * 管理服务模块
    [root@saltserver ~]# salt '192.168.13.187' service.status "httpd"
    192.168.13.187:
    False

    [root@saltserver ~]# salt '192.168.13.187' service.start "httpd"
    192.168.13.187:
    True

    [root@saltserver ~]# salt '192.168.13.187' service.stop "httpd"
    192.168.13.187:
    True

    * 文件管理模块
    [root@saltserver ~]# salt '192.168.13.187' file.stats '/etc/yum.conf'
    192.168.13.187:
    ----------
    atime:
    1501780111.82
    ctime:
    1499286732.05
    gid:
    0
    group:
    root
    inode:
    1044735
    mode:
    0644
    mtime:
    1361532394.0
    size:
    969
    target:
    /etc/yum.conf
    type:
    file
    uid:
    0
    user:
    root
    .168.13.187:
    None

    [root@saltserver ~]# salt '192.168.13.187' file.chown /etc/yum.conf root root
    192.168.13.187:
    None

    * 用户管理模块
    slat '*' user.add <name> <uid> <gid> <groups> <home> <shell>
    slat '*' user.delete <name>
    slat '*' user.info <name>

linux 设置 包含头文件路径 和 动态库链接路径

linux 设置 包含头文件路径 和 动态库链接路径

  1. 简介

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    C/C++程序在linux下被编译和连接时,GCC/G++会查找系统默认的include和link的路径,以及自己在编译命令中指定的路径.
    而我们自己安装的第三方软件包后,如果指定了路径,我们如何加入到系统环境变量中?

    //include头文件路径
    除了默认的"/usr/include","/usr/local/include"等include路径外,还可以通过设置环境变量来添加系统include的路径:
    * C
    export C_INCLUDE_PATH=XXXX:$C_INCLUDE_PATH
    * CPP
    export CPLUS_INCLUDE_PATH=XXX:$CPLUS_INCLUDE_PATH

    //动态库链接库文件路径
    一般Linux系统把"/lib","/usr/lib","/usr/local/lib"作为默认的库搜索路径,所以使用这几个目录中的链接库文件可直接被搜索到.还可以通过设置环境变量来添加到搜索中
    方法一:
    * 动态链接库搜索路径:
    export LD_LIBRARY_PATH=XXX:$LD_LIBRARY_PATH
    * 静态链接库搜索路径:
    export LIBRARY_PATH=XXX:$LIBRARY_PATH
    方法二:
    echo "/path/to/app/lib" | tee /etc/ld.so.conf.d/app.conf
    ldconfig
  2. 实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    [root@smallasa build]# pip install pygit2
    报错信息:
    src/types.h:36:2: error: #error You need a compatible libgit2 version (v0.26.x)

    解决方法:
    [root@smallasa app]# wget https://codeload.github.com/libgit2/libgit2/tar.gz/v0.26.0
    [root@smallasa app]# tar xzf v0.26.
    [root@smallasa app]# cd libgit2-0.26.0/
    [root@smallasa libgit2-0.26.0]# mkdir build && cd build
    [root@smallasa build]# cmake ..
    [root@smallasa build]# cmake --build .
    [root@smallasa build]# make test
    [root@smallasa build]# ./libgit2_clar
    [root@smallasa build]# cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local/libgit2
    [root@smallasa build]# cmake --build . --target install

    设置头文件路径 和 动态库链接路径:
    [root@smallasa build]# echo 'export C_INCLUDE_PATH=/usr/local/libgit2/include:$C_INCLUDE_PATH' |tee /etc/profile.d/libgit2.sh
    [root@smallasa build]# echo 'export CPLUS_INCLUDE_PATH=/usr/local/libgit2/include:$CPLUS_INCLUDE_PATH' |tee -a /etc/profile.d/libgit2.sh
    [root@smallasa build]# echo 'export LD_LIBRARY_PATH=/usr/local/libgit2/lib:$LD_LIBRARY_PATH' | tee -a /etc/profile.d/libgit2.sh
    [root@smallasa build]# echo 'export LIBRARY_PATH=/usr/local/libgit2/lib:$LIBRARY_PATH' | tee -a /etc/profile.d/libgit2.sh
    [root@smallasa build]# source /etc/profile

    提示: 如果不指定头文件路径和动态链接库路径,会报错如下:
    src/blob.h:33:18: fatal error: git2.h: No such file or directory

    成功安装:
    [root@smallasa build]# pip install pygit2
    Successfully installed pygit2-0.26.0

vim shortcut key

vim 快捷键

  1. 删除行

    1
    2
    3
    4
    5
    dd    删除(剪切)光标当前1行
    {n}dd 删除(剪切)光标之下n行

    dgg 剪切光标以上的所有行
    dG 剪切光标以下的所有行
  2. 删除词

    1
    2
    x   删除光标所在处字符
    X 删除光标所在前字符

salt info

salt 介绍

  1. saltstack是什么?
    SaltStack是基于Python开发的一套C/S架构配置管理工具,它的底层使用ZeroMQ消息队列pub/sub方式通信,使用SSL证书签发的方式进行认证管理.

  2. saltstack服务架构
    SaltStack是一种基于C/S架构的服务模式,可以简单地理解为如果我们想使用SaltStack就需要在现有的环境下引入与护一套C/S架构.
    在SaltStack架构中服务器端叫作Master,客户端叫作Minion.
    在我们理解的传统C/S架构中,客户端发送请求给服务器端,服务器端接收到请求并且处理完成后再返回给客户端.
    在SaltStack架构中不仅有传统的C/S架构服务模式,而且有消息队列中的发布与订阅(pub/sub)服务模式.
    这使得SaltStack应用场景更加丰富.目前在实际环境中一般使用SaltStack的C/S架构进行配置管理

  3. saltstack设计理念
    saltstack 两个主要设计理念是: 远程执行 和 配置管理

  4. saltstack架构模型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    模式一: Master -> Minion
    master和所有的minion都直接连接,minion接收来自master的指令,完成命令执行或配置管理

    模式二: Master -> Syndic -> Minion
    master通过syndic对minion进行管理,同时该架构可进行多级扩展

    模式三: Minion(无master)
    无master的minion,minion不接受任何master控制,通过本地运行即可完成相关功能

    模式四: Master -> Master -> Minion
    多Master架构,所有的Minion将连接到所有配置的Master上去

    模式五: salt-ssh
    通过SSH通道直接在远程主机上执行使用SaltStack,而不需要在远程主机上运行Salt Minion,同时又能支持SaltStack的大部分功能,而且Salt Master也不需要运行了

    模式六: salt-cloud

    模式七: salt-proxy

RabbitMQ python pika

RabbitMQ python pika

  1. RabbitMQ 安装和用户权限设置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    * RabbitMQ 安装,此步骤忽略

    * 创建vhost
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl add_vhost /test
    Creating vhost "/test"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_vhosts|grep test
    /test

    * 对创建的vhost设置高可用
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl set_policy -p /test ha-all "^" '{"ha-mode":"all"}'
    Setting policy "ha-all" for pattern "^" to "{\"ha-mode\":\"all\"}" with priority "0"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_policies -p /test Listing policies
    /test ha-all all ^ {"ha-mode":"all"} 0

    * 创建用户
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl add_user test test123
    Creating user "test"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_users|grep test
    test []

    * 为用户设置角色
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl set_user_tags test administrator monitoring
    Setting tags for user "test" to [administrator,monitoring]
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_users|grep test test [administrator, monitoring]

    * 为用户设置权限
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl set_permissions -p /test test '.*' '.*' '.*'
    Setting permissions for user "test" in vhost "/test"
    [wisdom@rabbitmq188 ~]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_permissions -p /test Listing permissions in vhost "/test"
    test .* .* .*
  2. Python pika install

    1
    [root@rabbitmq188 ~]# pip install pika
  3. RabbitMQ 单向发送消息
    RabbitMQ 单向发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    //生产者
    [wisdom@rabbitmq188 web]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_queues -p /test
    Listing queues

    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列(消息发送到队列)
    channel.queue_declare(queue='queue_hello')

    # 发布消息
    channel.basic_publish(exchange="",
    routing_key="queue_hello",
    body='Hello World!')

    print("msg_send 'Hello World!'")

    # 关闭与RabbitMQ建立连接
    conn_broker.close()
    EOF

    [wisdom@rabbitmq188 web]$ python msg_send.py
    msg_send 'Hello World!'
    [wisdom@rabbitmq188 web]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_queues -p /test |grep hello
    queue_hello 1
    [wisdom@rabbitmq188 web]$ /mnt/app/rabbitmq/sbin/rabbitmqctl list_exchanges -p /test |grep hello
    exchange_hello direct


    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列(消息发送到队列)
    channel.queue_declare(queue='queue_hello')

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[x] Received %r" % body)

    # 订阅消息
    channel.basic_consume(callback,
    queue='queue_hello',
    no_ack=True)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF

    [wisdom@rabbitmq188 web]$ python msg_receive.py
    [*] Waiting for messages. To exit press CTRL+C


    //执行msg_send.py 脚本,然后观察 msg_receive.py
    [wisdom@rabbitmq188 web]$ python msg_receive.py
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received 'Hello World!'
    [x] Received 'Hello World!'
  4. RabbitMQ 工作队列

    1
    消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列

RabbitMQ 工作队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello')

# 发布消息
channel.basic_publish(exchange="",
routing_key="queue_hello",
body=message)

print("[x] Sent %r" % (message,))

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者(worker_1.py和worker_2 内容一致)
[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello')

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(body.count('.'))
print("[x] Done")

# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF


//启动worker_1.py和worker_2.py
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C

//执行 msg_send.py向队列写入数据
[wisdom@rabbitmq188 web]$ python msg_send.py First message.
[x] Sent 'First message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Second message.
[x] Sent 'Second message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Third message.
[x] Sent 'Third message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Fourth message.
[x] Sent 'Fourth message.'
[wisdom@rabbitmq188 web]$ python msg_send.py Fifth message.
[x] Sent 'Fifth message.'

//worker_1.py返回状态
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message.'
[x] Done
[x] Received 'Fourth message.'
[x] Done

//worker_2.py返回状态
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Done
[x] Received 'Third message.'
[x] Done
[x] Received 'Fifth message.'
[x] Done

综上,每个工作者,都会依次分配到任务.如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理.所以应当有一种机制,当一个工作者完成任务时,会反馈消息.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
消息确认就是当工作者完成任务后,会反馈给rabbitmq.修改worker.py中的回调函数:

[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello')

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(5)
print("[x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag) #消息确认

# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=False) # no_ack 关闭

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

//启动work_1.py和worker_2.py
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C

//执行 msg_send.py向队列写入数据
[wisdom@rabbitmq188 web]$ python msg_send.py First message.
[x] Sent 'First message.'

//执行向队列发送消息后,我发现woker_1.py收到消息,我立即中断woker_1.py程序
[wisdom@rabbitmq188 web]$ python worker_1.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
^CTraceback (most recent call last): "Ctrl+C 退出"

//在5秒中内停止worker_1.py后,worker_2.py接收消息并继续处理
[wisdom@rabbitmq188 web]$ python worker_2.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Done

综上,通过消息确认,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
虽然有了消息反馈机制,正在执行的任务不会丢失,但是如果rabbitmq本身服务挂掉的话,那么任务还是会丢失.所以需要将任务持久化存储起来.声明持久化存储:

//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True) #创建队列,声明持久化

# 发布消息
channel.basic_publish(exchange="",
routing_key="queue_hello",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # 持久化参数
))

print("[x] Sent %r" % (message,))

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者
[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True) # 声明持久化队列

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(5)
print("[x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)

# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

注意:
声明队列的时候,需要添加"durable=True"参数,如果队列已经存在,程序执行会报错.rabbitmq不允许使用不同的参数来重新定义存在的队列,所以需要定义一个新的队列.
channel.queue_declare(queue='queue_hello',durable=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
公平调度:
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样.可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短.如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务.

//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True)

# 发布消息
channel.basic_publish(exchange="",
routing_key="queue_hello",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2,
))

print("[x] Sent %r" % (message,))

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者
[wisdom@rabbitmq188 web]$ cat > worker_1.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建队列(消息发送到队列)
channel.queue_declare(queue='queue_hello',durable=True)

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))
time.sleep(5)
print("[x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)

# 公平调度
channel.basic_qos(prefetch_count=1) # 公平调度
# 订阅消息
channel.basic_consume(callback,
queue='queue_hello',
no_ack=False)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

注意,公平调度主要是在消费者上进行设置:
channel.basic_qos(prefetch_count=1)
  1. RabbitMQ 交换器
    1
    2
    3
    工作队列,每次消息都只会发送给其中一个接收端,如果需要将消息广播出去,让每个接收端都能收到,那么就要使用交换机

    交换机的工作原理: 消息发送端先将消息发送给交换机,交换机再将消息发送到绑定的消息队列,而后每个接收端都能从各自的消息队列里接收到信息

RabbitMQ 工作队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange="exchange_hello",type='fanout') #创建交换机

# 发布消息
channel.basic_publish(exchange="exchange_hello", #发布消息到交换机
routing_key="",
body='Hello World!')

print("[x] Sent 'Hello World!'")

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

注意:
1.生产者要将消息发送到交换机,而不是队列.
2.basic_publish方法的参数exchange被设定为相应交换机,因为是要广播出去,发送到所有队列,所以routing_key就不需要设定了
3.exchange如果为空,表示是使用匿名的交换机(例如:amq.*这样的交换机,就是系统默认的交换机).routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列的意思


//消费者
[wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange='exchange_hello', type='fanout') # 创建交换机

# 创建随机队列,并绑定到交换机上
# "exclusive=True"表示当接收端退出时,销毁临时产生的队列
queue_random = channel.queue_declare(exclusive=True) # 创建随机队列,并绑定到交换机.
queue_name = queue_random.method.queue
channel.queue_bind(exchange='exchange_hello',queue=queue_name)

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))

# 订阅消息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

  1. RabbitMQ 路由键

    1
    2
    3
    交换机已经能实现给所有接收端发送消息,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,这种情况就需要用到路由键了

    路由键的工作原理: 每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键.发送端通过交换机发送信息时,可以指明路由键,交换机会根据路由键把消息发送到相应的消息队列,这样接收端就能接收到消息了
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    //生产者
    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建交换机
    channel.exchange_declare(exchange="exchange_hello",type='direct')

    # 定义三个路由键
    routings = ['info','warning','error']

    # 将消息发布到交换机,并设置路由键
    for routing in routings:
    message = '%s message.' % routing
    channel.basic_publish(exchange="exchange_hello",
    routing_key=routing,
    body=message)
    print(message)

    # 关闭与RabbitMQ建立连接
    conn_broker.close()
    EOF

    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import time

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建交换机
    channel.exchange_declare(exchange='exchange_hello', type='direct')

    # 从命令行获取路由键参数,如果没有,默认为info
    routings = sys.argv[1:]
    if not routings:
    routings = ['info']

    # 创建随机队列,并绑定到交换机上,设置路由键
    queue_random = channel.queue_declare(exclusive=True)
    queue_name = queue_random.method.queue
    for routing in routings:
    channel.queue_bind(exchange='exchange_hello',
    queue=queue_name,
    routing_key=routing)

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[x] Received %r" % (body,))

    # 订阅消息
    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF

    //执行msg_receive.py
    [wisdom@rabbitmq188 web]$ python msg_receive.py warning error
    [*] Waiting for messages. To exit press CTRL+C
  2. RabbitMQ 路由键模糊匹配

    1
    2
    通过设置路由键,可以将消息发送到相应的队列,这里的路由键是要完全匹配,比如: info消息的只能发到路由键为info的消息队列
    路由键模糊匹配,就是可以使用正则表达式.和常用的正则表示式不同,这里的话"#"表示所有,全部的意思;"*"只匹配到一个词
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//生产者
[wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange="exchange_hello",type='topic') # 设置为topic类型

# 定义三个路由键
routings = ['happy.work','happy.life','sad.work','sad.life']

# 将消息发布到交换机,并设置路由键
for routing in routings:
message = '%s message.' % routing
channel.basic_publish(exchange="exchange_hello",
routing_key=routing,
body=message)
print(message)

# 关闭与RabbitMQ建立连接
conn_broker.close()
EOF

//消费者
[wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import pika
import time

# 建立与RabbitMQ建立连接
credentials = pika.PlainCredentials("test","test123")

conn_params = pika.ConnectionParameters("192.168.13.188",
5672,
"/test",
credentials = credentials)

conn_broker = pika.BlockingConnection(conn_params)

# 创建信道(生产者和消费者与RabbitMQ建立一条通道)
channel = conn_broker.channel()

# 创建交换机
channel.exchange_declare(exchange='exchange_hello', type='topic')

# 从命令行获取路由键参数,如果没有,则报错退出
routings = sys.argv[1:]
if not routings:
print >> sys.stderr,"Usage: %s [routing_key]..." % (sys.argv[0])
sys.exit()

# 创建随机队列,并绑定到交换机上,设置路由键
queue_random = channel.queue_declare(exclusive=True)
queue_name = queue_random.method.queue
for routing in routings:
channel.queue_bind(exchange='exchange_hello',
queue=queue_name,
routing_key=routing)

# 创建消费消息
def callback(ch, method, properties, body):
print("[x] Received %r" % (body,))

# 订阅消息
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

print('[*] Waiting for messages. To exit press CTRL+C')

# 开始消费
channel.start_consuming()
EOF

//运行接收端
[wisdom@rabbitmq188 web]$ python msg_receive.py "*.work"
[wisdom@rabbitmq188 web]$ python msg_receive.py "happy.*"


补充说明:
1.发送信息时,如果不设置路由键,那么路由键设置为"*"的接收端是否能接收到消息?
发送信息时,如果不设置路由键,默认是表示广播出去,理论上所有接收端都可以收到消息,但经测试,路由键设置为"*"的接收端收不到任何消息.只有发送消息时,设置路由键为一个词,路由键设置为"*"的接收端才能收到消息.在这里,每个词使用"."符号分开的

2.发送消息时,如果路由键设置为"..",那么路由键设置为"#."的接收端是否能接收到消息?如果发送消息时,路由键设置为一个词呢?
两种情况,测试都可以

3."a.*.#""a.#"的区别
"a.#"只要字符串开头的一个词是a就可以了,比如a,a.haha,a.haha.haha;而这样的词是不行的,如abs,abc,abc.haha
"a.*.#"必须要满足a.*的字符串才可以,比如a.,a.haha,a.haha.haha.而这样的词是不行的,如a
  1. RabbitMQ 远程结果返回

    1
    2
    3
    前面的例子都有个共同点,就是发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了.但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端.

    处理方法描述: 发送端在发送信息前,产生一个接收消息的临时队列,该队列用来接收返回的结果.其实在这里接收端,发送端的概念已经比较模糊了,因为发送端也同样要接收消息,接收端同样也要发送消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    示例内容: 假设有一个控制中心和一个计算节点,控制中心会将一个自然数N发送给计算节点,计算节点将N值加1后,返回给控制中心.这里用center.py模拟控制中心,compute.py模拟计算节点


    //生产者
    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika


    class Center(object):
    def __init__(self):
    # 建立与RabbitMQ建立连接
    self.credentials = pika.PlainCredentials("test","test123")
    self.conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = self.credentials)
    self.conn_broker = pika.BlockingConnection(self.conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    self.channel = self.conn_broker.channel()

    # 定义接收到返回消息处理方法
    def on_response(self,ch,method,props,body):
    self.response = body

    def request(self,n):
    self.response = None
    #发送计算请求,并声明返回队列
    self.channel.basic_publish(exchange='',
    routing_key='queue_compute',
    properties=pika.BasicProperties(
    reply_to = self.callback_queue,
    ),
    body=str(n))
    #接收返回的数据
    while self.response is None:
    self.conn_broker.process_data_events()
    return int(self.response)

    center = Center()

    print " [x] Requesting increase(30)"
    response = center.request(30)
    print " [.] Got %r" % (response,)
    EOF

    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import time

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列
    channel.queue_declare(queue='queue_compute')
    print('[*] Waiting for n')

    # 将n值加1
    def increase(n):
    return n + 1

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[.] increase(%s)" % (body,))
    response = increase(int(body))
    # 将计算结果返回生产者
    ch.basic_publish(exchange='',
    routing_key=properties.reply_to,
    body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    # 订阅消息
    channel.basic_consume(callback,
    queue='queue_compute',
    no_ack=False)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF
  2. RabbitMQ 相互关联编号”correlation id”

    1
    2
    3
    4
    5
    远程结果中有一个没有提到,就是"correlation id",这是什么呢?

    假设有多个计算节点,控制中心开启多个线程,往这些计算节点发送数字,要求计算结果并返回,但是控制中心只开启了一个队列,所有线程都是从这个队列里获取消息,每个线程如何确定收到的消息就是该线程对应的呢?这个就是"correlation id"的用处了.correlation翻译成中文就是相互关联,也表达了这个意思

    "correlation id"运行原理: 控制中心发送计算请求时设置"correlation id",而后计算节点将计算结果,连同接收到的"correlation id"一起返回,这样控制中心就能通过"correlation id"来标识请求.其实"correlation id"也可以理解为请求的唯一标识码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    示例内容: 控制中心开启多个线程,每个线程都发起一次计算请求,通过"correlation id",每个线程都能准确收到相应的计算结果

    //生产者
    [wisdom@rabbitmq188 web]$ cat > msg_send.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import threading
    import uuid

    # 自定义线程类,继承threading.Thread
    class MyThread(threading.Thread):
    def __init__(self):
    super(MyThread,self).__init()
    self.func = func
    self.num = num
    def run(self):
    print("[x] Requesting increase(%d)" % self.num)
    response = self.func(self.num)
    print "[.] increase(%d)=%d" % (self.num, response)

    # 控制中心类
    class Center(object):
    def __init__(self):
    # 建立与RabbitMQ建立连接
    self.credentials = pika.PlainCredentials("test","test123")
    self.conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = self.credentials)
    self.conn_broker = pika.BlockingConnection(self.conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    self.channel = self.conn_broker.channel()

    # 定义接收返回消息队列
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response,
    no_ack=True,
    queue=self.callback_queue)
    #返回的结果都会存储在该字典里
    self.response = {}

    # 定义接收到返回消息处理方法
    def on_response(self,ch,method,props,body):
    self.response[props.correlation_id] = body

    def request(self,n):
    corr_id = str(uuid.uuid4())
    self.response[corr_id] = None
    #发送计算请求,并声明返回队列
    self.channel.basic_publish(exchange='',
    routing_key='queue_compute',
    properties=pika.BasicProperties(
    reply_to = self.callback_queue,
    correlation_id = corr_id,
    ),
    body=str(n))
    #接收返回的数据
    while self.response is None:
    self.conn_broker.process_data_events()
    return int(self.response[corr_id])

    center = Center()
    #发起5次计算请求
    nums= [10, 20, 30, 40 ,50]
    threads = []
    for num in nums:
    threads.append(MyThread(center.request, num))
    for thread in threads:
    thread.start()
    for thread in threads:
    thread.join()
    EOF

    //消费者
    [wisdom@rabbitmq188 web]$ cat > msg_receive.py <<EOF
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-

    import sys
    import pika
    import time

    # 建立与RabbitMQ建立连接
    credentials = pika.PlainCredentials("test","test123")

    conn_params = pika.ConnectionParameters("192.168.13.188",
    5672,
    "/test",
    credentials = credentials)

    conn_broker = pika.BlockingConnection(conn_params)

    # 创建信道(生产者和消费者与RabbitMQ建立一条通道)
    channel = conn_broker.channel()

    # 创建队列
    channel.queue_declare(queue='queue_compute')
    print('[*] Waiting for n')

    # 将n值加1
    def increase(n):
    return n + 1

    # 创建消费消息
    def callback(ch, method, properties, body):
    print("[.] increase(%s)" % (body,))
    response = increase(int(body))
    # 将计算结果返回生产者,增加"correlation_id"的设定
    ch.basic_publish(exchange='',
    routing_key=properties.reply_to,
    properties=pika.BasicProperties(correlation_id= \
    props.correlation_id),
    body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)

    # 订阅消息
    channel.basic_consume(callback,
    queue='queue_compute',
    no_ack=False)

    print('[*] Waiting for messages. To exit press CTRL+C')

    # 开始消费
    channel.start_consuming()
    EOF

command tr

  1. tr 将多个空格替换为1个空格

    1
    2
    3
    4
    5
    6
    [root@dev ~]# cat b
    a b c
    1 2 3 5
    [root@dev ~]# cat b|tr -s ' '
    a b c
    1 2 3 5
  2. tr替换

    1
    2
    3
    4
    5
    6
    [root@dev ~]# cat b|tr -s ' '
    a b c
    1 2 3 5
    [root@dev ~]# cat b|tr -s ' '|tr ' ' ','
    a,b,c
    1,2,3,5