BUILDING CLIENT LIBRARIES

Original article: https://nsq.io/clients/building_client_libraries.html

NSQ’s design pushes a lot of responsibility onto client libraries in order to maintain overall cluster robustness and performance.

This guide attempts to outline the various responsibilities well-behaved client libraries need to fulfill. Because publishing to nsqd is trivial (just an HTTP POST to the /pub endpoint, formerly /put), this document focuses on consumers.
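
The following is a minimal Python sketch of that publish path. It assumes an nsqd whose HTTP API listens on the default port 4151 on localhost. Current nsqd versions name the endpoint /pub; /put is the older, deprecated alias. The helper names build_publish_request and publish are hypothetical, and only the standard library is used.

```python
import urllib.parse
import urllib.request


def build_publish_request(topic, body, host="127.0.0.1", port=4151):
    """Build (but do not send) an HTTP POST that publishes `body` to `topic`.

    Assumes nsqd's HTTP API on `port` (4151 by default) and the /pub endpoint.
    """
    query = urllib.parse.urlencode({"topic": topic})
    url = f"http://{host}:{port}/pub?{query}"
    return urllib.request.Request(url, data=body, method="POST")


def publish(topic, body, **kwargs):
    """Send the request and return the response body (nsqd replies "OK" on success)."""
    with urllib.request.urlopen(build_publish_request(topic, body, **kwargs)) as resp:
        return resp.read()
```

For example, publish("clicks", b"hello") would POST those bytes to the clicks topic. Keeping request construction separate from sending lets the URL logic be checked without a running nsqd.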

By setting these expectations we hope to provide a foundation for achieving consistency across languages for NSQ users.

Overview

  1. Configuration
  2. Discovery (optional)
  3. Connection Handling
  4. Feature Negotiation
  5. Data Flow / Heartbeats
  6. Message Handling
  7. RDY State
  8. Backoff
  9. Encryption/Compression

Configuration

At a high level, our philosophy with respect to configuration is to design the system to have the flexibility to support different workloads, use sane defaults that run well “out of the box”, and minimize the number of dials.

A consumer subscribes to a topic on a channel over a TCP connection to one or more nsqd instances. Each connection can subscribe to only one topic, so consuming multiple topics requires a separate connection per topic.

Using nsqlookupd for discovery is optional, so client libraries should support a configuration where a consumer connects directly to one or more nsqd instances or where it is configured to poll one or more nsqlookupd instances. When a consumer is configured to poll nsqlookupd, the polling interval should be configurable. Additionally, because typical deployments of NSQ are in distributed environments with many producers and consumers, the client library should automatically add jitter based on a random percentage of the configured value. This will help avoid a thundering herd of connections. For more detail see Discovery.
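
One simple way to add that jitter is sketched below. It assumes the interval is in seconds and the jitter is expressed as a fraction of it; 0.3 is an arbitrary illustrative value. The function name next_poll_delay is hypothetical, and real libraries may apply jitter differently (for example, only before the first poll).

```python
import random


def next_poll_delay(interval, jitter=0.3, rng=random):
    """Return the delay (seconds) before the next nsqlookupd poll.

    Adds a random extra delay of up to `jitter` * `interval`, so consumers
    started at the same moment drift apart instead of polling in lockstep.
    """
    if not 0 <= jitter <= 1:
        raise ValueError("jitter must be between 0 and 1")
    return interval + rng.random() * jitter * interval
```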

An important performance knob for consumers is the number of messages they can receive before nsqd expects a response. This pipelining facilitates buffered, batched, and asynchronous message handling. By convention this value is called max_in_flight, and it affects how RDY state is managed. For more detail see RDY State.

Because NSQ is designed to handle failure gracefully, client libraries are expected to implement retry handling for failed messages and to provide options for bounding that behavior by the number of attempts per message. For more detail see Message Handling.

Relatedly, when message processing fails, the client library is expected to automatically handle re-queueing the message. NSQ supports sending a delay along with the REQ command. Client libraries are expected to provide options for what this delay should be set to initially (for the first failure) and how it should change for subsequent failures. For more detail see Backoff.

Most importantly, the client library should support some method of configuring callback handlers for message processing. The signature of these callbacks should be simple, typically accepting a single parameter (an instance of a “message object”).
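
The configuration described in this section could be collected in one object. The dataclass below is a sketch under assumptions. Its field names and defaults are illustrative and are not taken from pynsq or go-nsq. The handler is any callable that accepts one message object and returns True on success.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class ConsumerConfig:
    """Hypothetical consumer configuration; names and defaults are illustrative."""

    topic: str
    channel: str
    handler: Callable[[Any], bool]  # one message object in, success flag out
    nsqd_tcp_addresses: List[str] = field(default_factory=list)
    lookupd_http_addresses: List[str] = field(default_factory=list)
    lookupd_poll_interval: float = 60.0  # seconds
    lookupd_poll_jitter: float = 0.3  # fraction of the interval
    max_in_flight: int = 1
    max_attempts: int = 5  # 0 could mean "retry forever"
    requeue_delay: float = 1.0  # seconds; base for the first REQ

    def __post_init__(self):
        # Discovery is optional, but some source of nsqd addresses is required.
        if not (self.nsqd_tcp_addresses or self.lookupd_http_addresses):
            raise ValueError("configure nsqd_tcp_addresses or lookupd_http_addresses")
        if self.max_in_flight < 1:
            raise ValueError("max_in_flight must be >= 1")
```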

Discovery

An important component of NSQ is nsqlookupd, which provides a discovery service for consumers to locate the nsqd that provide a given topic at runtime.

Although optional, using nsqlookupd greatly reduces the amount of configuration required to maintain and scale a large distributed NSQ cluster.

When a consumer uses nsqlookupd for discovery, the client library should manage the process of polling all nsqlookupd instances for an up-to-date set of nsqd providing the topic in question, and should manage the connections to those nsqd.

Querying an nsqlookupd instance is straightforward. Perform an HTTP GET request to the /lookup endpoint with a topic query parameter naming the topic the consumer is attempting to discover (e.g. /lookup?topic=clicks).

The response format is JSON:

{
    "channels": ["archive", "science", "metrics"],
    "producers": [
        {
            "broadcast_address": "clicksapi01.routable.domain.net",
            "hostname": "clicksapi01.domain.net",
            "remote_address": "172.31.27.114:51996",
            "tcp_port": 4150,
            "http_port": 4151,
            "version": "1.0.0-compat"
        },
        {
            "broadcast_address": "clicksapi02.routable.domain.net",
            "hostname": "clicksapi02.domain.net",
            "remote_address": "172.31.34.29:14340",
            "tcp_port": 4150,
            "http_port": 4151,
            "version": "1.0.0-compat"
        }
    ]
}

The broadcast_address and tcp_port should be used to connect to an nsqd. Because, by design, nsqlookupd instances don’t share or coordinate their data, the client library should union the lists it received from all nsqlookupd queries to build the final list of nsqd to connect to. The broadcast_address:tcp_port combination should be used as the unique key for this union.
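
The lookup query and the union step can be sketched as follows. The sketch assumes the JSON response format shown above; nsqlookupd's default HTTP port is 4161. Fetching over HTTP is left out so the merging logic stands alone. lookup_url and union_producers are hypothetical names.

```python
import json
import urllib.parse


def lookup_url(lookupd_http_address, topic):
    """URL for querying one nsqlookupd, e.g. http://127.0.0.1:4161/lookup?topic=clicks."""
    query = urllib.parse.urlencode({"topic": topic})
    return f"{lookupd_http_address.rstrip('/')}/lookup?{query}"


def union_producers(responses):
    """Merge /lookup JSON bodies from several nsqlookupd into one set of nsqd.

    Each nsqlookupd may know a different subset of nsqd, so every response is
    read and "broadcast_address:tcp_port" is used as the de-duplication key.
    """
    nsqds = set()
    for body in responses:
        data = json.loads(body)
        for producer in data.get("producers", []):
            nsqds.add(f"{producer['broadcast_address']}:{producer['tcp_port']}")
    return nsqds
```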

A periodic timer should be used to repeatedly poll the configured nsqlookupd so that consumers will automatically discover new nsqd. The client library should automatically initiate connections to all newly discovered instances.

When client library execution begins it should bootstrap this polling process by kicking off an initial set of requests to the configured nsqlookupd instances.
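
The polling loop can be sketched with a threading.Event for shutdown. The callables fetch_nsqds and connect are hypothetical hooks supplied by the library; fetch_nsqds could, for example, union the /lookup results of every configured nsqlookupd. Running the first iteration immediately is what bootstraps discovery.

```python
import threading


def poll_lookupd(fetch_nsqds, connect, interval, stop, connected):
    """Discover nsqd every `interval` seconds until `stop` is set.

    fetch_nsqds() -> set of "broadcast_address:tcp_port" strings
    connect(addr) opens a connection to one nsqd
    connected is the set of addresses that already have a connection
    """
    while not stop.is_set():
        # Only newly discovered nsqd get a connection; known ones are skipped.
        for addr in sorted(fetch_nsqds() - connected):
            connect(addr)
            connected.add(addr)
        # Event.wait returns early once stop is set, so shutdown is prompt.
        stop.wait(interval)
```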

Connection Handling

Once a consumer has an nsqd to connect to (via discovery or manual configuration), it should open a TCP connection to broadcast_address:tcp_port. A separate TCP connection should be made to each nsqd for each topic the consumer wants to subscribe to.

When connecting to an nsqd instance, the client library should send the following data, in order:

  1. the magic identifier

  2. an IDENTIFY command (and payload) and read/verify response (see Feature Negotiation)

  3. a SUB command (specifying the desired topic and channel) and read/verify response

  4. an initial RDY count of 1 (see RDY State).

(low-level details on the protocol are available in the spec)
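
The bytes written during this handshake can be sketched in Python. The sketch assumes the V2 protocol framing from the spec: the magic is two spaces followed by "V2", IDENTIFY carries a 4-byte big-endian body size followed by a JSON body, and SUB and RDY are newline-terminated text commands. The helper names are hypothetical. A real client writes each command separately and verifies the response to IDENTIFY and to SUB before it continues.

```python
import json
import struct

MAGIC_V2 = b"  V2"  # two spaces followed by "V2"


def identify_command(payload):
    """IDENTIFY line, then a 4-byte big-endian body size, then the JSON body."""
    body = json.dumps(payload).encode()
    return b"IDENTIFY\n" + struct.pack(">I", len(body)) + body


def sub_command(topic, channel):
    return f"SUB {topic} {channel}\n".encode()


def rdy_command(count):
    return f"RDY {count}\n".encode()


def handshake_bytes(topic, channel, identify_payload):
    """The four writes, in order, that follow a successful TCP connect."""
    return [
        MAGIC_V2,
        identify_command(identify_payload),
        sub_command(topic, channel),
        rdy_command(1),  # start with RDY 1; see RDY State
    ]
```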

Reconnection

Client libraries should automatically handle reconnection as follows:

  • If the consumer is configured with a specific list of nsqd instances, reconnection should be handled by delaying the retry attempt in an exponential backoff manner (e.g. try to reconnect in 8s, 16s, 32s, etc., up to a maximum).

  • If the consumer is configured to discover instances via nsqlookupd, reconnection should be handled automatically based on the polling interval (i.e. if a consumer disconnects from an nsqd, the client library should only attempt to reconnect if that instance is discovered by a subsequent nsqlookupd polling round). This ensures that consumers can learn about nsqd that are introduced to the topology and ones that are removed (or failed).
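
For statically configured nsqd, the retry schedule can be a small pure function. The 8-second base matches the example above. The 120-second cap and the name reconnect_delay are assumptions. With nsqlookupd no such timer is needed, because the next poll decides whether to reconnect.

```python
def reconnect_delay(failures, base=8.0, maximum=120.0):
    """Seconds to wait before reconnecting after `failures` consecutive failures.

    Doubles from `base` (8s, 16s, 32s, ...) and never exceeds `maximum`.
    """
    if failures < 1:
        return 0.0
    exponent = min(failures - 1, 30)  # avoid huge numbers during long outages
    return min(base * 2 ** exponent, maximum)
```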

Feature Negotiation

The IDENTIFY command can be used to set nsqd-side metadata, modify client settings, and negotiate features. It satisfies two needs:

  1. In certain cases a client would like to modify how nsqd interacts with it (such as modifying a client’s heartbeat interval and enabling compression, TLS, output buffering, etc.; for a complete list see the spec).

  2. nsqd responds to the IDENTIFY command with a JSON payload that includes important server-side configuration values that the client should respect while interacting with the instance.

After connecting, based on the user’s configuration, a client library should send an IDENTIFY command, the body of which is a JSON payload:

{
    "client_id": "metrics_increment",
    "hostname": "app01.bitly.net",
    "heartbeat_interval": 30000,
    "feature_negotiation": true
}

The feature_negotiation field indicates that the client can accept a JSON payload in return. The client_id and hostname are arbitrary text fields that are used by nsqd (and nsqadmin) to identify clients. heartbeat_interval configures the interval between heartbeats on a per-client basis.

If nsqd does not support feature negotiation (introduced in nsqd v0.2.20), it responds with OK; otherwise it responds with a JSON payload:

{
    "max_rdy_count": 2500,
    "version": "0.2.20-alpha"
}

More detail on the use of the max_rdy_count field is in the RDY State section.
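
Handling both possible IDENTIFY replies can be sketched as follows. The sketch assumes the caller has already unpacked the response frame's data (see Data Flow and Heartbeats). The fallback of 2500 for max_rdy_count follows the backwards-compatibility rule in the RDY State section. The function name is hypothetical.

```python
import json

DEFAULT_MAX_RDY_COUNT = 2500  # assumed when nsqd predates feature negotiation


def parse_identify_response(data):
    """Interpret the data of nsqd's response to IDENTIFY.

    Old nsqd answer b"OK"; newer ones send a JSON object such as
    {"max_rdy_count": 2500, "version": "0.2.20-alpha"}.
    Returns a dict of server-side settings the client should respect.
    """
    if data == b"OK":
        return {"max_rdy_count": DEFAULT_MAX_RDY_COUNT}
    settings = json.loads(data)
    settings.setdefault("max_rdy_count", DEFAULT_MAX_RDY_COUNT)
    return settings
```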

Data Flow and Heartbeats

Once a consumer is in a subscribed state, data flow in the NSQ protocol is asynchronous. For consumers, this means that in order to build truly robust and performant client libraries they should be structured using asynchronous network IO loops and/or “threads” (the scare quotes are used to represent both OS-level threads and userland threads, like coroutines).

Additionally clients are expected to respond to periodic heartbeats from the nsqd instances they’re connected to. By default this happens at 30 second intervals. The client can respond with any command but, by convention, it’s easiest to simply respond with a NOP whenever a heartbeat is received. See the protocol spec for specifics on how to identify heartbeats.

A “thread” should be dedicated to reading data off the TCP socket, unpacking the data from the frame, and performing the multiplexing logic to route the data as appropriate. This is also conveniently the best spot to handle heartbeats. At the lowest level, reading the protocol involves the following sequential steps:

  1. read 4-byte big-endian uint32 size

  2. read size bytes of data

  3. unpack data

  4. handle the data (application logic)

  5. profit

  6. goto 1
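
The read loop above can be sketched in Python. The sketch assumes the framing described in the protocol spec:

  • the size prefix covers a 4-byte frame type followed by the frame data;
  • frame types are 0 (response), 1 (error) and 2 (message);
  • a heartbeat arrives as a response frame whose data is _heartbeat_;
  • a message carries an int64 timestamp, a uint16 attempts count, a 16-byte message ID, and then the body.

The stream object and the callbacks are hypothetical stand-ins for a buffered socket and the library's routing.

```python
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2
HEARTBEAT = b"_heartbeat_"


def read_exactly(stream, n):
    data = stream.read(n)
    if len(data) != n:
        raise ConnectionError("connection closed mid-frame")
    return data


def read_frame(stream):
    """Steps 1-3: read the size, read that many bytes, unpack type and data."""
    header = stream.read(4)
    if not header:
        return None  # peer closed the connection between frames
    if len(header) != 4:
        raise ConnectionError("connection closed mid-frame")
    (size,) = struct.unpack(">I", header)  # 4-byte big-endian uint32
    payload = read_exactly(stream, size)
    (frame_type,) = struct.unpack(">i", payload[:4])
    return frame_type, payload[4:]


def decode_message(data):
    """int64 timestamp (ns), uint16 attempts, 16-byte ID, then the body."""
    timestamp, attempts = struct.unpack(">qH", data[:10])
    return {"timestamp": timestamp, "attempts": attempts,
            "id": data[10:26], "body": data[26:]}


def read_loop(stream, send, on_message, on_error):
    """Route every frame; heartbeats are answered with NOP right here."""
    while (frame := read_frame(stream)) is not None:
        frame_type, data = frame
        if frame_type == FRAME_TYPE_RESPONSE and data == HEARTBEAT:
            send(b"NOP\n")
        elif frame_type == FRAME_TYPE_MESSAGE:
            on_message(decode_message(data))
        elif frame_type == FRAME_TYPE_ERROR:
            on_error(data)
        # Other responses (such as b"OK") would be matched to pending commands.
```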

A Brief Interlude on Errors

Because the protocol is asynchronous, correlating protocol errors with the commands that generated them would require a bit of extra state tracking. Instead, we took the “fail fast” approach, so the overwhelming majority of protocol-level error handling is fatal. This means that if the client sends an invalid command (or gets itself into an invalid state), the nsqd instance it’s connected to will protect itself (and the system) by forcibly closing the connection (and, if possible, sending an error to the client). This, coupled with the connection handling mentioned above, makes for a more robust and stable system.

The only errors that are not fatal are:

  • E_FIN_FAILED - a FIN command for an invalid message ID

  • E_REQ_FAILED - a REQ command for an invalid message ID

  • E_TOUCH_FAILED - a TOUCH command for an invalid message ID

Because these errors are most often timing issues, they are not considered fatal. These situations typically occur when a message times out on the nsqd side and is re-queued and delivered to another consumer. The original recipient is no longer allowed to respond on behalf of that message.
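
A client can classify error frames with a check like the one below. It assumes the error frame's data begins with the error code, followed by a space and a human-readable description. is_fatal_error is a hypothetical name.

```python
NON_FATAL_ERRORS = {b"E_FIN_FAILED", b"E_REQ_FAILED", b"E_TOUCH_FAILED"}


def is_fatal_error(data):
    """True if nsqd will close the connection after this error frame.

    Only the FIN/REQ/TOUCH failures for unknown message IDs are non-fatal;
    for anything else the client should expect a disconnect and reconnect.
    """
    code = data.split(b" ", 1)[0]
    return code not in NON_FATAL_ERRORS
```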

Message Handling

When the IO loop unpacks a data frame containing a message, it should route that message to the configured handler for processing.

The sending nsqd expects to receive a reply within its configured message timeout (default: 60 seconds). There are a few possible scenarios:

  1. The handler indicates that the message was processed successfully.

  2. The handler indicates that the message processing was unsuccessful.

  3. The handler decides that it needs more time to process the message.

  4. The in-flight timeout expires and nsqd automatically re-queues the message.

In the first 3 cases, the client library should send the appropriate command on the consumer’s behalf (FIN, REQ, and TOUCH respectively).
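
On the wire, each of the three responses is a single-line command. The sketch assumes the message ID is the 16-byte ID from the message frame and that the REQ delay is given in milliseconds, as in the protocol spec. The helper names are hypothetical.

```python
def fin(message_id):
    """Processed (or deliberately dropped): nsqd may discard the message."""
    return b"FIN " + message_id + b"\n"


def req(message_id, delay_ms):
    """Re-queue the message, deferring redelivery by `delay_ms` milliseconds."""
    return b"REQ " + message_id + b" " + str(int(delay_ms)).encode() + b"\n"


def touch(message_id):
    """Reset the in-flight timeout; send only when the user's handler asks."""
    return b"TOUCH " + message_id + b"\n"
```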

The FIN command is the simplest of the bunch. It tells nsqd that it can safely discard the message. FIN can also be used to discard a message that you do not want to process or retry.

The REQ command tells nsqd that the message should be re-queued (with an optional parameter specifying the amount of time to defer additional attempts). If the optional parameter is not specified by the consumer, the client library should automatically calculate the duration in relation to the number of attempts to process the message (a multiple is typically sufficient). The client library should discard messages that exceed the configured max attempts. When this occurs, a user-supplied callback should be executed to notify and enable special handling.
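
The following sketch computes a default delay as a simple multiple of the attempt count. The 1-second base and the 60-second cap are illustrative assumptions, not values prescribed by NSQ. requeue_delay_ms is a hypothetical name.

```python
def requeue_delay_ms(attempts, base_ms=1000, max_ms=60_000):
    """REQ delay to use when the consumer did not supply one.

    `attempts` is the message's attempt count (1 on first delivery), so the
    delay grows linearly: 1s, 2s, 3s, ... and is capped at `max_ms`.
    """
    return min(base_ms * max(attempts, 1), max_ms)
```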

If the message handler requires more time than the configured message timeout, the TOUCH command can be used to reset the timer on the nsqd side. This can be done repeatedly until the message is either FIN'd or REQ'd, up to the sending nsqd’s configured max_msg_timeout. Client libraries should never automatically TOUCH on behalf of the consumer.

If the sending nsqd instance receives no response, the message will time out and be automatically re-queued for delivery to an available consumer.

Finally, a property of each message is the number of attempts. Client libraries should compare this value against the configured max and discard messages that have exceeded it. When a message is discarded there should be a callback fired. Typical default implementations of this callback might include writing to a directory on disk, logging, etc. The user should be able to override the default handling.
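
A dispatcher that combines the attempt limit, the give-up callback, and the automatic FIN/REQ might look like the sketch below. It makes the following assumptions:

  • the message is a dict with "id" and "attempts", as a frame decoder might produce;
  • send writes raw bytes to the connection;
  • an exception raised by the handler counts as a failure.

All names are hypothetical. Here, giving up on a message is implemented by sending FIN so that nsqd stops redelivering it.

```python
def dispatch(message, handler, max_attempts, on_give_up, send):
    """Run the user's handler for one message and answer nsqd on its behalf.

    handler(message) -> bool; on_give_up(message) is the user-overridable
    callback (e.g. log the message or write it to disk).
    """
    if max_attempts and message["attempts"] > max_attempts:
        on_give_up(message)
        send(b"FIN " + message["id"] + b"\n")  # discard: no further retries
        return
    try:
        ok = handler(message)
    except Exception:
        ok = False  # a crashing handler is treated as a failed attempt
    if ok:
        send(b"FIN " + message["id"] + b"\n")
    else:
        delay_ms = min(1000 * message["attempts"], 60_000)  # illustrative policy
        send(b"REQ " + message["id"] + b" " + str(delay_ms).encode() + b"\n")
```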

RDY State

Because messages are pushed from nsqd to consumers we needed a way to manage the flow of data in user-land rather than relying on low-level TCP semantics. A consumer’s RDY state is NSQ’s flow control mechanism.

As outlined in the configuration section, a consumer is configured with a max_in_flight. This is a concurrency and performance knob: for example, some downstream systems can batch-process messages more easily and benefit greatly from a higher max_in_flight.

When a consumer connects to nsqd (and subscribes) it is placed in an initial RDY state of 0. No messages will be delivered.

Client libraries have a few responsibilities:

  1. bootstrap and evenly distribute the configured max_in_flight to all connections.

  2. never allow the aggregate sum of RDY counts for all connections (total_rdy_count) to exceed the configured max_in_flight.

  3. never exceed the max_rdy_count configured on each connection's nsqd.

  4. expose an API method to reliably indicate message flow starvation.

1. Bootstrap and Distribution

There are a few considerations when choosing an appropriate RDY count for a connection (in order to evenly distribute max_in_flight):

  • the number of connections is dynamic, oftentimes not even known in advance (e.g. when discovering nsqd via nsqlookupd).

  • max_in_flight may be lower than your number of connections.

To kickstart message flow a client library needs to send an initial RDY count. Because the eventual number of connections is often not known ahead of time it should start with a value of 1 so that the client library does not unfairly favor the first connection(s).

Additionally, after each message is processed, the client library should evaluate whether or not it’s time to update RDY state. An update should be triggered if the current value is 0 or if it is below ~25% of the last value sent.
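
That rule can be captured in a single predicate. The sketch assumes each connection tracks two values: current_rdy, its remaining RDY count, which the client decrements for each message received, and the last RDY value it sent. The function name is hypothetical.

```python
def should_update_rdy(current_rdy, last_rdy_sent):
    """Check after each message whether a fresh RDY should be sent."""
    return current_rdy == 0 or current_rdy < 0.25 * last_rdy_sent
```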

The client library should always attempt to evenly distribute RDY count across all connections. Typically, this is implemented as max_in_flight / num_conns.

However, when max_in_flight < num_conns this simple formula isn’t sufficient. In this state, client libraries should perform a dynamic runtime evaluation of connected nsqd “liveness” by measuring the duration of time since it last received a message over a given connection. After a configurable expiration, it should re-distribute whatever RDY count is available to a new (random) set of nsqd. By doing this, you guarantee that you’ll (eventually) find nsqd with messages. Clearly this has a latency impact.
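
The distribution step can be sketched as follows, assuming connections are hashable handles such as addresses. It covers both the even split, which rounds down, and the max_in_flight < num_conns case, where a random subset of connections gets RDY 1. The liveness timer that decides when to call it again is omitted. distribute_rdy is a hypothetical name.

```python
import random


def distribute_rdy(max_in_flight, conns, rng=random):
    """Map each connection to a RDY count whose sum never exceeds max_in_flight."""
    conns = list(conns)
    if not conns:
        return {}
    if max_in_flight >= len(conns):
        # Floor division: rounding down keeps the total within max_in_flight.
        per_conn = max_in_flight // len(conns)
        return {c: per_conn for c in conns}
    # Fewer slots than connections: RDY 1 for a random subset, RDY 0 elsewhere.
    # Calling this again after the liveness timeout rotates the subset.
    chosen = set(rng.sample(conns, max_in_flight))
    return {c: (1 if c in chosen else 0) for c in conns}
```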

2. Maintaining max_in_flight

The client library should maintain a ceiling for the maximum number of messages in flight for a given consumer. Specifically, the aggregate sum of each connection’s RDY count should never exceed the configured max_in_flight.

Below is example code in Python to determine whether or not the proposed RDY count is valid for a given connection:

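def send_ready(reader, conn, count):
    # A new RDY count replaces this connection's previous one, so swap the
    # old value out of the running total before checking the ceiling.
    new_total = reader.total_ready_count - conn.rdy_count + count
    if new_total > reader.max_in_flight:
        return False

    conn.send_ready(count)
    conn.rdy_count = count
    reader.total_ready_count = new_total
    return True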

3. nsqd Max RDY Count

Each nsqd is configurable with a --max-rdy-count (see feature negotiation for more information on the handshake a consumer can perform to ascertain this value). If the consumer sends a RDY count that is outside of the acceptable range its connection will be forcefully closed. For backwards compatibility, this value should be assumed to be 2500 if the nsqd instance does not support feature negotiation.
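
Staying inside the accepted range can be as simple as clamping the count before the RDY command is written. The sketch assumes max_rdy_count comes from the IDENTIFY response, with the 2500 default described above. clamp_rdy is a hypothetical name.

```python
def clamp_rdy(count, max_rdy_count=2500):
    """Clamp a proposed RDY count into 0..max_rdy_count before sending it."""
    return max(0, min(count, max_rdy_count))
```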

4. Message Flow Starvation

Finally, the client library should provide an API method to indicate message flow starvation. It is insufficient for consumers (in their message handlers) to simply compare the number of messages they have in-flight vs. their configured max_in_flight in order to decide to “process a batch”. There are two cases when this is problematic:

  1. When consumers configure max_in_flight > 1, due to variable num_conns, there are cases where max_in_flight is not evenly divisible by num_conns. Because the contract states that you should never exceed max_in_flight, you must round down, and you end up with cases where the sum of all RDY counts is less than max_in_flight.

  2. Consider the case where only a subset of nsqd have messages. Because of the expected even distribution of RDY count, those active nsqd only have a fraction of the configured max_in_flight.

In both cases, a consumer will never actually receive max_in_flight # of messages. Therefore, the client library should expose a method is_starved that will evaluate whether any of the connections are starved, as follows:

def is_starved(conns):
    for c in conns:
        # the constant 0.85 is designed to *anticipate* starvation rather than wait for it
        if c.in_flight > 0 and c.in_flight >= (c.last_ready * 0.85):
            return True
    return False

The is_starved method should be used by message handlers to reliably identify when to process a batch of messages.
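
The sketch below illustrates the intended use: it buffers messages and flushes when any connection looks starved. It repeats is_starved from above so that it runs on its own. The connection attributes in_flight and last_ready, the BatchHandler class, and the flush callback are assumptions. Finishing (FIN) each message after a successful flush is left to the callback.

```python
def is_starved(conns):
    for c in conns:
        # the constant 0.85 is designed to *anticipate* starvation
        if c.in_flight > 0 and c.in_flight >= (c.last_ready * 0.85):
            return True
    return False


class BatchHandler:
    """Hypothetical handler that buffers messages until flow is starved.

    Buffered messages stay in flight (not yet FIN'd), so waiting for the
    batch to reach max_in_flight could wait forever; is_starved avoids that.
    """

    def __init__(self, conns, flush):
        self.conns = conns
        self.flush = flush
        self.batch = []

    def __call__(self, message):
        self.batch.append(message)
        if is_starved(self.conns):
            self.flush(self.batch)  # e.g. bulk write, then FIN each message
            self.batch = []
```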

Backoff

The question of what to do when message processing fails is a complicated one to answer. The message handling section detailed client library behavior that would defer the processing of failed messages for some (increasing) duration of time. The other piece of the puzzle is whether or not to reduce throughput. The interplay between these two pieces of functionality is crucial for overall system stability.

By slowing down the rate of processing, or “backing off”, the consumer allows the downstream system to recover from transient failure. However, this behavior should be configurable as it isn’t always desirable, such as situations where latency is prioritized.

Backoff should be implemented by sending RDY 0 to the appropriate nsqd, stopping message flow. The duration of time to remain in this state should be calculated based on the number of repeated failures (exponential). Similarly, successful processing should reduce this duration until the reader is no longer in a backoff state.

While a reader is in a backoff state, after the timeout expires, the client library should only ever send RDY 1 regardless of max_in_flight. This effectively “tests the waters” before returning to full throttle. Additionally, during a backoff timeout, the client library should ignore any success or failure results with respect to calculating backoff duration (i.e. it should only take into account one result per backoff timeout).
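
The rules above can be sketched as a small state machine. It assumes durations in seconds, a doubling schedule, and a 120-second cap. The Backoff class and its method names are hypothetical. The caller is responsible for sending the RDY commands and for scheduling the timer.

```python
class Backoff:
    """Consecutive-failure backoff; names and defaults are hypothetical.

    record() returns:
      None  -> nothing to change (normal operation, or result ignored)
      0.0   -> backoff is over; restore the normal RDY distribution
      > 0   -> send RDY 0 to every nsqd, call timer_expired() after that many seconds
    """

    def __init__(self, base=1.0, maximum=120.0):
        self.base = base
        self.maximum = maximum
        self.level = 0         # 0 means not backing off
        self.waiting = False   # True while a backoff timer is running

    def record(self, success):
        if self.waiting:
            return None  # only one result counts per backoff timeout
        if success and self.level == 0:
            return None
        self.level += -1 if success else 1
        if self.level == 0:
            return 0.0
        self.waiting = True
        exponent = min(self.level - 1, 30)
        return min(self.base * 2 ** exponent, self.maximum)

    def timer_expired(self):
        """Test the waters: RDY 1 (a single message), regardless of max_in_flight."""
        self.waiting = False
        return 1
```

For example, two consecutive failures produce pauses of 1s and then 2s. After each pause the client sends RDY 1, and each later success steps the level back down until record returns 0.0.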

[Figure: nsq_client_flow]

Encryption/Compression

NSQ supports encryption and/or compression feature negotiation via the IDENTIFY command. TLS is used for encryption. Both Snappy and DEFLATE are supported for compression. Snappy is available as a third-party library, but most languages have some native support for DEFLATE.

When the IDENTIFY response is received and you’ve requested TLS via the tls_v1 flag you’ll get something similar to the following JSON:

{
    "deflate": false,
    "deflate_level": 0,
    "max_deflate_level": 6,
    "max_msg_timeout": 900000,
    "max_rdy_count": 2500,
    "msg_timeout": 60000,
    "sample_rate": 0,
    "snappy": true,
    "tls_v1": true,
    "version": "0.2.28"
}

After confirming that tls_v1 is set to true (indicating that the server supports TLS), you initiate the TLS handshake before anything else is sent or received on the wire. In Python this was traditionally done with the ssl.wrap_socket call; on Python 3.12 and later, where ssl.wrap_socket has been removed, use SSLContext.wrap_socket. Immediately following a successful TLS handshake you must read an encrypted NSQ OK response.

In a similar fashion, if you’ve enabled compression you’ll look for snappy or deflate being true and then wrap the socket’s read and write calls with the appropriate (de)compressor. Again, immediately read a compressed NSQ OK response.

These compression features are mutually-exclusive.

It’s very important that you either prevent buffering until you’ve finished negotiating encryption/compression, or take care to read the socket until its buffer is empty as you negotiate each feature.
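
The negotiation steps can be sketched with the standard library. The sketch assumes that NSQ's DEFLATE stream is raw DEFLATE with no zlib header (wbits of -15, matching Go's compress/flate). It uses SSLContext.wrap_socket because ssl.wrap_socket was removed in Python 3.12. Snappy needs a third-party package and is only named here. choose_upgrades, wrap_tls, and DeflateCodec are hypothetical names.

```python
import socket
import ssl
import zlib


def choose_upgrades(identify_response):
    """Order of socket upgrades implied by the IDENTIFY response.

    TLS (if granted) comes first; Snappy and DEFLATE are mutually exclusive.
    After each upgrade the client must read an OK response through it.
    """
    upgrades = []
    if identify_response.get("tls_v1"):
        upgrades.append("tls")
    if identify_response.get("snappy"):
        upgrades.append("snappy")
    elif identify_response.get("deflate"):
        upgrades.append("deflate")
    return upgrades


def wrap_tls(sock: socket.socket, server_hostname: str) -> ssl.SSLSocket:
    """Perform the TLS handshake on an already connected socket."""
    context = ssl.create_default_context()
    return context.wrap_socket(sock, server_hostname=server_hostname)


class DeflateCodec:
    """Raw DEFLATE (wbits=-15) for wrapping socket reads and writes."""

    def __init__(self, level=6):
        self._compressor = zlib.compressobj(level, zlib.DEFLATED, -15)
        self._decompressor = zlib.decompressobj(-15)

    def encode(self, data):
        # Z_SYNC_FLUSH emits all pending output so the peer can decode it now.
        return self._compressor.compress(data) + self._compressor.flush(zlib.Z_SYNC_FLUSH)

    def decode(self, data):
        return self._decompressor.decompress(data)
```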

Bringing It All Together

Distributed systems are fun.

The various components of an NSQ cluster work in concert to provide a platform on which to build robust, performant, and stable infrastructure. We hope this guide sheds some light on how important the client’s role is.

In terms of actually implementing all of this, we treat pynsq and go-nsq as our reference codebases. The structure of pynsq can be broken down into the following core components:

  • Message - a high-level message object, which exposes stateful methods for responding to nsqd (FIN, REQ, TOUCH, etc.) as well as metadata such as attempts and timestamp.

  • Connection - a high-level wrapper around a TCP connection to a specific nsqd, which has knowledge of in-flight messages, its RDY state, negotiated features, and various timings.

  • Consumer - the front-facing API a user interacts with, which handles discovery, creates connections (and subscribes), bootstraps and manages RDY state, parses raw incoming data, creates Message objects, and dispatches messages to handlers.

  • Producer - the front-facing API a user interacts with, which handles publishing.
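
As a rough sketch of the first component, a message object can own its response state. This is not pynsq's actual class. The constructor arguments, the send callable that writes bytes to the owning connection, and the method names are all assumptions. The guard against a second FIN or REQ reflects the rule that each message gets exactly one final response.

```python
class Message:
    """Hypothetical message object that responds through its connection."""

    def __init__(self, message_id, body, attempts, timestamp, send):
        self.id = message_id
        self.body = body
        self.attempts = attempts
        self.timestamp = timestamp
        self._send = send
        self.responded = False

    def _respond(self, command):
        if self.responded:
            raise RuntimeError("message already responded to")
        self.responded = True
        self._send(command)

    def finish(self):
        self._respond(b"FIN " + self.id + b"\n")

    def requeue(self, delay_ms=0):
        self._respond(b"REQ " + self.id + b" " + str(int(delay_ms)).encode() + b"\n")

    def touch(self):
        # TOUCH only extends the timeout; FIN or REQ must still follow.
        if self.responded:
            raise RuntimeError("message already responded to")
        self._send(b"TOUCH " + self.id + b"\n")
```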

We’re happy to help support anyone interested in building client libraries for NSQ. We’re looking for contributors to continue to expand our language support as well as flesh out functionality in existing libraries. The community has already open sourced many client libraries.

Last modified: February 24, 2021, 08:21 AM