    Richard Conway
    Oct 24, 2017

    Tales from the Spark Summit: Catalyst Optimisation

    I'm at the Spark Summit in Dublin and have been in a session with Team Elastacloud today on Spark optimisation. It has been a pretty good session. Many of the things I knew already: partitions, query optimisation and parallelism. All good stuff. One thing I hadn't had the chance to look at before was optimisation of the Catalyst engine for Spark SQL.


    I'll try to explain some of my learnings and reasoning. This is explicitly an example I put together today, with a few variations. The key is that a custom optimisation always operates on a LogicalPlan and sits in a pipeline which takes in a plan and gives you back a plan; this particular step takes the Divide operation and rewrites it based on a case statement.
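
    In other words, a custom rule is just a function from plan to plan. As a minimal sketch of that shape (NoOpRule is my own illustrative name, not something from the session):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // The smallest possible rule: a plan goes in and the same plan comes back out unchanged.
    object NoOpRule extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan
    }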


    import org.apache.spark.sql.catalyst.expressions.{Cast, Divide, Literal}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.types.IntegerType


    Once we've done our imports we build a class which extends Rule[LogicalPlan], and this will be the rule that we add to the pipeline. We create a singleton object and test for a literal -1 on the right-hand side of a divide; if our condition is triggered we print a message to the console and also return a null (not sure what the use case is for this, but it's illustrative rather than practical!)


    object DivideByMinusOneRule extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = {
        plan transformAllExpressions {
          // Match any division whose right-hand operand is the literal -1
          case Divide(_, Literal(value, _)) if value == -1 =>
            println("division by -1 occurred!")
            // Replace the whole expression with a null cast to an integer
            Cast(Literal(null), IntegerType)
        }
      }
    }
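
    Before registering the rule, note that the walkthrough assumes a SparkSession called spark and a DataFrame df with a number column, neither of which is shown in the post. A minimal, illustrative setup (the names and values here are my assumptions) might look like this:

    import org.apache.spark.sql.SparkSession

    // Illustrative only: the post assumes spark and df already exist.
    val spark = SparkSession.builder()
      .appName("catalyst-divide-rule")
      .master("local[*]")
      .getOrCreate()

    // Needed for the $"number" column syntax used below.
    import spark.implicits._

    // A small DataFrame with a "number" column to exercise the rule.
    val df = Seq(1.0, 2.0, 3.0, 4.0, 5.0).toDF("number")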


    At this stage we add the divide rule to the Catalyst extra optimisations.


    spark.experimental.extraOptimizations = Seq(DivideByMinusOneRule)


    When we've done this we select the number column from our DataFrame df and divide every value by -1.


    val dfOut = df.select($"number" / -1)

    dfOut.explain()

    dfOut.show(5)


    The explain method will confirm that the extra optimisation has been added to the Catalyst pipeline. Then we can show the results, and we should see that our message has been printed to the console and that null values appear in the output.
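
    If you'd rather inspect the rewrite directly instead of inferring it from explain(), one option (my addition; the post only uses explain and show) is to print the optimised logical plan, whose exact text will vary by Spark version:

    // Print the optimised logical plan; if the rule fired, the Divide should
    // have been replaced by a cast of a null literal.
    println(dfOut.queryExecution.optimizedPlan)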
