NSQ-分布式实时消息中间件

消息中间件

  1. 概念:它的另一个名字叫做消息队列,主要用来高效可靠地传递消息。
  2. 使用场景:

    • 解耦

解耦0.jpg

 A系统发送个数据到BCD三个系统,接口调用发送,那如果E系统也要这个数据呢?那如果C系统现在不需要了呢?现在A系统又要发送第二种数据了呢?A系统负责人濒临崩溃中。。。再来点更加崩溃的事儿,A系统要时时刻刻考虑BCDE四个系统如果挂了咋办?我要不要重发?我要不要把消息存起来?头发都白了啊。。。

解耦1.jpg

  • 异步:

异步0.jpg
异步1.jpg

 A系统接收一个请求,需要在自己本地写库,还需要在BCD三个系统写库,自己本地写库要3ms,BCD三个系统分别写库要300ms、450ms、200ms。最终请求总延时是3 + 300 + 450 + 200 = 953ms,接近1s,用户感觉搞个什么东西,慢死了慢死了。

 更改为 异步后当消息发送到消息队列  自行让对应系统进行消费即可  所以给用户的体验为20 + 5 = 25ms  ,快 好快!
  • 削峰:每天0点到11点,A系统风平浪静,每秒并发请求数量就100个。结果每次一到11点~1点,每秒并发请求数量突然会暴增到1万条。但是系统最大的处理能力就只能是每秒钟处理1000个请求啊。。。尴尬了,系统会死。。。

削峰0.jpg
削峰1.jpg

  1. 消息中间件模式分类

    • 点对点

消息中间件点对点模式.png

 说明: 
 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。 
 消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
  • 发布/订阅

消息中间件发布-订阅模式.png

 说明: 
 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

 queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。 

 topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。
  1. 常见消息中间件

    • Kafka:Apache子项目
    • ActiveMQ:Apache项目
    • RabbitMQ:Erlang开发
    • RocketMQ: 产自阿里
    • nsq:golang开发

NSQ

组件构成
  1. nsqd

nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。
它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。

    • 服务启动后有两个端口:一个给客户端,另一个是 HTTP API。还能够开启HTTPS。
    • 同一台服务器启动多个nsqd,要注意端口和数据路径必须不同,包括:–lookupd-tcp-address、 -tcp-address、–data-path。
    • 删除topic、channel需要http api调用。
    1. nsqlookupd

    nsqlookupd 是守护进程,负责管理拓扑信息并提供最终一致性的发现服务。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。

    • 该服务运行后有两个端口:TCP 接口,nsqd 用它来广播;HTTP 接口,客户端用它来发现和管理。
    • 在生产环境中,为了高可用,最好部署三个nsqlookupd服务。
    1. nsqadmin

    nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。
    运行后,能够通过4171端口查看并管理topic和channel。

    • 通常只需要运行一个。
    1. utilities

    常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

    NSQ拓扑图

    nsq拓扑图.png

    • nsqlookupd进程同时开启tcp和http两个监听服务,TCP监听是用于NSQD进程连接,http监听是用于提供给nsqadmin获取集群信息
    • nsqadmin进程只开启http服务,其实就是一个web服务,提供给客户端查询集群信息
    • nsqd进程同时开启tcp和http服务,TPC监听和http监听都提供给生产者和消费者连接,http服务还提供给nsqadmin获取该nsqd本地信息
    • nsqd连接到nsqlookupd的tcp监听上,通过心跳告诉nsqlookupd自己在线
    • writer是生产者,直接连接nsqd
    • reader是消费者,直接连接nsqd
    消息拉取过程详解

    topic-channel.gif

    • 每个topic后边可以有一个或多个channel
    • 当消息发送至nsqd中对应的topic中后,nsqd会把这个消息复制到每个相连的channel中去
    • 每个channel后边会有一个或者多个消费者
    • channel中的消息是流式的,即每个消息只能被一个消费者消费
    nsq消息流动过程

    nsq拓扑图新.png

    1. 程序A (发布者)发布一条消息到某个 topic 中去(消息存储在本地的一个 nsqd A 中;第一次发布某个topic下的消息的时候回自动创建这个topic)
    2. 程序B (订阅者)到nsqlookupd结点上查询某个topic和channel的结点信息(此时查询到了 nsqd A 的信息)
    3. 程序B 直接从 nsqd A 上拉取消息进行处理
    nsq的部署
    1. 官网下载对应版本的nsq压缩包(https://nsq.io/deployment/installing.html)
    2. 解压文件,并把解压后的文件夹中的bin文件夹添加进环境变量
    3. 输入which nsqd命令查看环境变量配置是否成功(如果能显示nsqd的路径的话说明环境变量配置成功了)
    nsq的启动
    #以守护进程的方式启动nsqlookupd
    nohup nsqlookupd>/dev/null 2>&1 &
    #以守护进程的方式启动nsqd
    nohup nsqd --lookupd-tcp-address=127.0.0.1:4160>/dev/null 2>&1 &
    #以守护进程的方式启动nsqadmin
    nohup nsqadmin --lookupd-http-address=127.0.0.1:4161>/dev/null 2>&1 &
    利用nsq_to_file测试nsq
    #借助curl工具发布消息(同时第一次发布消息会对应的创建一个topic)
    curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
    curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
    curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
    #借助nsq_to_file工具消费消息(创建channel)
    nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
    #查看/tmp目录下的日志(test.*.log)
    web端查看nsq系统的信息

    打开任意浏览器,输入http://127.0.0.1:4171/查看统计信息

    go-nsq客户端demo

    生产者:

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
    )
    
    func main() {
        fmt.Println("This is a publisher")
    
        //创建生产者
        nsqd := "127.0.0.1:4150"
        producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
        if err != nil {
            fmt.Println("new producer err:", err)
        }
        //发布消息
        producer.Publish("go_lib_test", []byte("nihao"))
        if err != nil {
            panic(err)
        }
    }

    消费者:

    package main
    
    import (
        "github.com/nsqio/go-nsq"
        "fmt"
    )
    
    func main(){
        //创建消费者
        c, err := nsq.NewConsumer("go_lib_test", "go_lib", nsq.NewConfig())
        if err != nil{
            fmt.Println("new consumer err:", err)
            return
        }
    
        //添加用于处理消息的handler
        c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
            //具体的消息处理逻辑,写在下方
            fmt.Println("got a message:", string(message.Body))
            return nil
        }))
    
        //连接nsqlookupd
        err = c.ConnectToNSQLookupd("127.0.0.1:4161")
        if err != nil{
            fmt.Println("connect to nsqlookupd err:", err)
            return
        }
    
        //避免主程序退出
        select{}
    
    }

    参考文章:

    最后修改:2021 年 02 月 19 日 10 : 34 PM
    如果觉得我的文章对你有用,请随意赞赏