How does ShardingSphere’s Show processlist & Kill Work?

Apache ShardingSphere

Posted on September 22, 2022

If you often use databases, you may wonder:

  1. How do I check what SQL is currently being executed by the database, and in what state?

  2. How do I terminate abnormal SQL? For instance, if a SELECT statement querying a table with massive data carries no query conditions, it will drag down the performance of the entire database. This may push you to terminate the abnormal SQL.

In response to these issues, Apache ShardingSphere introduced the Show processlist and Kill <processID> features.

1. Introduction

Show processlist: this command displays the list of SQL statements currently being executed by ShardingSphere and the execution progress of each one. If ShardingSphere is deployed in cluster mode, Show processlist aggregates the SQL running on all Proxy instances in the cluster before displaying the result, so you always see all the SQL running at that moment.

mysql> show processlist \G;
*************************** 1. row ***************************
     Id: 82a67f254959e0a0807a00f3cd695d87
   User: root
   Host: 10.200.79.156
     db: root
Command: Execute
   Time: 19
  State: Executing 0/1
   Info: update t_order set version = 456
1 row in set (0.24 sec)

Kill <processID>: this command is implemented on top of Show processlist and can terminate a running SQL statement listed by Show processlist.

mysql> kill 82a67f254959e0a0807a00f3cd695d87;
Query OK, 0 rows affected (0.17 sec)

2. How do they work?

Now that you understand what Show processlist and Kill <processID> do, let's see how the two commands work. As the working principle behind Kill <processID> is similar to that of Show processlist, we'll focus on Show processlist.

2.1 How is SQL saved and destroyed?

Each SQL statement executed in ShardingSphere generates an ExecutionGroupContext object. The object contains all the information about the SQL, including an executionID field that ensures its uniqueness.

When ShardingSphere receives a SQL command, GovernanceExecuteProcessReporter#report is called to store the ExecutionGroupContext information into a ConcurrentHashMap cache (currently only MySQL DML and DDL statements are supported, and query statements are also classified as DML; other database types will be supported in later versions).

public final class GovernanceExecuteProcessReporter implements ExecuteProcessReporter {

    @Override
    public void report(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
                       final ExecuteProcessConstants constants, final EventBusContext eventBusContext) {
        ExecuteProcessContext executeProcessContext = new ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants);
        ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(), executeProcessContext);
        ShowProcessListManager.getInstance().putProcessStatement(executeProcessContext.getExecutionID(), executeProcessContext.getProcessStatements());
    }
}

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShowProcessListManager {

    private static final ShowProcessListManager INSTANCE = new ShowProcessListManager();

    @Getter
    private final Map<String, ExecuteProcessContext> processContexts = new ConcurrentHashMap<>();

    @Getter
    private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>();

    public static ShowProcessListManager getInstance() {
        return INSTANCE;
    }

    public void putProcessContext(final String executionId, final ExecuteProcessContext processContext) {
        processContexts.put(executionId, processContext);
    }

    public void putProcessStatement(final String executionId, final Collection<Statement> statements) {
        if (statements.isEmpty()) {
            return;
        }
        processStatements.put(executionId, statements);
    }
}

As shown above, the ShowProcessListManager class has two cache Maps, namely processContexts and processStatements. The former stores the mapping between executionID and ExecuteProcessContext.

The latter stores the mapping between the executionID and the corresponding Statement objects; a single SQL statement may be rewritten by ShardingSphere into multiple statements, hence the collection.

Every time ShardingSphere receives a SQL statement, its information is cached into the two Maps. Once the SQL finishes executing, the corresponding cache entries are removed.

@RequiredArgsConstructor
public final class ProxyJDBCExecutor {

    private final String type;

    private final ConnectionSession connectionSession;

    private final JDBCDatabaseCommunicationEngine databaseCommunicationEngine;

    private final JDBCExecutor jdbcExecutor;

    public List<ExecuteResult> execute(final QueryContext queryContext, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
                                       final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
        try {
            MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
            EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
            ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
            DatabaseType protocolType = database.getProtocolType();
            DatabaseType databaseType = database.getResource().getDatabaseType();
            ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext);
            SQLStatementContext<?> context = queryContext.getSqlStatementContext();
            List<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                            true),
                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                            false));
            ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(), eventBusContext);
            return result;
        } finally {
            ExecuteProcessEngine.clean();
        }
    }
}

As shown above, ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext) stores the SQL information into the two cache Maps, and ExecuteProcessEngine.clean() in the finally block removes it.
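
For symmetry with the report() method shown earlier, the cleanup side of the reporter plausibly looks like the sketch below (the reportClean method name is an assumption; the two removals mirror the two puts, using the getters that Lombok generates on ShowProcessListManager):

public final class GovernanceExecuteProcessReporter implements ExecuteProcessReporter {

    // Sketch: the cleanup counterpart to report(), invoked via ExecuteProcessEngine.clean().
    public void reportClean(final String executionID) {
        ShowProcessListManager.getInstance().getProcessContexts().remove(executionID);
        ShowProcessListManager.getInstance().getProcessStatements().remove(executionID);
    }
}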

The SQL shown by Show processlist is obtained from processContexts. But this Map is just a local cache. If ShardingSphere is deployed in cluster mode, how does Show processlist obtain the SQL running on other machines in the cluster? Let's see how ShardingSphere handles it.

2.2 How does Show processlist work?

When ShardingSphere receives the Show processlist command, the command is routed to the executor ShowProcessListExecutor#execute for processing. The implementation of getQueryResult() is the focus.

public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor {

    private Collection<String> batchProcessContexts;

    @Getter
    private QueryResultMetaData queryResultMetaData;

    @Getter
    private MergedResult mergedResult;

    public ShowProcessListExecutor() {
        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().register(this);
    }

    @Subscribe
    public void receiveProcessListData(final ShowProcessListResponseEvent event) {
        batchProcessContexts = event.getBatchProcessContexts();
    }

    @Override
    public void execute(final ConnectionSession connectionSession) {
        queryResultMetaData = createQueryResultMetaData();
        mergedResult = new TransparentMergedResult(getQueryResult());
    }

    private QueryResult getQueryResult() {
        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(new ShowProcessListRequestEvent());
        if (null == batchProcessContexts || batchProcessContexts.isEmpty()) {
            return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
        }
        Collection<YamlExecuteProcessContext> processContexts = new LinkedList<>();
        for (String each : batchProcessContexts) {
            processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
        }
        List<MemoryQueryResultDataRow> rows = processContexts.stream().map(processContext -> {
            List<Object> rowValues = new ArrayList<>(8);
            rowValues.add(processContext.getExecutionID());
            rowValues.add(processContext.getUsername());
            rowValues.add(processContext.getHostname());
            rowValues.add(processContext.getDatabaseName());
            rowValues.add("Execute");
            rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
            int processDoneCount = processContext.getUnitStatuses().stream().map(each -> ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 : 0).reduce(0, Integer::sum);
            String statePrefix = "Executing ";
            rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
            String sql = processContext.getSql();
            if (null != sql && sql.length() > 100) {
                sql = sql.substring(0, 100);
            }
            rowValues.add(null != sql ? sql : "");
            return new MemoryQueryResultDataRow(rowValues);
        }).collect(Collectors.toList());
        return new RawMemoryQueryResult(queryResultMetaData, rows);
    }

    private QueryResultMetaData createQueryResultMetaData() {
        List<RawQueryResultColumnMetaData> columns = new ArrayList<>();
        columns.add(new RawQueryResultColumnMetaData("", "Id", "Id", Types.VARCHAR, "VARCHAR", 20, 0));
        columns.add(new RawQueryResultColumnMetaData("", "User", "User", Types.VARCHAR, "VARCHAR", 20, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Host", "Host", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "db", "db", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Command", "Command", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Time", "Time", Types.VARCHAR, "VARCHAR", 10, 0));
        columns.add(new RawQueryResultColumnMetaData("", "State", "State", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Info", "Info", Types.VARCHAR, "VARCHAR", 120, 0));
        return new RawQueryResultMetaData(columns);
    }
}

The executor relies on the Guava package's EventBus, a publish/subscribe library that is an elegant implementation of the Observer pattern. EventBus decouples publishers from subscribers, and you'll find out more about it below.
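
If you haven't used it before, here is a minimal, standalone Guava EventBus example (demo code, not ShardingSphere code) showing the register/post/@Subscribe pattern that appears throughout this article:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public final class EventBusDemo {

    static final class GreetingEvent {

        final String message;

        GreetingEvent(final String message) {
            this.message = message;
        }
    }

    static final class GreetingSubscriber {

        // Any public method annotated with @Subscribe and taking a single
        // argument receives events of that argument's type.
        @Subscribe
        public void onGreeting(final GreetingEvent event) {
            System.out.println("received: " + event.message);
        }
    }

    public static void main(final String[] args) {
        EventBus eventBus = new EventBus();
        eventBus.register(new GreetingSubscriber());
        eventBus.post(new GreetingEvent("hello"));   // prints "received: hello"
    }
}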

The getQueryResult() method posts a ShowProcessListRequestEvent, which ProcessRegistrySubscriber#loadShowProcessListData subscribes to via the @Subscribe annotation.

This method is the core of the Show processlist implementation. Next, we'll walk through its steps.

public final class ProcessRegistrySubscriber {    
    @Subscribe
    public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
        String processListId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
        boolean triggerIsComplete = false;
        // 1. Obtain the Process List path of all existing proxy nodes in cluster mode
        Collection<String> triggerPaths = getTriggerPaths(processListId);
        try {
            // 2. Iterate through the path and write an empty string to the node, to trigger the node monitoring.
            triggerPaths.forEach(each -> repository.persist(each, ""));
            // 3. Lock and wait 5 seconds for each node to write the information of currently running SQL to the persistence layer. 
            triggerIsComplete = waitAllNodeDataReady(processListId, triggerPaths);
            // 4. Fetch and aggregate the data written by each proxy node from the persistence layer. Then EventBus will post a ShowProcessListResponseEvent command, which means the operation is completed.
            sendShowProcessList(processListId);
        } finally {
            // 5. Delete resources
            repository.delete(ProcessNode.getProcessListIdPath(processListId));
            if (!triggerIsComplete) {
                triggerPaths.forEach(repository::delete);
            }
        }
    }
}

The method contains five steps, of which steps 2 and 3 are the focus. Step 1 is not shown in detail above; a sketch follows below.
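
A minimal sketch of how step 1's getTriggerPaths could assemble one trigger path per online Proxy instance, reusing the node-path helpers that appear later in this article (the exact repository calls are assumptions):

public final class ProcessRegistrySubscriber {

    private final ClusterPersistRepository repository;

    // Sketch: list the online compute nodes and build, for each one, the path
    // /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId>.
    private Collection<String> getTriggerPaths(final String processListId) {
        return repository.getChildrenKeys(ComputeNode.getOnlineInstanceNodePath()).stream()
                .map(each -> ComputeNode.getProcessTriggerInstanceIdNodePath(each, processListId))
                .collect(Collectors.toList());
    }
}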

2.2.1 Step 2: the cluster obtains the data implementation

In this step, an empty string will be written to the node /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId>, which will trigger ShardingSphere's monitoring logic.

When ShardingSphere starts, the persistence layer watches a series of paths for changes, such as addition, deletion, and modification operations under the path /nodes/compute_nodes.

However, watching is an asynchronous process that does not block the main thread, so step 3 is required to lock and wait for each ShardingSphere node to write its currently running SQL information into the persistence layer.

Let's take a look at how ShardingSphere handles the monitoring logic.

public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> {

    @Override
    public Collection<String> getWatchingKeys(final String databaseName) {
        return Collections.singleton(ComputeNode.getComputeNodePath());
    }

    @Override
    public Collection<Type> getWatchingTypes() {
        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
    }

    @SuppressWarnings("unchecked")
    @Override
    public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
        String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey());
        if (!Strings.isNullOrEmpty(instanceId)) {
            ...
        } else if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
            return createInstanceEvent(event);
            // show processlist
        } else if (event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
            return createShowProcessListTriggerEvent(event);
            // kill processlistId
        } else if (event.getKey().startsWith(ComputeNode.getProcessKillNodePatch())) {
            return createKillProcessListIdEvent(event);
        }
        return Optional.empty();
    }


    private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
        Matcher matcher = getShowProcessTriggerMatcher(event);
        if (!matcher.find()) {
            return Optional.empty();
        }
        if (Type.ADDED == event.getType()) {
            return Optional.of(new ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
        }
        if (Type.DELETED == event.getType()) {
            return Optional.of(new ShowProcessListUnitCompleteEvent(matcher.group(2)));
        }
        return Optional.empty();
    }
}

When ComputeNodeStateChangedWatcher#createGovernanceEvent receives a change, it determines which event to create according to the path.

As shown in the above code, the trigger node is newly added, so a ShowProcessListTriggerEvent is posted. Since every ShardingSphere instance watches /nodes/compute_nodes, every instance processes the ShowProcessListTriggerEvent.

In this case, single-machine processing is transformed into cluster processing. Let's look at how ShardingSphere handles it.

public final class ClusterContextManagerCoordinator {
    @Subscribe
    public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
            return;
        }
        Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
        if (!processContexts.isEmpty()) {
            registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
                    YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
        }
        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
    }
}

ClusterContextManagerCoordinator#triggerShowProcessList subscribes to ShowProcessListTriggerEvent, and each instance processes only its own processContext data. ShowProcessListManager.getInstance().getAllProcessContext() retrieves the currently running processContexts (the SQL information that ShardingSphere stores in the Map before each SQL execution, as described at the beginning of the article) and writes them to the persistence layer. Finally, the /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId> node is deleted to mark the processing as complete.

Deleting the node triggers the watcher again, and a ShowProcessListUnitCompleteEvent is posted. This event finally wakes up the waiting lock.

public final class ClusterContextManagerCoordinator {

    @Subscribe
    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
        if (null != simpleLock) {
            simpleLock.doNotify();
        }
    }
}

2.2.2 Step 3: lock and wait for the data implementation

ShardingSphere uses the isReady(paths) method to determine whether every instance has finished processing; it returns true only when all instances have done so.

There is a maximum wait of 5 seconds for data processing. If processing is not completed within 5 seconds, false is returned.

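The waiting itself is built on the ShowProcessListSimpleLock registered per processListId, which completeUnitShowProcessList (shown above) notifies. A minimal sketch of the lock-and-wait step, assuming an await helper that gives up once the 5-second budget is exhausted (names and signatures are assumptions, not verified API):

public final class ProcessRegistrySubscriber {

    private final ClusterPersistRepository repository;

    // Sketch: register a lock for this processListId, then wait until every
    // instance has deleted its trigger path or 5 seconds have elapsed.
    private boolean waitAllNodeDataReady(final String processListId, final Collection<String> triggerPaths) {
        ShowProcessListSimpleLock simpleLock = new ShowProcessListSimpleLock();
        ShowProcessListManager.getInstance().getLocks().put(processListId, simpleLock);
        simpleLock.lock();
        try {
            while (!isReady(triggerPaths)) {
                // doNotify() from completeUnitShowProcessList wakes this wait;
                // assume the helper returns false once the time budget is used up.
                if (!simpleLock.awaitDefaultTime()) {
                    return false;
                }
            }
            return true;
        } finally {
            simpleLock.unlock();
            ShowProcessListManager.getInstance().getLocks().remove(processListId);
        }
    }

    // A trigger path that no longer exists means that instance has finished writing its data.
    private boolean isReady(final Collection<String> paths) {
        return paths.stream().noneMatch(each -> null != repository.get(each));
    }
}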

2.2.3 Aggregate the processList data and return it

After every instance has processed the data, the instance that received the Show processlist command aggregates the data and displays the result.

public final class ProcessRegistrySubscriber {  

    private void sendShowProcessList(final String processListId) {
        List<String> childrenKeys = repository.getChildrenKeys(ProcessNode.getProcessListIdPath(processListId));
        Collection<String> batchProcessContexts = new LinkedList<>();
        for (String each : childrenKeys) {
            batchProcessContexts.add(repository.get(ProcessNode.getProcessListInstancePath(processListId, each)));
        }
        eventBusContext.post(new ShowProcessListResponseEvent(batchProcessContexts));
    }
}

ProcessRegistrySubscriber#sendShowProcessList aggregates the running SQL data into batchProcessContexts and then posts a ShowProcessListResponseEvent.

This event is consumed by ShowProcessListExecutor#receiveProcessListData, and the getQueryResult() method proceeds to build and display the query result.

That completes the execution process of the Show processlist command.

2.3 How does Kill work?

Kill <processId> shares similar logic with Show processlist: it combines EventBus with the watch mechanism.

Since we do not know which instance the SQL identified by processId is running on, an empty node must likewise be written for every instance.

Through the watch mechanism, each ShardingSphere instance detects the new node and checks whether the processId key is in its cache Map. If it is, the instance fetches the corresponding value.

The value is a Collection<Statement>. The instance then iterates through the collection and calls cancel() on each Statement; under the hood this is java.sql.Statement#cancel(), which cancels the SQL execution. A sketch of such a handler follows below.
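
Mirroring triggerShowProcessList above, a kill handler could plausibly look like the following sketch (KillProcessListIdEvent follows the watcher code shown earlier; the exact signature and the getProcessKillInstanceIdNodePath helper are assumptions):

public final class ClusterContextManagerCoordinator {

    @Subscribe
    public synchronized void killProcessListId(final KillProcessListIdEvent event) throws SQLException {
        Collection<Statement> statements = ShowProcessListManager.getInstance().getProcessStatements().get(event.getProcessListId());
        if (null != statements) {
            // Only the instance whose local cache contains the processId cancels anything.
            for (Statement each : statements) {
                each.cancel();   // delegates to java.sql.Statement#cancel()
            }
        }
        // Deleting the node signals that this instance has handled the kill request.
        registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
    }
}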

3. Conclusion

Currently, Apache ShardingSphere only implements the Show processlist and Kill <processId> features for the MySQL dialect.

Once you know how they work, you're welcome to participate in the development of these features if you're interested. Our community is very open, and anyone interested in contributing to open source is welcome.

Relevant Links:

Apache ShardingSphere Official Website

Apache ShardingSphere GitHub

Apache ShardingSphere Slack Channel

Author

Xu Yang is a middleware R&D engineer at Servyou Group, responsible for database and table sharding with massive data. An open source enthusiast and ShardingSphere contributor, he is currently interested in developing the kernel module of the ShardingSphere project.
