[spark] Support parser of Spark call procedure command #2408
Conversation
@wuchong @YannByron Hi, please help review when you get some time.
On `grammar FlussSqlExtensions;`:
Rename to FlussSqlParser.
On `trait ProcedureCatalog {`:
Rename to SupportsProcedures to align with Spark's SupportsNamespaces and SupportsPartitionManagement naming style.
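A minimal sketch of what the suggested trait could look like, assuming the existing load-procedure contract from this PR (the method name and exception wiring are assumptions, not the final API):

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

// Hypothetical shape only: mirrors Spark's SupportsNamespaces /
// SupportsPartitionManagement naming, as the review suggests.
// Procedure and NoSuchProcedureException are the types introduced by this PR.
trait SupportsProcedures extends TableCatalog {

  /** Loads a procedure by identifier, or fails if none is registered. */
  @throws[NoSuchProcedureException]
  def loadProcedure(ident: Identifier): Procedure
}
```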
On `package org.apache.fluss.spark.analysis`:
Move to org.apache.fluss.exception in fluss-common.
On `package org.apache.fluss.spark.extensions`:
Personally, I would put this in org.apache.fluss.spark.
On `class FlussSqlExtensionsAstBuilder(delegate: ParserInterface)`:
Rename to FlussSqlAstBuilder and move it into org.apache.spark.sql.catalyst.parser.
On `class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface {`:
Move it into org.apache.spark.sql.catalyst.parser.
On `private case class ProcedureParameterImpl(`:
Why do we have to define a ProcedureParameter trait if only ProcedureParameterImpl extends it?
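If the trait were dropped, as the question implies, the parameter type could be a single case class with factory methods on its companion. A hedged sketch, with names taken from the diff context above:

```scala
import org.apache.spark.sql.types.DataType

// One concrete type instead of a trait plus a private Impl case class.
case class ProcedureParameter(name: String, dataType: DataType, isRequired: Boolean)

object ProcedureParameter {
  def required(name: String, dataType: DataType): ProcedureParameter =
    ProcedureParameter(name, dataType, isRequired = true)

  def optional(name: String, dataType: DataType): ProcedureParameter =
    ProcedureParameter(name, dataType, isRequired = false)
}
```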
On `val tablePath = toTablePath(tableIdent)`:
So this procedure does nothing for now; it should throw an exception explaining that the feature will be supported soon, with a link to the tracking issue.
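A possible shape for that, sketched under the assumption that the body currently has no effect (the message text and the issue link placeholder are illustrative):

```scala
// Hypothetical placeholder body: fail fast instead of silently succeeding.
throw new UnsupportedOperationException(
  "This procedure is not implemented yet; it will be supported in a follow-up. " +
    "See the tracking issue for progress.") // TODO: link the tracking issue
```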
+1
wuchong left a comment:
Thanks, @XuQianJin-Stars!
It looks like this pull request doesn’t yet deliver a complete feature. I suggest rounding it out by implementing a few basic procedures and adding the corresponding documentation.
Additionally, please add necessary Javadoc comments for the class and its methods to improve code clarity and maintainability.
On `fluss-spark/PROCEDURES.md` (outdated):
This is not the appropriate place for documentation; please move it to the website/ directory.
Specifically:
- Create a new section titled “Engine Spark” under “Engine Flink” in the documentation sidebar.
- Within “Engine Spark,” add a page named “Procedures”.
Please follow the structure and style of the Flink Procedures page as a reference. The Spark Procedures page should include, for each supported procedure:
- Syntax
- Parameters
- Return value(s)
- Example usage
Additionally, ensure that all procedure names are listed in the right-side table of contents (TOC) for easy navigation.
On `class CompactProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) {`:
Fluss doesn't support compaction and will not support it in the future, so providing an empty compact procedure looks strange to users and will be backward incompatible when we remove it.
Could you remove this from the PR and introduce xxx_cluster_configs as the first procedures, like the Flink procedures https://fluss.apache.org/docs/next/engine-flink/procedures/#get_cluster_configs?
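For reference, once such a procedure exists, invoking it from Spark could look roughly like this (the Spark-side name mirrors the Flink `get_cluster_configs` procedure and is an assumption):

```scala
// Hypothetical call; the output schema would mirror the Flink procedure's
// (config key/value rows), which this PR does not yet define.
spark.sql("CALL sys.get_cluster_configs()").show(truncate = false)
```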
On `case class CallProcedureExec(output: Seq[Attribute], procedure: Procedure, args: Seq[Expression]) extends SparkPlan {`:
It seems Paimon implements the procedure exec node by extending Spark's LeafV2CommandExec, which looks much simpler (it does not rely on RDDs). Is there any reason for us to extend SparkPlan?
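For comparison, a hedged sketch of the suggested approach, modeled on how Paimon structures its exec node (the `Procedure#call` signature is assumed):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow}
import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec

// Hypothetical variant of CallProcedureExec based on LeafV2CommandExec:
// run() returns rows directly, so no RDD plumbing is needed.
case class CallProcedureExec(
    output: Seq[Attribute],
    procedure: Procedure,
    args: Seq[Expression]) extends LeafV2CommandExec {

  override protected def run(): Seq[InternalRow] = {
    // Evaluate the (foldable) argument expressions into a single input row.
    val input = new GenericInternalRow(args.map(_.eval()).toArray)
    procedure.call(input) // assumed to return Seq[InternalRow]
  }
}
```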
On `public class NoSuchProcedureException extends ApiException {`:
ApiException is used for communication between the Fluss server and clients. However, NoSuchProcedureException is only a client-side exception in the Spark connector. I suggest moving it to the package org.apache.fluss.spark.exception in the fluss-spark-common module.
@wuchong @YannByron Hi, I have updated the PR. Please help review when you get some time.
wuchong left a comment:
@XuQianJin-Stars I made some changes to the pull request:
- renamed the parser to `FlussSqlExtension`, like how Iceberg and Paimon name it
- added the generated classes into the source code, so the IDE can recognize them
- improved some code and tests
Purpose
Linked issue: close #2406
This PR introduces the parser and execution framework for Spark's CALL procedure command, allowing users to invoke stored procedures using SQL syntax like `CALL sys.procedure_name(args)`. This provides a foundation for implementing various administrative and maintenance operations. All implementations are in Scala for better integration with Spark's ecosystem.
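A hedged usage sketch (the procedure name and arguments below are illustrative, not procedures shipped by this PR):

```scala
// Requires a session created with the Fluss SQL extensions (see Documentation below).
spark.sql("CALL sys.my_procedure('my_db.my_table', 4)")                          // positional arguments
spark.sql("CALL sys.my_procedure(table => 'my_db.my_table', parallelism => 4)")  // named arguments
```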
Brief change log
Core Framework (Scala):
- `Procedure` trait in `fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala`
- `ProcedureParameter` trait and case class implementation for parameter definitions in `ProcedureParameter.scala`
- `BaseProcedure` abstract class providing common utilities in `BaseProcedure.scala`
- `ProcedureBuilder` trait for procedure instantiation in `ProcedureBuilder.scala`
- `ProcedureCatalog` trait for catalog integration in `catalog/ProcedureCatalog.scala`
Parser & SQL Extensions:
- `FlussSqlExtensions.g4` for CALL statement syntax
- `FlussSparkSqlParser` extending Spark's `ParserInterface`
- `FlussSqlExtensionsAstBuilder` to convert the ANTLR parse tree to logical plans
- `Origin` and `CurrentOrigin` handling for source position tracking
- Changes to `fluss-spark-common/pom.xml`
Logical & Physical Plans:
- `FlussCallStatement` (unresolved) and `FlussCallCommand` (resolved) logical plan nodes
- `FlussCallArgument`, `FlussPositionalArgument`, and `FlussNamedArgument` for argument representation
- `CallProcedureExec` physical plan node for execution
Analysis & Execution:
- `FlussProcedureResolver` analyzer rule for procedure resolution and validation
- `FlussStrategy` planner strategy to inject `CallProcedureExec`
- `FlussSparkSessionExtensions` to register all custom components
Catalog Integration:
- `SparkCatalog` to implement `ProcedureCatalog`
- `FlussSparkTestBase` to enable SQL extensions in the test environment
Procedure Registry (Scala):
- `SparkProcedures` object in `fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala` for managing procedure builders
- `NoSuchProcedureException` class in `analysis/NoSuchProcedureException.scala` for error handling
Example Implementation (Scala):
- `CompactProcedure` in `procedure/CompactProcedure.scala` as a sample procedure (skeleton implementation)
Documentation & Tests (Scala):
- `PROCEDURES.md` documenting the new feature
- `CallStatementParserTest.scala` in `fluss-spark-ut/src/test/scala` with comprehensive parser tests

Tests
Unit Tests (ScalaTest):
- `CallStatementParserTest`: tests parsing of CALL statements
  - `testCallWithBackticks`: tests backtick-quoted identifiers
  - `testCallWithNamedArguments`: tests named argument syntax
  - `testCallWithPositionalArguments`: tests positional arguments with various data types
  - `testCallWithMixedArguments`: tests mixed named and positional arguments
  - `testCallSimpleProcedure`: tests a simple procedure call

All existing tests in the `fluss-spark-ut` module pass successfully.
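A minimal sketch of what such a parser test could look like, as shown below (the test style and the assertion surface of `FlussCallStatement` are assumptions; the real `CallStatementParserTest` may differ):

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical test shape: wrap the default Spark parser with the Fluss
// parser introduced by this PR, then parse a CALL statement.
class CallStatementParserSketch extends AnyFunSuite {

  private lazy val parser = new FlussSparkSqlParser(new SparkSqlParser())

  test("CALL with named arguments parses to FlussCallStatement") {
    val plan = parser.parsePlan("CALL sys.proc(arg1 => 1, arg2 => 'x')")
    assert(plan.isInstanceOf[FlussCallStatement])
  }
}
```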
API and Format
New Public APIs (Scala):
- `Procedure` trait: defines the contract for stored procedures
- `ProcedureParameter` trait: defines procedure parameters with companion object factory methods
- `ProcedureCatalog` trait: extends Spark's `TableCatalog` with procedure loading capability
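A hedged sketch of the API shape these bullets describe (method names and signatures are inferred from the change log, not the final API):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, StructType}

// Inferred shapes only: the actual traits live in fluss-spark-common.
trait ProcedureParameter {
  def name: String
  def dataType: DataType
  def isRequired: Boolean
}

trait Procedure {
  /** Declared input parameters, in positional order. */
  def parameters: Array[ProcedureParameter]

  /** Schema of the rows returned by call(). */
  def outputType: StructType

  /** Executes the procedure with arguments bound into a single input row. */
  def call(args: InternalRow): Seq[InternalRow]
}
```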
Modified APIs:
- `SparkCatalog` now implements the `ProcedureCatalog` trait

No changes to storage format.
Documentation
New feature introduced: Spark CALL procedure command support.
Documentation added:
- `fluss-spark/PROCEDURES.md`: comprehensive guide on using the CALL procedure feature
Users need to configure Spark session with:
spark.sql.extensions = org.apache.fluss.spark.extensions.FlussSparkSessionExtensions
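A minimal sketch of enabling the extensions when building a session (the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.extensions wires FlussSparkSessionExtensions into the session,
// which registers the CALL parser, resolver rule, and planner strategy.
val spark = SparkSession.builder()
  .appName("fluss-call-procedures") // illustrative
  .config(
    "spark.sql.extensions",
    "org.apache.fluss.spark.extensions.FlussSparkSessionExtensions")
  .getOrCreate()
```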