Understand TiSpark pushdown

shiyuhang0

Posted on September 6, 2022

Author: shiyuhang

TiSpark is a thin layer built for running Apache Spark on top of TiKV/TiFlash to answer complex OLAP queries. It supports reading, writing, and deleting from TiKV/TiFlash with the guarantee of transactions.

In order to speed up reading, TiSpark will push some operators to TiKV or TiFlash. In this article, you will learn:

  • What is pushdown in Spark
  • How Spark implements pushdown
  • TiSpark pushdown strategy

What is pushdown in Spark?

Pushdown is a classic SQL optimization that speeds up queries. It moves operators as close to the data source as possible, so that the upper layers have less data to process. For example, predicate pushdown pushes the WHERE condition, and aggregation pushdown pushes the aggregate functions.

So, what is pushdown in Spark? Does it have the same meaning as above?

First, let us look at the internals of Spark SQL. The core of Spark SQL is Catalyst, which will:

  • Parse Spark SQL to the unresolved logical plan

  • Apply analysis rules and catalog to get the resolved logical plan

  • Apply optimization rules to get the optimized logical plan

  • Use planning strategies to get the physical plan

  • Select one of the physical plans based on the cost model

  • Generate the executable RDDs and assign them to Spark Core

During this process, Catalyst represents the SQL as a tree whose nodes are instances of TreeNode. Spark has several classes that inherit from TreeNode to represent the different node types in the logical plan and the physical plan. For example, a WHERE condition is parsed into a Filter node.

Now, let's talk about pushdown in Spark. It happens in two steps:

  • Pushdown Optimization: first, Spark will push the operators closer to the data source when it optimizes the logical plan.

  • Pushdown to data source: then Spark will push the operators to the data source when it builds the physical plan.

As an example, consider the following SQL:

select * from A join B on A.id = B.id where A.a>10 and B.b<100;

Spark parses this SQL into a tree. Filter is the WHERE condition, Join is the join operator, and Scan is the data source (here, tables A and B).

After the first step of pushdown, the filter will be closer to the data source to reduce the data which will be processed by join.

Then, when Spark builds the physical plan, the filter may be pushed into the data source itself. In that case, Spark no longer needs to filter the data.

How Spark implements pushdown

In the last section, you learned that pushdown in Spark has two steps: the first optimizes the logical plan, and the second pushes operators to the data source.

In this section, you will learn how Spark implements both steps. The code in this section is based on Spark 3.2.

Pushdown optimization

Pushdown optimization is applied to the logical plan in the optimizer phase of Catalyst. Take predicate pushdown as an example.

The rule for predicate pushdown is called PushDownPredicates in Spark.

object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    CombineFilters.applyLocally
      .orElse(PushPredicateThroughNonJoin.applyLocally)
      .orElse(PushPredicateThroughJoin.applyLocally)
  }
}

The transform method applies the rule recursively to the tree.

For every TreeNode in the plan:

  • CombineFilters merges two adjacent Filter nodes into a single Filter whose condition is the AND of both. For example, a filter on a > 1 stacked on a filter on b > 2 becomes one filter on a > 1 AND b > 2

  • PushPredicateThroughNonJoin pushes predicates down through operators other than joins, such as Project and Aggregate

  • PushPredicateThroughJoin pushes predicates down through a Join

You can refer to the Spark source code to see the details of PushPredicateThroughNonJoin and PushPredicateThroughJoin.
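To make the chaining of partial rules concrete, here is a minimal sketch that does not depend on Spark. Plan, Scan, Filter, Join, and the ToyPushdown object are hypothetical stand-ins for the Catalyst classes, and conditions are plain strings prefixed with a table name. It only illustrates composing rules with orElse and applying them top-down; the real Spark rules cover many more cases.

```scala
// Toy logical plan. These classes are illustrative stand-ins, not Spark's.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(conds: Set[String], child: Plan) extends Plan
case class Join(left: Plan, right: Plan, on: String) extends Plan

object ToyPushdown {
  type Rule = PartialFunction[Plan, Plan]

  // Merge two adjacent filters into one (the role CombineFilters plays).
  val combineFilters: Rule = {
    case Filter(outer, Filter(inner, child)) => Filter(outer ++ inner, child)
  }

  // Table a condition such as "A.a > 10" refers to (assumed naming convention).
  private def tableOf(cond: String): String = cond.takeWhile(_ != '.')

  private def wrap(conds: Set[String], plan: Plan): Plan =
    if (conds.isEmpty) plan else Filter(conds, plan)

  // Move single-table conditions below the join, next to the matching scan.
  val pushThroughJoin: Rule = {
    case Filter(conds, Join(l @ Scan(lt), r @ Scan(rt), on))
        if conds.exists(c => tableOf(c) == lt || tableOf(c) == rt) =>
      val (toLeft, rest) = conds.partition(c => tableOf(c) == lt)
      val (toRight, stay) = rest.partition(c => tableOf(c) == rt)
      wrap(stay, Join(wrap(toLeft, l), wrap(toRight, r), on))
  }

  // Top-down rewrite: try the rule on a node, then recurse into its children.
  def transform(plan: Plan)(rule: Rule): Plan =
    rule.applyOrElse(plan, identity[Plan]) match {
      case Filter(c, child) => Filter(c, transform(child)(rule))
      case Join(l, r, on)   => Join(transform(l)(rule), transform(r)(rule), on)
      case s: Scan          => s
    }

  // The first rule that is defined for a node wins, as with orElse in Spark.
  val pushDownPredicates: Rule = combineFilters.orElse(pushThroughJoin)

  def optimize(plan: Plan): Plan = transform(plan)(pushDownPredicates)
}
```

For the earlier query, calling ToyPushdown.optimize on Filter(Set("A.a > 10", "B.b < 100"), Join(Scan("A"), Scan("B"), "A.id = B.id")) yields a Join whose two children are Filter nodes sitting directly above each Scan.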

Pushdown to data source

Based on the result of the pushdown optimization, Spark may push operators to the data source when it builds the physical plan.

Whether an operator is pushed down depends on the capability and implementation of the data source. If your data source does not support a pushdown, you need to tell Spark not to attempt it.

For this purpose, Spark provides interfaces through which a data source communicates with Spark. Taking predicate pushdown as the example again, Spark provides the following interface:

@Evolving
public interface SupportsPushDownFilters extends ScanBuilder {
  Filter[] pushFilters(Filter[] filters);
  Filter[] pushedFilters();
}

  • Filter[] pushFilters(Filter[] filters): the input is the set of filters produced by the pushdown optimization, and the output is the filters that Spark must still evaluate after the scan. Spark calls these postScanFilters.

  • Filter[] pushedFilters(): takes no input and returns the filters that are pushed to the data source. Spark calls these pushedFilters.

  • A filter can appear in both postScanFilters and pushedFilters. In this case, the data source and Spark both apply the filter.
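The contract of the two methods can be sketched without Spark on the classpath. Everything below is hypothetical: the PushDownFilters trait only mirrors the shape of the interface, and the Filter case classes are toy types rather than Spark's org.apache.spark.sql.sources filters. The sketch assumes a source that can evaluate comparisons but not substring matches.

```scala
object ToyScanBuilderDemo {
  // Toy filter types, not Spark's.
  sealed trait Filter
  case class GreaterThan(col: String, value: Int) extends Filter
  case class StringContains(col: String, value: String) extends Filter

  // Hypothetical mirror of the two methods described above.
  trait PushDownFilters {
    def pushFilters(filters: Array[Filter]): Array[Filter] // returns post-scan filters
    def pushedFilters(): Array[Filter]
  }

  class ToyScanBuilder extends PushDownFilters {
    private var pushed: Array[Filter] = Array.empty[Filter]

    override def pushFilters(filters: Array[Filter]): Array[Filter] = {
      val (supported, unsupported) = filters.partition {
        case _: GreaterThan => true
        case _              => false
      }
      pushed = supported // remembered so the scan can apply them later
      unsupported        // handed back to the engine to evaluate after the scan
    }

    override def pushedFilters(): Array[Filter] = pushed
  }
}
```

A source that wants the engine to double-check a pushed filter would return it from both methods.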

As maintainers of a Spark data source connector, we can easily control pushdown with this interface. But how does Spark apply the result? There are two steps:

  1. Keep the postScanFilters returned by the implementation of the interface. Spark will evaluate them later.

  2. Handle the pushedFilters in the scan operator. How they are handled depends on the implementation of the data source.

Keep the postScanFilters

The first step occurs in the optimization phase of Catalyst, in the V2ScanRelationPushDown rule. The core code is as follows (simplified):

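object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
  import DataSourceV2Implicits._

  def apply(plan: LogicalPlan): LogicalPlan = {
    applyColumnPruning(pushDownAggregates(pushDownFilters(createScanBuilder(plan))))
  }

  private def pushDownFilters(plan: LogicalPlan) = plan.transform {
    case Filter(condition, sHolder: ScanBuilderHolder) =>
      // Split the condition on AND and set aside predicates that contain subqueries.
      val filters = splitConjunctivePredicates(condition)
      val normalizedFilters =
        DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output)
      val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
        normalizedFilters.partition(SubqueryExpression.hasSubquery)

      // pushedFilters are evaluated by the data source, postScanFilters by Spark.
      val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
        sHolder.builder, normalizedFiltersWithoutSubquery)
      val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery

      // Only the post-scan filters stay in the logical plan.
      val filterCondition = postScanFilters.reduceLeftOption(And)
      filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
  }
}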

All of the pushdown rules are applied in the apply method, including pushDownFilters, which is responsible for predicate pushdown.

pushDownFilters gets the postScanFilters and pushedFilters from PushDownUtils and keeps only the postScanFilters in the resulting logical plan. Spark will evaluate those filters later.

The PushDownUtils code is as follows:

object PushDownUtils extends PredicateHelper {
  def pushFilters(
      scanBuilder: ScanBuilder,
      filters: Seq[Expression]): (Seq[sources.Filter], Seq[Expression]) = {
    scanBuilder match {
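      case r: SupportsPushDownFilters =>
        // Elided here: each expression in `filters` is translated to a sources.Filter.
        // Translated filters are collected in translatedFilters, translatedFilterToExpr maps
        // them back to their expressions, and untranslatableExprs keeps the rest.
        val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>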
          DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
        }
        (r.pushedFilters(), (untranslatableExprs ++ postScanFilters).toSeq)

      case _ => (Nil, filters)
    }
  }
}

PushDownUtils matches on SupportsPushDownFilters and obtains the pushedFilters and postScanFilters from the data source's implementation. If the scan builder does not implement the interface, pushedFilters is empty and every filter stays in Spark, which means no predicate is pushed down.

Handle the pushedFilters

Let us take the JDBC data source as an example.

The build method in JDBCScanBuilder returns a JDBCScan that carries the pushedFilter.

override def build(): Scan = {
  JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedFilter,
    pushedAggregateList, pushedGroupByCols)
}

JDBCScan calls relation.buildScan in its toV1TableScan method and returns a JDBCRDD.

case class JDBCScan(
    relation: JDBCRelation,
    prunedSchema: StructType,
    pushedFilters: Array[Filter],
    pushedAggregateColumn: Array[String] = Array(),
    groupByColumns: Option[Array[String]]) extends V1Scan {
  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def buildScan(): RDD[Row] = {
        val columnList = if (groupByColumns.isEmpty) {
          prunedSchema.map(_.name).toArray
        } else {
          pushedAggregateColumn
        }
        relation.buildScan(columnList, prunedSchema, pushedFilters, groupByColumns)
      }
    }.asInstanceOf[T]
  }
}

Each Filter is compiled into a WHERE condition and saved in filterWhereClause in JDBCRDD. A complete SQL statement containing this WHERE clause is then sent to the database over JDBC.

private[jdbc] class JDBCRDD(
    sc: SparkContext,
    getConnection: () => Connection,
    schema: StructType,
    columns: Array[String],
    filters: Array[Filter],
    partitions: Array[Partition],
    url: String,
    options: JDBCOptions,
    groupByColumns: Option[Array[String]])
  extends RDD[InternalRow](sc, Nil) {

  private val filterWhereClause: String =
    filters
      .flatMap(JDBCRDD.compileFilter(_, JdbcDialects.get(url)))
      .map(p => s"($p)").mkString(" AND ")

}
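The flatMap, map, and mkString chain above can be tried in isolation with a small sketch. The Filter types, compileFilter, and whereClause below are hypothetical simplifications and not the JDBCRDD code; in particular, the literal quoting is a naive assumption and not a substitute for a real SQL dialect.

```scala
object ToyWhereClause {
  // Toy filter types, not Spark's.
  sealed trait Filter
  case class EqualTo(col: String, value: Any) extends Filter
  case class GreaterThan(col: String, value: Any) extends Filter
  case class Unsupported(description: String) extends Filter

  // Naive literal rendering: quote strings, print everything else as-is.
  private def literal(v: Any): String = v match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  // Some(sql) when the filter can be expressed in SQL, None otherwise.
  def compileFilter(f: Filter): Option[String] = f match {
    case EqualTo(c, v)     => Some(s"$c = ${literal(v)}")
    case GreaterThan(c, v) => Some(s"$c > ${literal(v)}")
    case Unsupported(_)    => None
  }

  // Filters that do not compile are dropped; the rest are joined with AND.
  def whereClause(filters: Seq[Filter]): String = {
    val compiled = filters.toList.flatMap(f => compileFilter(f)).map(p => s"($p)")
    if (compiled.isEmpty) "" else compiled.mkString("WHERE ", " AND ", "")
  }
}
```

Dropping a filter here is only safe if the engine still evaluates it after the scan, which is why uncompilable filters must be reported back as post-scan filters.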

The JDBC scan is then turned into a physical plan in the planner phase by DataSourceV2Strategy. The simplified core code is as follows:

class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(project, filters,
        DataSourceV2ScanRelation(_, V1ScanWrapper(scan, pushed, aggregate), output)) =>
      val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
      val rdd = v1Relation.buildScan()
      val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
      val dsScan = RowDataSourceScanExec(
        output,
        output.toStructType,
        Set.empty,
        pushed.toSet,
        aggregate,
        unsafeRowRDD,
        v1Relation,
        tableIdentifier = None)
      withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false)

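    // The remaining cases are shown without their bodies.
    case PhysicalOperation(project, filters, DataSourceV2ScanRelation(_, scan: LocalScan, output)) => // elided

    case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) => // elided

    // The two streaming cases differ only in their guards, which are elided as well.
    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation) => // elided

    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation) => // elided

    // further cases elided
  }
}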

Spark matches PhysicalOperation in the strategy. The JDBC data source falls into the first case because its scan is a V1Scan.

In the first case, scan.toV1TableScan is called and buildScan returns the JDBCRDD introduced above. RowDataSourceScanExec then fetches the data through the JDBCRDD. Finally, withProjectAndFilter puts RowDataSourceScanExec into the complete physical plan. Spark first executes the scan, with the filter pushed into it, and then executes the other operators.

TiSpark pushdown strategy

The strategy in TiSpark

TiSpark is a Spark connector that provides a TiKV data source. So, following the discussion above, we can push some operators from Spark to TiKV.

The question is: what is the pushdown strategy in TiSpark?

TiSpark needs to support pushdown on Spark 2.4 and later, and it has to meet the following requirements.

First, an operator cannot be pushed down when the data source cannot handle it. For predicates, TiKV does not support every expression and data type, so TiSpark needs to exclude the unsupported ones automatically.

Second, we may want to skip pushdown to reduce the pressure on TiKV. This helps when Spark resources are abundant and TiKV resources are scarce. TiSpark provides some configs for this:

  • spark.tispark.plan.allow_agg_pushdown: use this config to disable aggregation pushdown.

  • spark.tispark.plan.unsupported_pushdown_exprs: use this config to list expressions that must not be pushed down. It also helps when you work with an old version of TiKV that does not support some expressions.
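As a rough illustration, the two keys could be passed to spark-submit as --conf arguments. The sketch below only builds those arguments as strings. The key names come from the list above, but the values are assumptions: "false" is taken to disable aggregation pushdown, and ExprA,ExprB is a placeholder for a comma-separated list, so check the TiSpark documentation for the accepted expression names and format.

```scala
object TiSparkPushdownConf {
  // Keys are from the text above; the values are example choices only.
  val settings: Seq[(String, String)] = Seq(
    "spark.tispark.plan.allow_agg_pushdown" -> "false",
    // Placeholder names: the real expression names are not given in this article.
    "spark.tispark.plan.unsupported_pushdown_exprs" -> "ExprA,ExprB"
  )

  // Render the settings as spark-submit arguments: --conf key=value
  def asSubmitArgs: Seq[String] =
    settings.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```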

How TiSpark implements the strategy

Next, we will learn how TiSpark implements the strategy.

We have introduced the pushdown interface provided by Spark. However, it cannot meet the requirements of TiSpark. Here are some problems with the pushdown interface:

  • Poor extensibility: the pushdown interface in Data Source API V1 (DSV1) is hard to extend, which makes it difficult to support a variety of pushdowns in DSV1.

  • Limited pushdown capability: DSV2 improves the pushdown interface and solves the extensibility issue, but its capability is still limited. Spark 3.0 only supports predicate pushdown and column pruning. Aggregation pushdown was added in Spark 3.2 and limit pushdown in Spark 3.3.

  • Inflexible pushdown strategy: for example, aggregation pushdown in DSV2 does not support pushing avg, even though avg can be converted to sum/count

TiSpark needs to support the common pushdowns on every supported Spark version, so the pushdown interface is not suitable. What to do next? The answer lies in the Catalyst extension.

The Catalyst extension has been supported since Spark 2.2. It lets us inject custom rules and strategies into most phases of Catalyst. In other words, we can inject TiSpark pushdown strategies to control pushdown precisely.

Spark pushes operators to the data source in the planner phase. The corresponding extension interface is injectPlannerStrategy (based on Spark 3.2.1 and TiSpark 3.0.1):

def injectPlannerStrategy(builder: StrategyBuilder): Unit = {
  plannerStrategyBuilders += builder
}
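The injection mechanism can be pictured with a small registry that needs no Spark dependency. All names below (Session, Plan, Strategy, Extensions, plan) are hypothetical stand-ins, and the "first non-empty result wins" rule is a deliberate simplification of how the planner picks among candidate plans.

```scala
import scala.collection.mutable

object ToyExtensions {
  // Stand-ins for SparkSession, LogicalPlan and Strategy.
  case class Session(name: String)
  type Plan = String
  type Strategy = Plan => Seq[String] // returns candidate physical plans
  type StrategyBuilder = Session => Strategy

  class Extensions {
    private val plannerStrategyBuilders = mutable.ArrayBuffer.empty[StrategyBuilder]

    // Registration only stores the builder; nothing is built yet.
    def injectPlannerStrategy(builder: StrategyBuilder): Unit =
      plannerStrategyBuilders += builder

    // Builders run once a session exists; injected strategies go before the built-in one.
    def buildStrategies(session: Session, builtIn: Strategy): Seq[Strategy] =
      plannerStrategyBuilders.map(b => b(session)).toList :+ builtIn
  }

  // Simplified planner: the first strategy that returns something wins.
  def plan(strategies: Seq[Strategy], p: Plan): Seq[String] =
    strategies.iterator.map(s => s(p)).find(_.nonEmpty).getOrElse(Nil)
}
```

An injected strategy that returns Nil for plans it does not recognize leaves those plans to the built-in strategy.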

TiSpark injects its own strategy through this interface.

ReflectionUtil returns the TiStrategy that matches the running Spark version by using reflection in Scala. This avoids compatibility issues caused by differences between Spark versions.

e.injectPlannerStrategy(new TiStrategyFactory(getOrCreateTiContext))

class TiStrategyFactory(getOrCreateTiContext: SparkSession => TiContext)
    extends (SparkSession => Strategy) {
  override def apply(sparkSession: SparkSession): Strategy = {
    TiExtensions.validateCatalog(sparkSession)
    ReflectionUtil.newTiStrategy(getOrCreateTiContext, sparkSession)
  }
}

TiStrategy is the core of the pushdown. It matches TiDBTable, which represents the TiDB data source, and then executes doPlan. If the match fails, TiSpark does nothing, so other data sources are not affected.

case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSession: SparkSession)
    extends Strategy with Logging {

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    plan
      .collectFirst {
        case DataSourceV2ScanRelation(DataSourceV2Relation(table: TiDBTable, _, _, _, _), _, _) =>
          doPlan(table, plan)
      }
      .toSeq
      .flatten
  }
}
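The "match a TiDB table or do nothing" behavior comes from collectFirst followed by flatten. Here is a minimal sketch on a toy tree; Node, TiDBScan, OtherScan, FilterNode, and this doPlan are hypothetical, and the sketch uses toList where the excerpt uses toSeq.

```scala
object ToyTiStrategy {
  sealed trait Node { def children: Seq[Node] }
  case class TiDBScan(table: String) extends Node { def children: Seq[Node] = Nil }
  case class OtherScan(table: String) extends Node { def children: Seq[Node] = Nil }
  case class FilterNode(cond: String, child: Node) extends Node {
    def children: Seq[Node] = Seq(child)
  }

  // Pre-order search that returns the first node the partial function accepts.
  def collectFirst[A](n: Node)(pf: PartialFunction[Node, A]): Option[A] =
    pf.lift(n).orElse(
      n.children.iterator.map(c => collectFirst(c)(pf)).collectFirst { case Some(a) => a })

  // Placeholder for the real planning work.
  def doPlan(table: String, plan: Node): Seq[String] = Seq(s"TiKV scan of $table")

  def apply(plan: Node): Seq[String] = {
    val matched: Option[Seq[String]] =
      collectFirst(plan) { case TiDBScan(t) => doPlan(t, plan) }
    matched.toList.flatten // None becomes an empty result, so other strategies take over
  }
}
```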

The strategies in doPlan imitate those in DataSourceV2Strategy. TiSpark uses pattern matching to identify the operators that can be pushed to the data source, and then sends a request to TiKV based on these operators. Let us take predicate pushdown as an example: TiSpark matches PhysicalOperation and executes the pruneFilterProject method.

private def doPlan(source: TiDBTable, plan: LogicalPlan): Seq[SparkPlan] =
  plan match {
    case PhysicalOperation(
          projectList,
          filters,
          DataSourceV2ScanRelation(
            DataSourceV2Relation(source: TiDBTable, _, _, _, _),
            _,
            _)) =>
      pruneFilterProject(projectList, filters, source, newTiDAGRequest()) :: Nil

    case TiAggregation(
          groupingExpressions,
          aggregateExpressions,
          resultExpressions,
          TiAggregationProjectionV2(filters, _, `source`, projects))
        if isValidAggregates(groupingExpressions, aggregateExpressions, filters, source) =>
      // Aggregation pushdown; the body is elided in this excerpt.
    case _ => Nil
  }

The pruneFilterProject method will:

  1. Check each Spark filter expression with TiExprUtils.isSupportedFilter, which decides whether the expression can be converted to a TiKV filter expression and pushed down. The filters that can be pushed are put into pushdownFilters, and those that cannot are combined into the residualFilter.

  2. Build the DAGRequest, which is the parameter of the request sent to TiKV, with filterToDAGRequest. The pushdownFilters are put into the DAGRequest. Then toCoprocessorRDD generates a scan that reads the data from TiKV.

  3. Wrap the scan in a FilterExec when a residualFilter exists, so that Spark applies the residual filter to the rows returned by the scan.

private def pruneFilterProject(
    projectList: Seq[NamedExpression],
    filterPredicates: Seq[Expression],
    source: TiDBTable,
    dagRequest: TiDAGRequest): SparkPlan = {

  val (pushdownFilters: Seq[Expression], residualFilters: Seq[Expression]) =
    filterPredicates.partition((expression: Expression) =>
      TiExprUtils.isSupportedFilter(expression, source, blocklist))

  val residualFilter: Option[Expression] =
    residualFilters.reduceLeftOption(catalyst.expressions.And)

  filterToDAGRequest(tiColumns, pushdownFilters, source, dagRequest)

  val scan = toCoprocessorRDD(source, projectSeq, dagRequest)
  residualFilter.fold(scan)(FilterExec(_, scan))
}
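The partition, reduceLeftOption, and fold pattern can be exercised on toy types. In the sketch below, Expr, PhysicalPlan, CoprocessorScan, and this isSupportedFilter are hypothetical; the assumed capability rule is that the store understands only Gt comparisons unless "Gt" is on the blocklist.

```scala
object ToyPruneFilter {
  sealed trait Expr
  case class Gt(col: String, v: Int) extends Expr
  case class Udf(name: String) extends Expr
  case class And(l: Expr, r: Expr) extends Expr

  sealed trait PhysicalPlan
  case class CoprocessorScan(pushed: Seq[Expr]) extends PhysicalPlan
  case class FilterExec(cond: Expr, child: PhysicalPlan) extends PhysicalPlan

  // Assumed capability check for the toy store.
  def isSupportedFilter(e: Expr, blocklist: Set[String]): Boolean = e match {
    case _: Gt => !blocklist.contains("Gt")
    case _     => false
  }

  def pruneFilter(filters: Seq[Expr], blocklist: Set[String] = Set.empty): PhysicalPlan = {
    // Split into what the store evaluates and what the engine must still evaluate.
    val (pushdown, residual) = filters.partition(e => isSupportedFilter(e, blocklist))
    val scan: PhysicalPlan = CoprocessorScan(pushdown)
    // AND the residual filters together; wrap the scan only if any remain.
    residual.reduceLeftOption[Expr]((l, r) => And(l, r)).fold(scan)(FilterExec(_, scan))
  }
}
```

When every filter is supported, the result is the bare scan with no engine-side filter on top.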

In this way, TiSpark can support most pushdowns on every supported Spark version (>= 2.4).

So far, TiSpark supports predicate pushdown, aggregation pushdown, limit pushdown, order by pushdown, and projection pushdown. TiSpark can also control whether a specific expression or data type is pushed down.

Conclusion

TiSpark supports pushdown through the Catalyst extension, which brings several problems:

  • Increased code complexity

  • Instability, because we may touch evolving interfaces or methods in Spark

  • We need to be very careful to avoid affecting the original Spark logic

Spark is focusing on the development of the DataSource API as well as the pushdown interface. We hope that in the near future the DataSource API will be strong enough to meet the needs of TiSpark. At that time, we will be happy to move pushdown to the DataSource API.

Appendix

The pushdown support in TiSpark is as follows:

Data Type sum count avg min max predicate & order by & group by
BIT
BOOLEAN
TINYINT
SMALLINT
MEDIUMINT
INTEGER
BIGINT
FLOAT
DOUBLE
DECIMAL
DATE
DATETIME
TIMESTAMP
TIME
YEAR
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
ENUM
SET
  • Pushing down min/max on TIME leads to a wrong result
  • Pushing down min/max on SET may cause TiKV to panic

You can use explain in TiSpark to check whether an operator is pushed down. Here is an example:

1. Create a table in TiDB

CREATE TABLE `test`.`t`  (
  `id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);

2. Execute Spark SQL with explain

spark.sql("select avg(id) from test.t where id > 10").explain

3. You will get the execution plan

*(2) HashAggregate(keys=[], functions=[specialsum(specialsum(id#252L, DecimalType(38,0), null)#258, DecimalType(38,0), null), specialsum(count(id#252L)#259L, LongType, 0)])
+- Exchange SinglePartition, true, [id=#38]
   +- *(1) HashAggregate(keys=[], functions=[partial_specialsum(specialsum(id#252L, DecimalType(38,0), null)#258, DecimalType(38,0), null), partial_specialsum(count(id#252L)#259L, LongType, 0)])
      +- *(1) ColumnarToRow
         +- TiKV CoprocessorRDD{[table: t] IndexReader, Columns: id@LONG: { IndexRangeScan(Index:primary(id)): { RangeFilter: [[id@LONG GREATER_THAN 10]], Range: [([t\200\000\000\000\000\000\000o_i\200\000\000\000\000\000\000\001\003\200\000\000\000\000\000\000\v], [t\200\000\000\000\000\000\000o_i\200\000\000\000\000\000\000\001\372])] }, Aggregates: Sum(id@LONG), Count(id@LONG) }, startTs: 434873744501506049}

Focus on the TiKV CoprocessorRDD node:

  • RangeFilter: [[id@LONG GREATER_THAN 10]]: indicates that id > 10 is pushed down

  • Aggregates: Sum(id@LONG), Count(id@LONG): indicates that Sum and Count are pushed down. Spark then combines their results into avg.
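Why pushing Sum and Count is enough to answer avg can be shown with a few lines of plain Scala. Partial, partialAgg, and finalAvg are hypothetical names for this sketch, which ignores the Decimal handling and the specialsum function visible in the plan above.

```scala
object ToyAvgRewrite {
  // Partial result a storage node returns for its share of the rows.
  case class Partial(sum: Long, count: Long)

  // Runs next to the data: only two numbers leave the storage node.
  def partialAgg(rows: Seq[Long]): Partial = Partial(rows.sum, rows.size.toLong)

  // Runs on the compute side: merge the partials, then divide once.
  def finalAvg(partials: Seq[Partial]): Option[Double] = {
    val sum = partials.map(_.sum).sum
    val count = partials.map(_.count).sum
    if (count == 0) None else Some(sum.toDouble / count)
  }
}
```

For two partitions holding (11, 13) and (15), the merged result is 39 / 3 = 13.0, whereas averaging the two per-partition averages would give 13.5. This is why the partial results carry sum and count rather than avg.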
