How Tendermint p2p Nodes Receive Messages
Yi Zhang
Posted on January 13, 2024
In file "tendermint/p2p/peer.go", we have following code
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
service.BaseService
// raw peerConn and the multiplex connection
peerConn
mconn *tmconn.MConnection
// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
nodeInfo NodeInfo
channels []byte
// User data
Data *cmap.CMap
metrics *Metrics
metricsTicker *time.Ticker
}
type PeerOption func(*peer)
func newPeer(
pc peerConn,
mConfig tmconn.MConnConfig,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
}
p.mconn = createMConnection(
pc.conn,
p,
reactorsByCh,
chDescs,
onPeerError,
mConfig,
)
p.BaseService = *service.NewBaseService(nil, "Peer", p)
for _, option := range options {
option(p)
}
return p
}
........
func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
// Note that its ok to panic here as it's caught in the conn._recover,
// which does onPeerError.
panic(fmt.Sprintf("Unknown channel %X", chID))
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
onPeerError(p, r)
}
return tmconn.NewMConnectionWithConfig(
conn,
chDescs,
onReceive,
onError,
config,
)
}
In file "tendermint/p2p/conn/connection.go", we have code:
type MConnection struct {
service.BaseService
conn net.Conn
bufConnReader *bufio.Reader
bufConnWriter *bufio.Writer
sendMonitor *flow.Monitor
recvMonitor *flow.Monitor
send chan struct{}
pong chan struct{}
channels []*Channel
channelsIdx map[byte]*Channel
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
config MConnConfig
........
}
// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
defer c._recover()
protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
FOR_LOOP:
for {
........
// Read packet type
var packet tmp2p.Packet
_n, err := protoReader.ReadMsg(&packet)
........
// Read more depending on packet type.
switch pkt := packet.Sum.(type) {
........
case *tmp2p.Packet_PacketMsg:
channelID := byte(pkt.PacketMsg.ChannelID)
channel, ok := c.channelsIdx[channelID]
if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
break FOR_LOOP
}
msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
if err != nil {
if c.IsRunning() {
c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
}
break FOR_LOOP
}
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(channelID, msgBytes)
}
default:
........
}
Conclusion:
The MConnection relentlessly receives packets from network. When it receives a packet of type Packet_PacketMsg, it go ahead to receive the whole message with this function:
msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
Then it calls the
c.onReceive(channelID, msgBytes)
The onReceive is a callback defined in struc MConnection. It gets initialzied in "tendermint/p2p/peer.go". The callback first finds out the reactor which the channelID corresponds to. And then call the reactor's "Receive" function with the channelID, peer, and msgBytes as parameters:
reactor.Receive(chID, p, msgBytes)
This is how the reactor's Receive function gets message from peers.
Posted on January 13, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
November 29, 2024