NSQ is composed of 3 daemons:

• nsqd is the daemon that receives, queues, and delivers messages to clients.

• nsqlookupd is the daemon that manages topology information and provides an eventually consistent discovery service.

• nsqadmin is a web UI to introspect the cluster in realtime (and perform various administrative tasks).

Data flow in NSQ is modeled as a tree of streams and consumers.

A topic is a distinct stream of data.

A channel is a logical grouping of consumers subscribed to a given topic.

A single nsqd can have many topics and each topic can have many channels.

A channel receives a copy of all the messages for the topic, enabling multicast style delivery while each message on a channel is distributed amongst its subscribers, enabling load-balancing.

These primitives form a powerful framework for expressing a variety of simple and complex topologies.

## Topics and Channels

Topics and channels, the core primitives of NSQ, best exemplify how the design of the system translates seamlessly to the features of Go.

Go’s channels (henceforth referred to as “go-chan” for disambiguation) are a natural way to express queues, thus an NSQ topic/channel, at its core, is just a buffered go-chan of Message pointers. The size of the buffer is equal to the --mem-queue-size configuration parameter.

After reading data off the wire, the act of publishing a message to a topic involves:

1. instantiation of a Message struct (and allocation of the message body []byte)

1. read-lock to get the Topic

1. read-lock to check for the ability to publish

1. send on a buffered go-chan
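The four steps above can be sketched roughly as follows. This is a minimal illustration rather than nsqd's actual code: the `Message`, `Topic`, and `NSQD` types, their fields, and the `Publish` method are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for nsqd's message type.
type Message struct {
	Body []byte
}

// Topic owns the buffered go-chan that serves as its in-memory queue.
type Topic struct {
	sync.RWMutex
	exiting         bool
	incomingMsgChan chan *Message
}

// NSQD maps topic names to topics.
type NSQD struct {
	sync.RWMutex
	topics map[string]*Topic
}

// Publish walks through the four steps described in the text.
func (n *NSQD) Publish(name string, body []byte) error {
	// 1. Instantiate the Message and allocate its body.
	msg := &Message{Body: append([]byte(nil), body...)}

	// 2. Read-lock to get the Topic.
	n.RLock()
	topic, ok := n.topics[name]
	n.RUnlock()
	if !ok {
		return errors.New("unknown topic")
	}

	// 3. Read-lock to check for the ability to publish.
	topic.RLock()
	exiting := topic.exiting
	topic.RUnlock()
	if exiting {
		return errors.New("topic is exiting")
	}

	// 4. Send on the buffered go-chan.
	topic.incomingMsgChan <- msg
	return nil
}

func main() {
	n := &NSQD{topics: map[string]*Topic{
		"events": {incomingMsgChan: make(chan *Message, 8)},
	}}
	if err := n.Publish("events", []byte("hello")); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("queued: %s\n", (<-n.topics["events"].incomingMsgChan).Body)
}
```

Both locks are read locks, so concurrent publishers do not serialize on them; only the send on the go-chan can block, and only when the buffer is full.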

To get messages from a topic to its channels the topic cannot rely on typical go-chan receive semantics, because multiple goroutines receiving on a go-chan would distribute the messages while the desired end result is to copy each message to every channel (goroutine).

Instead, each topic maintains 3 primary goroutines. The first one, called router, is responsible for reading newly published messages off the incoming go-chan and storing them in a queue (memory or disk).

The second one, called messagePump, is responsible for copying and pushing messages to channels as described above.

The third is responsible for DiskQueue IO and will be discussed later.
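A rough sketch of the copy-to-every-channel behavior may help here. The `Message` and `Channel` types and the `messagePump` signature below are assumptions made for illustration; the real goroutine also deals with disk-backed messages and shutdown, which are left out.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for nsqd's message type.
type Message struct{ Body []byte }

// Channel is a hypothetical consumer group with its own input go-chan.
type Channel struct {
	name string
	in   chan *Message
}

// messagePump reads from the topic's queue and pushes a copy of every
// message to each channel, so that all channels see the full stream.
func messagePump(queue <-chan *Message, channels []*Channel) {
	for msg := range queue {
		for _, c := range channels {
			cp := *msg // shallow copy: each channel gets its own Message value
			c.in <- &cp
		}
	}
	// The queue was closed: nothing more will arrive for the channels.
	for _, c := range channels {
		close(c.in)
	}
}

func main() {
	queue := make(chan *Message, 2)
	a := &Channel{"a", make(chan *Message, 2)}
	b := &Channel{"b", make(chan *Message, 2)}

	queue <- &Message{[]byte("one")}
	queue <- &Message{[]byte("two")}
	close(queue)

	messagePump(queue, []*Channel{a, b})
	for _, c := range []*Channel{a, b} {
		for m := range c.in {
			fmt.Printf("%s got %s\n", c.name, m.Body)
		}
	}
}
```

A single goroutine ranging over the queue is what makes this multicast: had each channel received from the queue directly, every message would have reached only one of them.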

Channels are a little more complicated but share the underlying goal of exposing a single input and single output go-chan (to abstract away the fact that, internally, messages might be in memory or on disk).

Additionally, each channel maintains 2 time-ordered priority queues responsible for deferred and in-flight message timeouts (and 2 accompanying goroutines for monitoring them).

Parallelization is improved by managing a per-channel data structure, rather than relying on the Go runtime’s global timer scheduler.

Note: Internally, the Go runtime uses a single priority queue and goroutine to manage timers. This supports (but is not limited to) the entirety of the time package.

It normally obviates the need for a user-land time-ordered priority queue but it’s important to keep in mind that it’s a single data structure with a single lock, potentially impacting GOMAXPROCS > 1 performance. See runtime/time.go.

## Backend / DiskQueue

One of NSQ’s design goals is to bound the number of messages kept in memory.

It does this by transparently writing message overflow to disk via DiskQueue (which owns the third primary goroutine for a topic or channel).

Since the memory queue is just a go-chan, it’s trivial to route messages to memory first, if possible, then fallback to disk:

    for msg := range c.incomingMsgChan {
    	select {
    	case c.memoryMsgChan <- msg:
    		// the in-memory queue had room
    	default:
    		// memoryMsgChan is full: overflow to the backend
    		err := WriteMessageToBackend(&msgBuf, msg, c.backend)
    		if err != nil {
    			// ... handle errors ...
    		}
    	}
    }

Taking advantage of Go’s select statement allows this functionality to be expressed in just a few lines of code: the default case above only executes if memoryMsgChan is full.

NSQ also has the concept of ephemeral topics/channels.

They discard message overflow (rather than write to disk) and disappear when they no longer have clients subscribed. This is a perfect use case for Go’s interfaces.

Topics and channels have a struct member declared as a Backend interface rather than a concrete type.

Normal topics and channels use a DiskQueue while ephemeral ones stub in a DummyBackendQueue, which implements a no-op Backend.
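The interface-based design can be sketched as below. The `Backend` method set shown here is a reduced, hypothetical one, and `sliceBackend` is an in-memory stand-in for DiskQueue so that the example stays self-contained; only the name `DummyBackendQueue` comes from the text.

```go
package main

import "fmt"

// Backend is a hypothetical, reduced version of the overflow interface.
type Backend interface {
	Put(data []byte) error
	Depth() int64
}

// DummyBackendQueue discards everything, as an ephemeral topic/channel would.
type DummyBackendQueue struct{}

func (DummyBackendQueue) Put([]byte) error { return nil }
func (DummyBackendQueue) Depth() int64     { return 0 }

// sliceBackend stands in for DiskQueue in this sketch; it keeps overflow
// in a slice instead of writing it to disk.
type sliceBackend struct{ items [][]byte }

func (s *sliceBackend) Put(data []byte) error {
	s.items = append(s.items, data)
	return nil
}
func (s *sliceBackend) Depth() int64 { return int64(len(s.items)) }

// Channel holds its overflow destination as an interface, not a concrete type.
type Channel struct {
	memoryMsgChan chan []byte
	backend       Backend
}

// put tries memory first and falls back to the backend when memory is full.
func (c *Channel) put(msg []byte) error {
	select {
	case c.memoryMsgChan <- msg:
		return nil
	default:
		return c.backend.Put(msg)
	}
}

func main() {
	normal := &Channel{make(chan []byte, 1), &sliceBackend{}}
	ephemeral := &Channel{make(chan []byte, 1), DummyBackendQueue{}}
	for _, c := range []*Channel{normal, ephemeral} {
		c.put([]byte("a")) // fits in memory
		c.put([]byte("b")) // overflows to the backend
		fmt.Println("in memory:", len(c.memoryMsgChan), "backend depth:", c.backend.Depth())
	}
}
```

The `put` method is identical for both kinds of channel; whether overflow is kept or dropped is decided entirely by which `Backend` was plugged in at construction time.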

## Reducing GC Pressure

In any garbage collected environment you’re subject to the tension between throughput (doing useful work), latency (responsiveness), and resident set size (footprint).

As of Go 1.2, the GC is mark-and-sweep (parallel), non-generational, non-compacting, stop-the-world, and mostly precise. It’s mostly precise because the remainder of the work wasn’t completed in time (it’s slated for Go 1.3).

The Go GC will certainly continue to improve, but the universal truth is: the less garbage you create the less time you’ll collect.

First, it’s important to understand how the GC is behaving under real workloads.

To this end, nsqd publishes GC stats in statsd format (alongside other internal metrics).

nsqadmin displays graphs of these metrics, giving you insight into the GC’s impact in both frequency and duration.

In order to actually reduce garbage you need to know where it’s being generated.

Once again the Go toolchain provides the answers:

1. Use the testing package and go test -benchmem to benchmark hot code paths. It profiles the number of allocations per iteration (and benchmark runs can be compared with benchcmp).

1. Build using go build -gcflags -m, which outputs the result of escape analysis.

With that in mind, the following optimizations proved useful for nsqd:

1. Avoid []byte to string conversions.

1. Re-use buffers or objects (and someday possibly sync.Pool aka issue 4720).

1. Pre-allocate slices (specify capacity in make) and always know the number and size of items over the wire.

1. Apply sane limits to various configurable dials (such as message size).

1. Avoid boxing (use of interface{}) or unnecessary wrapper types (like a struct for a “multiple value” go-chan).

1. Avoid the use of defer in hot code paths (it allocates).
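To make one of these items concrete, the sketch below compares a slice that grows on demand with one that is pre-allocated, using `testing.AllocsPerRun` from the standard library to count allocations. The `grow` and `prealloc` functions and the package-level `sink` are hypothetical names for this example; exact counts depend on the Go version, so none are quoted here.

```go
package main

import (
	"fmt"
	"testing"
)

// sink keeps the slices reachable so they are heap-allocated and counted.
var sink []int

// grow appends to a nil slice, forcing repeated reallocation as it grows.
func grow(n int) {
	var s []int
	for i := 0; i < n; i++ {
		s = append(s, i)
	}
	sink = s
}

// prealloc specifies the capacity in make, so append never reallocates.
func prealloc(n int) {
	s := make([]int, 0, n)
	for i := 0; i < n; i++ {
		s = append(s, i)
	}
	sink = s
}

func main() {
	g := testing.AllocsPerRun(100, func() { grow(1024) })
	p := testing.AllocsPerRun(100, func() { prealloc(1024) })
	fmt.Printf("grow: %.0f allocs/run, prealloc: %.0f allocs/run\n", g, p)
}
```

The same comparison can be made in a regular benchmark with `go test -benchmem`; `testing.AllocsPerRun` is used here only so the example runs as a plain program.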

### TCP Protocol

The NSQ TCP protocol is a shining example of a section where these GC optimization concepts are utilized to great effect.

The protocol is structured with length prefixed frames, making it straightforward and performant to encode and decode:

    [x][x][x][x][x][x][x][x][x][x][x][x]...
    |  (int32) ||  (int32) || (binary)
    |  4-byte  ||  4-byte  || N-byte
    ------------------------------------...
        size      frame ID     data

Since the exact type and size of a frame’s components are known ahead of time, we can avoid the encoding/binary package’s convenience Read() and Write() wrappers (and their extraneous interface lookups and conversions) and instead call the appropriate binary.BigEndian methods directly.

To reduce socket IO syscalls, client net.Conn are wrapped with bufio.Reader and bufio.Writer.

The Reader exposes ReadSlice(), which reuses its internal buffer.

This nearly eliminates allocations while reading off the socket, greatly reducing GC pressure.

This is possible because the data associated with most commands does not escape (in the edge cases where this is not true, the data is explicitly copied).

At an even lower level, a MessageID is declared as [16]byte to be able to use it as a map key (slices cannot be used as map keys).

However, since data read from the socket is stored as []byte, rather than produce garbage by allocating string keys, and to avoid a copy from the slice to the backing array of the MessageID, the unsafe package is used to cast the slice directly to a MessageID:

    // msgID is a []byte holding exactly 16 bytes; point at its first element
    id := *(*nsq.MessageID)(unsafe.Pointer(&msgID[0]))

Note: This is a hack. It wouldn’t be necessary if this was optimized by the compiler and Issue 3512 is open to potentially resolve this.

It’s also worth reading through issue 5376, which talks about the possibility of a “const like” byte type that could be used interchangeably where string is accepted, without allocating and copying.

Similarly, the Go standard library only provides numeric conversion methods on a string. In order to avoid string allocations, nsqd uses a custom base 10 conversion method that operates directly on a []byte.
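A conversion of this kind might look like the following sketch. The `parseBase10` name and its error messages are hypothetical, and overflow checking is omitted to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// parseBase10 converts ASCII decimal digits in b to a uint64 without
// first converting b to a string. Overflow is not checked in this sketch.
func parseBase10(b []byte) (uint64, error) {
	if len(b) == 0 {
		return 0, errors.New("empty input")
	}
	var n uint64
	for _, c := range b {
		if c < '0' || c > '9' {
			return 0, errors.New("invalid digit")
		}
		n = n*10 + uint64(c-'0')
	}
	return n, nil
}

func main() {
	n, err := parseBase10([]byte("1500"))
	fmt.Println(n, err)
}
```

Since the function only indexes into the slice it was given, it can be applied directly to a buffer returned by `ReadSlice()` with no intermediate allocation.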

These may seem like micro-optimizations but the TCP protocol contains some of the hottest code paths.

In aggregate, at the rate of tens of thousands of messages per second, they have a significant impact on the number of allocations and overhead:

    benchmark                    old ns/op    new ns/op    delta
    BenchmarkProtocolV2Data           3575         1963  -45.09%

    benchmark                    old ns/op    new ns/op    delta
    BenchmarkProtocolV2Sub256        57964        14568  -74.87%
    BenchmarkProtocolV2Sub512        58212        16193  -72.18%
    BenchmarkProtocolV2Sub1k         58549        19490  -66.71%
    BenchmarkProtocolV2Sub2k         63430        27840  -56.11%

    benchmark                   old allocs   new allocs    delta
    BenchmarkProtocolV2Sub256           56           39  -30.36%
    BenchmarkProtocolV2Sub512           56           39  -30.36%
    BenchmarkProtocolV2Sub1k            56           39  -30.36%
    BenchmarkProtocolV2Sub2k            58           42  -27.59%

## HTTP

NSQ’s HTTP API is built on top of Go’s net/http package.

Because it’s just HTTP, it can be leveraged in almost any modern programming environment without special client libraries.

Its simplicity belies its power, as one of the most interesting aspects of Go’s HTTP tool-chest is the wide range of debugging capabilities it supports.

The net/http/pprof package integrates directly with the native HTTP server, exposing endpoints to retrieve CPU, heap, goroutine, and OS thread profiles. These can be targeted directly from the go tool:

    $ go tool pprof http://127.0.0.1:4151/debug/pprof/profile

This is tremendously valuable for debugging and profiling a running process!

In addition, a /stats endpoint returns a slew of metrics in either JSON or pretty-printed text, making it easy for an administrator to introspect from the command line in realtime:

    $ watch -n 0.5 'curl -s http://127.0.0.1:4151/stats | grep -v connected'

This produces continuous output like:

    [page_views     ] depth: 0     be-depth: 0     msgs: 105525994 e2e%: 6.6s, 6.2s, 6.2s
        [page_view_counter        ] depth: 0     be-depth: 0     inflt: 432  def: 0    re-q: 34684 timeout: 34038 msgs: 105525994 e2e%: 5.1s, 5.1s, 4.6s
        [realtime_score           ] depth: 1828  be-depth: 0     inflt: 1368 def: 0    re-q: 25188 timeout: 11336 msgs: 105525994 e2e%: 9.0s, 9.0s, 7.8s
        [variants_writer          ] depth: 0     be-depth: 0     inflt: 592  def: 0    re-q: 37068 timeout: 37068 msgs: 105525994 e2e%: 8.2s, 8.2s, 8.2s

    [poll_requests  ] depth: 0     be-depth: 0     msgs: 11485060 e2e%: 167.5ms, 167.5ms, 138.1ms
        [social_data_collector    ] depth: 0     be-depth: 0     inflt: 2    def: 3    re-q: 7568  timeout: 402   msgs: 11485060 e2e%: 186.6ms, 186.6ms, 138.1ms

    [social_data    ] depth: 0     be-depth: 0     msgs: 60145188 e2e%: 199.0s, 199.0s, 199.0s
        [events_writer            ] depth: 0     be-depth: 0     inflt: 226  def: 0    re-q: 32584 timeout: 30542 msgs: 60145188 e2e%: 6.7s, 6.7s, 6.7s
        [social_delta_counter     ] depth: 17328 be-depth: 7327  inflt: 179  def: 1    re-q: 155843 timeout: 11514 msgs: 60145188 e2e%: 234.1s, 234.1s, 231.8s

    [time_on_site_ticks] depth: 0     be-depth: 0     msgs: 35717814 e2e%: 0.0ns, 0.0ns, 0.0ns
        [tail821042#ephemeral     ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 33909699 e2e%: 0.0ns, 0.0ns, 0.0ns

Finally, each new Go release typically brings measurable performance gains.

It’s always nice when recompiling against the latest version of Go provides a free boost!

## Dependencies

Coming from other ecosystems, Go’s philosophy (or lack thereof) on managing dependencies takes a little time to get used to.

NSQ evolved from being a single giant repo, with relative imports and little to no separation between internal packages, to fully embracing the recommended best practices with respect to structure and dependency management.

There are two main schools of thought:

1. Vendoring: copy dependencies at the correct revision into your application’s repo and modify your import paths to reference the local copy.

1. Virtual Env: list the revisions of dependencies you require and, at build time, produce a pristine GOPATH environment containing those pinned dependencies.

Note: This really only applies to binary packages as it doesn’t make sense for an importable package to make intermediate decisions as to which version of a dependency to use.

NSQ uses gpm to provide support for (2) above.

It works by recording your dependencies in a Godeps file, which we later use to construct a GOPATH environment.

## Testing

Go provides solid built-in support for writing tests and benchmarks and, because Go makes it so easy to model concurrent operations, it’s trivial to stand up a full-fledged instance of nsqd inside your test environment.

However, there was one aspect of the initial implementation that became problematic for testing: global state. The most obvious offender was the use of a global variable that held the reference to the instance of nsqd at runtime, i.e. var nsqd *NSQd.

Certain tests would inadvertently mask this global variable in their local scope by using short-form variable assignment, i.e. nsqd := NewNSQd(...).

This meant that the global reference did not point to the instance that was currently running, breaking tests.

To resolve this, a Context struct is passed around that contains configuration metadata and a reference to the parent nsqd.

All references to global state were replaced with this local Context, allowing children (topics, channels, protocol handlers, etc.) to safely access this data and making it more reliable to test.
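The shape of this refactoring can be sketched as follows. All type and field names here (`Options`, `NSQD`, `Context`, `Topic`, `NewTopic`) are hypothetical stand-ins; the point is only that children receive their parent through an explicit value rather than a package-level variable.

```go
package main

import "fmt"

// Options is a hypothetical holder for configuration metadata.
type Options struct {
	MemQueueSize int
}

// NSQD is a hypothetical stand-in for the daemon instance.
type NSQD struct {
	opts *Options
}

// Context is passed to children instead of relying on a global *NSQD.
type Context struct {
	nsqd *NSQD
}

// Topic reaches its parent only through the Context it was given.
type Topic struct {
	name          string
	ctx           *Context
	memoryMsgChan chan []byte
}

// NewTopic sizes the in-memory queue from the configuration in ctx.
func NewTopic(name string, ctx *Context) *Topic {
	return &Topic{
		name:          name,
		ctx:           ctx,
		memoryMsgChan: make(chan []byte, ctx.nsqd.opts.MemQueueSize),
	}
}

func main() {
	// Each test can build its own instance; nothing is shared globally.
	small := NewTopic("small", &Context{nsqd: &NSQD{opts: &Options{MemQueueSize: 4}}})
	large := NewTopic("large", &Context{nsqd: &NSQD{opts: &Options{MemQueueSize: 16}}})
	fmt.Println(cap(small.memoryMsgChan), cap(large.memoryMsgChan))
}
```

With no package-level variable to shadow, a test that writes `nsqd := NewNSQD(...)` can no longer leave other code pointing at a different instance.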

## Robustness

A system that isn’t robust in the face of changing network conditions or unexpected events is a system that will not perform well in a distributed production environment.

NSQ is designed and implemented in a way that allows the system to tolerate failure and behave in a consistent, predictable, and unsurprising way.

The overarching philosophy is to fail fast, treat errors as fatal, and provide a means to debug any issues that do occur.

But, in order to react you need to be able to detect exceptional conditions…

### Heartbeats and Timeouts

The NSQ TCP protocol is push oriented. After connection, handshake, and subscription the consumer is placed in a RDY state of 0.

When the consumer is ready to receive messages it updates that RDY state to the number of messages it is willing to accept.

NSQ client libraries continually manage this behind the scenes, resulting in a flow-controlled stream of messages.

Periodically, nsqd will send a heartbeat over the connection.

The client can configure the interval between heartbeats but nsqd expects a response before it sends the next one.

The combination of application level heartbeats and RDY state avoids head-of-line blocking, which can otherwise render heartbeats useless (i.e. if a consumer is behind in processing message flow the OS’s receive buffer will fill up, blocking heartbeats).

To guarantee progress, all network IO is bound with deadlines relative to the configured heartbeat interval. This means that you can literally unplug the network connection between nsqd and a consumer and it will detect and properly handle the error.

When a fatal error is detected the client connection is forcibly closed. In-flight messages are timed out and re-queued for delivery to another consumer. Finally, the error is logged and various internal metrics are incremented.

### Managing Goroutines

It’s surprisingly easy to start goroutines. Unfortunately, it isn’t quite as easy to orchestrate their cleanup. Avoiding deadlocks is also challenging.

Most often this boils down to an ordering problem, where a goroutine receiving on a go-chan exits before the upstream goroutines sending on it.

Why care at all though? It’s simple: an orphaned goroutine is a memory leak.

Memory leaks in long running daemons are bad, especially when the expectation is that your process will be stable when all else fails.

To further complicate things, a typical nsqd process has many goroutines involved in message delivery. Internally, message “ownership” changes often.

To be able to shutdown cleanly, it’s incredibly important to account for all intraprocess messages.

Although there aren’t any magic bullets, the following techniques make it a little easier to manage…

#### WaitGroups

The sync package provides sync.WaitGroup, which can be used to perform accounting of how many goroutines are live (and provide a means to wait on their exit).

To reduce the typical boilerplate, nsqd uses this wrapper:

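    type WaitGroupWrapper struct {
    	sync.WaitGroup
    }

    // Wrap runs cb in a new goroutine and tracks it in the WaitGroup.
    func (w *WaitGroupWrapper) Wrap(cb func()) {
    	w.Add(1) // register the goroutine before starting it
    	go func() {
    		cb()
    		w.Done()
    	}()
    }

    // can be used as follows:
    wg := WaitGroupWrapper{}
    wg.Wrap(func() { n.idPump() })
    ...
    wg.Wait()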

#### Exit Signaling

The easiest way to trigger an event in multiple child goroutines is to provide a single go-chan that you close when ready.

All pending receives on that go-chan will activate, rather than having to send a separate signal to each goroutine.

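    func work() {
    	exitChan := make(chan int)
    	go task1(exitChan)
    	go task2(exitChan)
    	time.Sleep(5 * time.Second)
    	close(exitChan) // wakes every goroutine blocked on <-exitChan
    }

    func task1(exitChan chan int) {
    	<-exitChan
    	// ... clean up and return ...
    }

    func task2(exitChan chan int) {
    	<-exitChan
    	// ... clean up and return ...
    }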

#### Synchronizing Exit

It was quite difficult to implement a reliable, deadlock free, exit path that accounted for all in-flight messages. A few tips:

1. Ideally the goroutine responsible for sending on a go-chan should also be responsible for closing it.

1. If messages cannot be lost, ensure that pertinent go-chans are emptied (especially unbuffered ones!) to guarantee senders can make progress.

1. Alternatively, if a message is no longer relevant, sends on a single go-chan should be converted to a select with the addition of an exit signal (as discussed above) to guarantee progress.

1. The general order should be:

    1. Stop accepting new connections (close listeners)

    2. Signal exit to child goroutines (see above)

    3. Wait on WaitGroup for goroutine exit (see above)

    4. Recover buffered data

    5. Flush anything left to disk

#### Logging

Finally, the most important tool at your disposal is to log the entrance and exit of your goroutines!

It makes it infinitely easier to identify the culprit in the case of deadlocks or leaks.

nsqd log lines include information to correlate goroutines with their siblings (and parent), such as the client’s remote address or the topic/channel name.

The logs are verbose, but not verbose to the point where the log is overwhelming.

There’s a fine line, but nsqd leans towards the side of having more information in the logs when a fault occurs rather than trying to reduce chattiness at the expense of usefulness.