ZhongAn Insurance's Wang Kai Analyzes Kafka Network Communication


Posted on June 7, 2024


Author: Kai Wang, Java Development Expert at ZhongAn Online Insurance Basic Platform

Introduction

Today, we explore the core workflow of network communication in Kafka, specifically focusing on Apache Kafka 3.7[2]. This discussion also includes insights into the increasingly popular AutoMQ, highlighting its network communication optimizations and enhancements derived from Kafka.

I. How to Construct a Basic Request and Handle Responses

As a message queue, network communication essentially involves two key aspects:

  • Communication between message producers and the message queue server (in Kafka, this involves producers "pushing" messages to the queue)
  • Communication between message consumers and the message queue server (in Kafka, this involves consumers "pulling" messages from the queue)
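To make the two directions concrete, here is a minimal client-side sketch (illustrative only; it assumes a broker at localhost:9092 and a topic named demo-topic, neither of which appears in the original text): the producer "pushes" a record with send, while the consumer "pulls" records with poll.

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.jdk.CollectionConverters._

object PushPullSketch {
  def main(args: Array[String]): Unit = {
    // Producer side: "push" a record; the Sender thread batches and transmits it.
    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", "localhost:9092")
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](producerProps)
    producer.send(new ProducerRecord[String, String]("demo-topic", "key", "hello"))
    producer.close()

    // Consumer side: "pull" records with poll(); the broker never pushes to consumers.
    val consumerProps = new Properties()
    consumerProps.put("bootstrap.servers", "localhost:9092")
    consumerProps.put("group.id", "demo-group")
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](consumerProps)
    consumer.subscribe(Collections.singletonList("demo-topic"))
    consumer.poll(Duration.ofMillis(1000)).asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
    consumer.close()
  }
}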

(Diagram: end-to-end request/response flow between the Kafka client and server)

This diagram primarily illustrates the process from message dispatch to response reception.
Client:

  1. KafkaProducer initializes the Sender thread
  2. The Sender thread retrieves batched data from the RecordAccumulator (for detailed client-side sending, see https://mp.weixin.qq.com/s/J2_O1l81duknfdFvHuBWxw)
  3. The Sender thread employs the NetworkClient to check the connection status and initiates a connection if necessary
  4. The Sender thread invokes the NetworkClient's doSend method to transmit data to the KafkaChannel
  5. The Sender thread calls the NetworkClient's poll method to perform the actual data transmission
Server:
  6. KafkaServer initializes SocketServer, dataPlaneRequestProcessor (KafkaApis), and dataPlaneRequestHandlerPool
  7. SocketServer sets up the RequestChannel and dataPlaneAcceptor
  8. The dataPlaneAcceptor takes charge of acquiring connections and delegating tasks to the appropriate Processor
  9. The Processor thread pulls new connections from the newConnections queue for processing; the Processor then handles the ready I/O events in the following phases:
  10. configureNewConnections(): Establish new connections
  11. processNewResponses(): Dispatch Response and enqueue it in the inflightResponses temporary queue
  12. poll(): Execute NIO polling to retrieve ready I/O operations on the respective SocketChannel
  13. processCompletedReceives(): Enqueue received Requests in the RequestChannel queue
  14. processCompletedSends(): Implement callback logic for Responses in the temporary Response queue
  15. processDisconnected(): Handle connections that have been disconnected due to send failures
  16. closeExcessConnections(): Terminate connections that surpass quota limits
  17. The KafkaRequestHandler retrieves ready requests from the RequestChannel and dispatches them to KafkaApis for processing.
  18. After processing by KafkaApis, the response is returned to the RequestChannel.
  19. The Processor thread then delivers the response to the client. This completes a full cycle of message transmission in Kafka, encompassing both client and server processing steps.

Ⅱ. Kafka Network Communication

1. Server-side Communication Thread Model
Unlike RocketMQ, which relies on Netty for efficient network communication, Kafka uses Java NIO to implement a master-slave Reactor pattern (for further information, see https://jenkov.com/tutorials/java-nio/overview.html).
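As a rough illustration of the master-slave Reactor arrangement (this is not Kafka's SocketServer code; the port and thread count are arbitrary), the main reactor below handles only OP_ACCEPT and hands accepted channels to worker selectors in round-robin order, much like the Acceptor/Processor split described next:

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

// SubReactor: each worker owns its own Selector plus a queue of channels handed
// over by the acceptor (analogous to a Processor's newConnections queue).
class WorkerSketch extends Runnable {
  private val selector = Selector.open()
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

  def assign(channel: SocketChannel): Unit = {
    newConnections.offer(channel)
    selector.wakeup() // wake select() so the new channel gets registered promptly
  }

  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      // like configureNewConnections(): register queued channels for OP_READ
      var ch = newConnections.poll()
      while (ch != null) {
        ch.configureBlocking(false)
        ch.register(selector, SelectionKey.OP_READ)
        ch = newConnections.poll()
      }
      // like poll(): a bounded select that surfaces ready I/O events
      selector.select(300)
      val keys = selector.selectedKeys().iterator()
      while (keys.hasNext) {
        val key = keys.next(); keys.remove()
        if (key.isReadable) {
          val buf = ByteBuffer.allocate(1024)
          val read = key.channel().asInstanceOf[SocketChannel].read(buf)
          if (read < 0) key.cancel() // peer closed the connection
          // decoding the request and handing it to a handler pool is omitted
        }
      }
    }
  }
}

// MainReactor: a single acceptor thread that only listens for OP_ACCEPT and
// delegates accepted SocketChannels to the workers in round-robin order.
object AcceptorSketch {
  def main(args: Array[String]): Unit = {
    val workers = Array.fill(3)(new WorkerSketch)
    workers.foreach(w => new Thread(w).start())

    val serverChannel = ServerSocketChannel.open()
    serverChannel.bind(new InetSocketAddress(9999))
    serverChannel.configureBlocking(false)
    val acceptSelector = Selector.open()
    serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT)

    var next = 0
    while (true) {
      acceptSelector.select()
      val keys = acceptSelector.selectedKeys().iterator()
      while (keys.hasNext) {
        val key = keys.next(); keys.remove()
        if (key.isAcceptable) {
          val client = serverChannel.accept()
          if (client != null) {
            workers(next % workers.length).assign(client) // round-robin handoff
            next += 1
          }
        }
      }
    }
  }
}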

(Diagram: the master-slave Reactor threading model of the Kafka server)

Both DataPlaneAcceptor and ControlPlaneAcceptor are subclasses of Acceptor, a thread class that implements the Runnable interface. The primary function of an Acceptor is to listen for and accept connections between Clients and Brokers, set up the transmission channels (SocketChannel), and delegate them to Processors in a round-robin fashion. A RequestChannel (backed by an ArrayBlockingQueue) connects the Processors to the Handlers. The MainReactor (Acceptor) handles only the OP_ACCEPT event; once one is detected, it forwards the SocketChannel to a SubReactor (Processor). Each Processor operates its own Selector; the SubReactor listens for and processes the remaining events and ultimately hands the actual requests to the KafkaRequestHandlerPool.
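The Processor-to-Handler hand-off can be pictured as a single bounded blocking queue. Below is a minimal sketch of that idea (an illustration, not the real RequestChannel class; the real queue's capacity is configured by queued.max.requests):

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Minimal sketch of the Processor -> Handler hand-off (not the real RequestChannel).
class RequestChannelSketch[Req](capacity: Int) {
  private val requestQueue = new ArrayBlockingQueue[Req](capacity)

  // Processor side: enqueue a fully decoded request. put() blocks when the queue
  // is full, which applies back-pressure to the network threads.
  def sendRequest(req: Req): Unit = requestQueue.put(req)

  // Handler side: take the next request, waiting at most timeoutMs milliseconds
  // (compare requestChannel.receiveRequest(300) in the handler loop shown later).
  def receiveRequest(timeoutMs: Long): Req = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
}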

2. Initialization of the main components in the thread model

(Diagram: initialization of the main network components during broker startup)

The diagram illustrates that during broker startup, the KafkaServer's startup method is invoked (assuming it operates in ZooKeeper mode).
The startup method primarily establishes:

  1. KafkaApis handlers: creating dataPlaneRequestProcessor and controlPlaneRequestProcessor
  2. KafkaRequestHandlerPool: forming dataPlaneRequestHandlerPool and controlPlaneRequestHandlerPool
  3. Initialization of socketServer
  4. Establishment of controlPlaneAcceptorAndProcessor and dataPlaneAcceptorAndProcessor

Additionally, an important step not depicted in the diagram but included in the startup method is starting the threads: enableRequestProcessing is invoked on the initialized socketServer.
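The wiring between these components can be summarized in a small sketch. Every type below is a placeholder for illustration only (the real KafkaServer.startup does far more, in a slightly different order); the point is simply that the handler pool is built around the SocketServer's RequestChannel and the KafkaApis instance, and that enableRequestProcessing is what finally starts the network threads:

// Placeholder types only; this is not the real KafkaServer API.
object StartupWiringSketch {
  class RequestChannel
  class SocketServer {
    val dataPlaneRequestChannel = new RequestChannel
    def enableRequestProcessing(): Unit = println("acceptors and processors started")
  }
  class KafkaApis(requestChannel: RequestChannel)
  class KafkaRequestHandlerPool(requestChannel: RequestChannel, apis: KafkaApis, numThreads: Int)

  def main(args: Array[String]): Unit = {
    val socketServer = new SocketServer                                  // owns the RequestChannel
    val dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel)
    val dataPlaneRequestHandlerPool =
      new KafkaRequestHandlerPool(socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, numThreads = 8)
    socketServer.enableRequestProcessing()                               // start acceptor/processor threads
  }
}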

3. Addition and Removal of Processors
1. Addition

  • Processors are added when the broker starts
  • The number of processing threads can be increased at runtime by adjusting num.network.threads

2. Startup

  • Processors are started when the broker launches the Acceptor
  • Newly added processing threads that were not started during the adjustment are started

3. Removal and destruction

  • Broker shutdown
  • Adjusting num.network.threads downward removes the excess threads and closes them
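This lifecycle can be sketched roughly as follows (a simplified illustration of resizing a processor pool, not the actual SocketServer or DataPlaneAcceptor code, which shuts Processors down gracefully rather than interrupting them):

import scala.collection.mutable.ArrayBuffer

// Simplified sketch: grow or shrink a pool of processor threads when the
// configured thread count (num.network.threads) changes.
class ProcessorPoolSketch(newProcessorThread: () => Thread) {
  private val processors = ArrayBuffer.empty[Thread]

  def resize(newSize: Int): Unit = synchronized {
    if (newSize > processors.size) {
      // addition + startup: create and start the missing threads
      (processors.size until newSize).foreach { _ =>
        val t = newProcessorThread()
        processors += t
        t.start()
      }
    } else if (newSize < processors.size) {
      // removal + destruction: drop the excess threads from the pool and stop them
      val excess = processors.takeRight(processors.size - newSize).toList
      processors.remove(newSize, processors.size - newSize)
      excess.foreach(_.interrupt())
    }
  }
}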

(Diagram: Processor addition, startup, and removal)

4. KafkaRequestHandlerPool and KafkaRequestHandler
1. KafkaRequestHandlerPool
The primary place where Kafka requests are processed: a request-handling thread pool responsible for creating, maintaining, managing, and shutting down its request-handling threads.
2. KafkaRequestHandler
The class for the actual business request-handling threads; each instance retrieves request objects from the SocketServer's RequestChannel queue and processes them.
Below is the run method of KafkaRequestHandler:

def run(): Unit = {
  threadRequestChannel.set(requestChannel)
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // Fetch the next pending request from the request queue
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        completeShutdown()
        return

      case callback: RequestChannel.CallbackRequest =>
        val originalRequest = callback.originalRequest
        try {

          // If we've already executed a callback for this request, reset the times and subtract the callback time from the 
          // new dequeue time. This will allow calculation of multiple callback times.
          // Otherwise, set dequeue time to now.
          if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
            val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
            originalRequest.callbackRequestCompleteTimeNanos = None
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos)
          } else {
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds())
          }

          threadCurrentRequest.set(originalRequest)
          callback.fun(requestLocal)
        } catch {
          case e: FatalExitError =>
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // When handling requests, we try to complete actions after, so we should try to do so here as well.
          apis.tryCompleteActions()
          if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
            originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
          threadCurrentRequest.remove()
        }
      // In the normal case, the KafkaApis.handle method executes the corresponding processing logic
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          threadCurrentRequest.set(request)
          apis.handle(request, requestLocal)
        } catch {
          case e: FatalExitError =>
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          threadCurrentRequest.remove()
          request.releaseBuffer()
        }

      case RequestChannel.WakeupRequest => 
        // We should handle this in receiveRequest by polling callbackQueue.
        warn("Received a wakeup request outside of typical usage.")

      case null => // continue
    }
  }
  completeShutdown()
}


Here, the apis.handle(request, requestLocal) call in the normal-request branch hands the request over to KafkaApis for processing.

Ⅲ. Unified Request Handling Dispatch

The primary business-processing class in Kafka is KafkaApis: all of the communication and threading machinery described above ultimately delivers requests to it for dispatch.

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  def handleError(e: Throwable): Unit = {
    error(s"Unexpected error handling request ${request.requestDesc(true)} " +
      s"with context ${request.context}", e)
    requestHelper.handleError(request, e)
  }

  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")

    if (!apiVersionManager.isApiEnabled(request.header.apiKey, request.header.apiVersion)) {
      // The socket server will reject APIs which are not exposed in this scope and close the connection
      // before handing them to the request handler, so this path should not be exercised in practice
      throw new IllegalStateException(s"API ${request.header.apiKey} with version ${request.header.apiVersion} is not enabled")
    }

    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request).exceptionally(handleError)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request).exceptionally(handleError)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupsRequest(request).exceptionally(handleError)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
      case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionsToTxnRequest(request, requestLocal)
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
      case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
      case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
      case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
      // Create, renew and expire DelegationTokens must first validate that the connection
      // itself is not authenticated with a delegation token before maybeForwardToController.
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
      case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
      case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
      case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
      case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
      case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
      case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
      case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
      case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
      case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
      case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
      case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
      case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
      case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
      case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
      case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
      case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
      case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(e)
  } finally {
    // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
    // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
    // expiration thread for certain delayed operations (e.g. DelayedJoin)
    // Delayed fetches are also completed by ReplicaFetcherThread.
    replicaManager.tryCompleteActions()
    // The local completion time may be set while processing the request. Only record it if it's unset.
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}

From the code above, the key components are identifiable: the ReplicaManager, which manages replicas; the GroupCoordinator, which oversees consumer groups; and the KafkaController, which runs the controller logic. On the client side sit the most frequently used operations, KafkaProducer.send (to send messages) and KafkaConsumer.poll (to consume messages).

IV. AutoMQ Thread Model

1. Optimization of Processing Threads
AutoMQ, drawing inspiration from the CPU pipeline, refines Kafka's processing model into a pipeline mode, striking a balance between sequentiality and efficiency.

  • Sequentiality: Each TCP connection is tied to a single thread, with one network thread dedicated to request parsing and one RequestHandler thread responsible for processing the business logic;
  • Efficiency: The stages are pipelined, allowing a network thread to parse MSG2 immediately after finishing MSG1, without waiting for MSG1’s persistence. Similarly, once the RequestHandler completes verification and sequencing of MSG1, it can start processing MSG2 right away. To further improve persistence efficiency, AutoMQ groups data into batches for disk storage.
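The idea can be sketched with two stages connected by a bounded queue (an illustration of the pipelining concept only, not AutoMQ's implementation): the parsing stage hands a message off and immediately moves on to the next one, while the handling stage validates, sequences, and batches messages for persistence.

import java.util.concurrent.{ArrayBlockingQueue, Executors}

object PipelineSketch {
  final case class Parsed(payload: String)

  def main(args: Array[String]): Unit = {
    val parsedQueue = new ArrayBlockingQueue[Parsed](1024)
    val pool = Executors.newFixedThreadPool(2)

    // Stage 1: the "network thread" parses MSG2 as soon as MSG1 has been handed off.
    pool.submit(new Runnable {
      override def run(): Unit =
        (1 to 10).foreach { i =>
          parsedQueue.put(Parsed(s"msg-$i")) // hand off; do not wait for persistence
        }
    })

    // Stage 2: the "RequestHandler" validates and sequences, then batches for persistence.
    pool.submit(new Runnable {
      override def run(): Unit = {
        val batch = scala.collection.mutable.ArrayBuffer.empty[Parsed]
        (1 to 10).foreach { _ =>
          batch += parsedQueue.take()
          if (batch.size >= 4) {               // group several messages into one disk write
            println(s"persisting batch of ${batch.size}")
            batch.clear()
          }
        }
        if (batch.nonEmpty) println(s"persisting final batch of ${batch.size}")
      }
    })

    pool.shutdown()
  }
}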

2. Optimization of the RequestChannel
AutoMQ has redesigned the RequestChannel into a multi-queue architecture, allowing requests from the same connection to be consistently directed to the same queue and handled by a specific KafkaRequestHandler, thus ensuring orderly processing during the verification and sequencing stages.
Each queue is directly linked to a particular KafkaRequestHandler, maintaining a one-to-one relationship.
After the Processor decodes the request, it assigns it to a specific queue based on the hash(channelId) % N formula.
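A minimal sketch of the multi-queue idea (illustrative only, not AutoMQ's source) looks like this: each handler owns one bounded queue, and the Processor routes a decoded request with hash(channelId) % N, so all requests from one connection land on the same handler in order.

import java.util.concurrent.ArrayBlockingQueue

// One bounded queue per KafkaRequestHandler; requests from the same connection
// always hash to the same queue and are therefore processed in order.
class MultiQueueRequestChannelSketch[Req](numHandlers: Int, capacityPerQueue: Int) {
  private val queues = Array.fill(numHandlers)(new ArrayBlockingQueue[Req](capacityPerQueue))

  // Processor side: route by connection id after decoding the request.
  def send(channelId: String, request: Req): Unit = {
    val idx = Math.floorMod(channelId.hashCode, numHandlers)
    queues(idx).put(request)
  }

  // Handler side: handler handlerId consumes only its own queue.
  def receive(handlerId: Int): Req = queues(handlerId).take()
}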

References

[1] AutoMQ: https://github.com/AutoMQ/automq
[2] Kafka 3.7: https://github.com/apache/kafka/releases/tag/3.7.0
[3] Java NIO: https://jenkov.com/tutorials/java-nio/overview.html
[4] AutoMQ Thread Optimization: https://mp.weixin.qq.com/s/kDZJgUnMoc5K8jTuV08OJw
