diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java new file mode 100644 index 00000000000000..53367cf9c21ae6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * Represent an jdbc table sink plan node that has not been bound. + */ +public class UnboundJdbcTableSink extends UnboundBaseExternalTableSink { + + public UnboundJdbcTableSink(List nameParts, List colNames, List hints, + List partitions, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, DMLCommandType.NONE, + Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundJdbcTableSink(List nameParts, + List colNames, + List hints, + List partitions, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child, hints, partitions); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "UnboundJdbcTableSink should have exactly one child"); + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitUnboundJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 6c361c36f055a3..8ca58f977578a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.trees.plans.Plan; @@ -78,6 +79,9 @@ public static LogicalSink createUnboundTableSink(List na } else if (curCatalog instanceof IcebergExternalCatalog) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof JdbcExternalCatalog) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -109,20 +113,16 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, - dmlCommandType, Optional.empty(), Optional.empty(), plan); - } - // TODO: we need to support insert into other catalog - try { - if (ConnectContext.get() != null) { - ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } - } catch (Exception e) { - // ignore this. + dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof JdbcExternalCatalog) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } + throw new AnalysisException( (isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName() + " is not supported." + (isAutoDetectPartition - ? " PARTITION(*) is only supported in overwrite partition for OLAP table" : "")); + ? " PARTITION(*) is only supported in overwrite partition for OLAP table" : "")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 960df63a62f539..49d0f6ae90ac97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -56,6 +56,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.datasource.jdbc.sink.JdbcTableSink; import org.apache.doris.datasource.jdbc.source.JdbcScanNode; import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode; @@ -128,6 +129,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan; @@ -496,6 +498,24 @@ public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink jdbcTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = jdbcTableSink.child().accept(this, context); + rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); + List targetTableColumns = jdbcTableSink.getCols(); + List insertCols = targetTableColumns.stream() + .map(Column::getName) + .collect(Collectors.toList()); + + JdbcTableSink sink = new JdbcTableSink( + ((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(), + insertCols + ); + rootFragment.setSink(sink); + return rootFragment; + } + @Override public PlanFragment visitPhysicalFileSink(PhysicalFileSink fileSink, PlanTranslatorContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java index ab817c2f1d7c56..2479af68fbece9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; @@ -67,6 +68,13 @@ public Plan visitLogicalIcebergTableSink( return tableSink; } + @Override + public Plan visitLogicalJdbcTableSink( + LogicalJdbcTableSink tableSink, StatementContext context) { + turnOffPageCache(context); + return tableSink; + } + private void turnOffPageCache(StatementContext context) { SessionVariable sessionVariable = context.getConnectContext().getSessionVariable(); // set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 0b4929e0a87566..e184ce2777d7e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -152,6 +153,14 @@ public Void visitPhysicalIcebergTableSink( return null; } + @Override + public Void visitPhysicalJdbcTableSink( + PhysicalJdbcTableSink jdbcTableSink, PlanContext context) { + // Always use gather properties for jdbcTableSink + addRequestPropertyToChildren(PhysicalProperties.GATHER); + return null; + } + @Override public Void visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanContext context) { if (context.getSessionVariable().enableParallelResultSink() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index be4d8b390c9f1f..26868665b10806 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -68,6 +68,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink; import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect; import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan; +import org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink; import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin; import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin; import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit; @@ -190,6 +191,7 @@ public class RuleSet { .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) .add(new LogicalHiveTableSinkToPhysicalHiveTableSink()) .add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink()) + .add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) .add(new LogicalResultSinkToPhysicalResultSink()) .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index f2c572f7779e91..d1a48899873f8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -32,6 +32,7 @@ public enum RuleType { BINDING_RESULT_SINK(RuleTypeClass.REWRITE), BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE), @@ -437,6 +438,7 @@ public enum RuleType { LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 6d8ad94242b53c..05027b856740c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -32,10 +32,13 @@ import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.analyzer.Scope; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -58,6 +61,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -109,7 +113,8 @@ public List buildRules() { // TODO: bind hive taget table RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)), RuleType.BINDING_INSERT_ICEBERG_TABLE.build( - unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)) + unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)), + RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)) ); } @@ -524,6 +529,64 @@ private Plan bindIcebergTableSink(MatchingContext> return boundSink.withChildAndUpdateOutput(fullOutputProject); } + private Plan bindJdbcTableSink(MatchingContext> ctx) { + UnboundJdbcTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + JdbcExternalDatabase database = pair.first; + JdbcExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + LogicalJdbcTableSink boundSink = new LogicalJdbcTableSink<>( + database, + table, + bindColumns, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + // we need to insert all the columns of the target table + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + Map columnToOutput = getJdbcColumnToOutput(bindColumns, child); + // We don't need to insert unmentioned columns, only user specified columns + LogicalProject outputProject = getOutputProjectByCoercion(bindColumns, child, columnToOutput); + return boundSink.withChildAndUpdateOutput(outputProject); + } + + private static Map getJdbcColumnToOutput( + List bindColumns, LogicalPlan child) { + Map columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + for (int i = 0; i < bindColumns.size(); i++) { + Column column = bindColumns.get(i); + NamedExpression outputExpr = child.getOutput().get(i); + Alias output = new Alias( + TypeCoercionUtils.castIfNotSameType(outputExpr, DataType.fromCatalogType(column.getType())), + column.getName() + ); + columnToOutput.put(column.getName(), output); + } + + return columnToOutput; + } + private Pair bind(CascadesContext cascadesContext, UnboundTableSink sink) { List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), sink.getNameParts()); @@ -567,6 +630,18 @@ private Pair bind(CascadesContext throw new AnalysisException("the target table of insert into is not an iceberg table"); } + private Pair bind(CascadesContext cascadesContext, + UnboundJdbcTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (pair.second instanceof JdbcExternalTable) { + return Pair.of(((JdbcExternalDatabase) pair.first), (JdbcExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not an jdbc table"); + } + private List bindPartitionIds(OlapTable table, List partitions, boolean temp) { return partitions.isEmpty() ? ImmutableList.of() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java new file mode 100644 index 00000000000000..960350c6117586 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; + +import java.util.Optional; + +/** + * Implementation rule that convert logical JdbcTableSink to physical JdbcTableSink. + */ +public class LogicalJdbcTableSinkToPhysicalJdbcTableSink extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalJdbcTableSink().thenApply(ctx -> { + LogicalJdbcTableSink sink = ctx.root; + return new PhysicalJdbcTableSink<>( + sink.getDatabase(), + sink.getTargetTable(), + sink.getCols(), + sink.getOutputExprs(), + Optional.empty(), + sink.getLogicalProperties(), + null, + null, + sink.child()); + }).toRule(RuleType.LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 9f451732bdc886..f3587b379210de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -49,9 +49,11 @@ public enum PlanType { LOGICAL_OLAP_TABLE_SINK, LOGICAL_HIVE_TABLE_SINK, LOGICAL_ICEBERG_TABLE_SINK, + LOGICAL_JDBC_TABLE_SINK, LOGICAL_RESULT_SINK, LOGICAL_UNBOUND_OLAP_TABLE_SINK, LOGICAL_UNBOUND_HIVE_TABLE_SINK, + LOGICAL_UNBOUND_JDBC_TABLE_SINK, LOGICAL_UNBOUND_RESULT_SINK, // logical others @@ -103,6 +105,7 @@ public enum PlanType { PHYSICAL_OLAP_TABLE_SINK, PHYSICAL_HIVE_TABLE_SINK, PHYSICAL_ICEBERG_TABLE_SINK, + PHYSICAL_JDBC_TABLE_SINK, PHYSICAL_RESULT_SINK, // physical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java index a3aa33f96ab02c..e456d171df5986 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java @@ -46,8 +46,8 @@ * Insert executor for base external table */ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor { + protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class); - private static final long INVALID_TXN_ID = -1L; protected long txnId = INVALID_TXN_ID; protected TransactionStatus txnStatus = TransactionStatus.ABORTED; protected final TransactionManager transactionManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 55f16c20e09ede..38d0d8386307cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.analysis.StmtType; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; @@ -26,12 +27,14 @@ import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; @@ -41,8 +44,11 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.planner.DataSink; import org.apache.doris.qe.ConnectContext; @@ -53,6 +59,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Objects; import java.util.Optional; @@ -191,9 +198,27 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf; insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner, Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert); + } else if (physicalSink instanceof PhysicalJdbcTableSink) { + boolean emptyInsert = childIsEmptyRelation(physicalSink); + List cols = ((PhysicalJdbcTableSink) physicalSink).getCols(); + List slots = ((PhysicalJdbcTableSink) physicalSink).getOutput(); + if (physicalSink.children().size() == 1) { + if (physicalSink.child(0) instanceof PhysicalOneRowRelation + || physicalSink.child(0) instanceof PhysicalUnion) { + for (int i = 0; i < cols.size(); i++) { + if (!(cols.get(i).isAllowNull()) && slots.get(i).nullable()) { + throw new AnalysisException("Column `" + cols.get(i).getName() + + "` is not nullable, but the inserted value is nullable."); + } + } + } + } + JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) targetTableIf; + insertExecutor = new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner, + Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert); } else { // TODO: support other table types - throw new AnalysisException("insert into command only support [olap, hive, iceberg] table"); + throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table"); } if (!insertExecutor.isEmptyInsert()) { insertExecutor.beginTransaction(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index c5c2197faf0444..49e7858f6faf65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -28,9 +28,11 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FormatOptions; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -266,6 +268,11 @@ public static Plan normalizePlan(Plan plan, TableIf table, Optional) unboundLogicalSink).isPartialUpdate()) { @@ -408,6 +415,8 @@ public static TableIf getTargetTable(Plan plan, ConnectContext ctx) { unboundTableSink = (UnboundHiveTableSink) plan; } else if (plan instanceof UnboundIcebergTableSink) { unboundTableSink = (UnboundIcebergTableSink) plan; + } else if (plan instanceof UnboundJdbcTableSink) { + unboundTableSink = (UnboundJdbcTableSink) plan; } else { throw new AnalysisException("the root of plan should be" + " [UnboundTableSink, UnboundHiveTableSink, UnboundIcebergTableSink]," diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java new file mode 100644 index 00000000000000..71df7e417e6a8f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For iceberg External Table + */ +public class JdbcInsertCommandContext extends BaseExternalTableInsertCommandContext { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java new file mode 100644 index 00000000000000..928b17edf38933 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionType; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for jdbc table + */ +public class JdbcInsertExecutor extends BaseExternalTableInsertExecutor { + private static final Logger LOG = LogManager.getLogger(JdbcInsertExecutor.class); + + /** + * constructor + */ + public JdbcInsertExecutor(ConnectContext ctx, JdbcExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx, + boolean emptyInsert) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert); + } + + @Override + public void beginTransaction() { + // do nothing + } + + @Override + protected void onComplete() throws UserException { + if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); + } else { + summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(transactionType())); + summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); + txnStatus = TransactionStatus.COMMITTED; + } + } + + @Override + protected void onFail(Throwable t) { + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (txnId != INVALID_TXN_ID) { + LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". url: ").append(coordinator.getTrackingUrl()); + } + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); + } + + @Override + protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + // do nothing + } + + @Override + protected void setCollectCommitInfoFunc() { + // do nothing + } + + @Override + protected void doBeforeCommit() throws UserException { + // do nothing + } + + @Override + protected TransactionType transactionType() { + return TransactionType.JDBC; + } + + @Override + protected void beforeExec() { + String queryId = DebugUtil.printId(ctx.queryId()); + LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java new file mode 100644 index 00000000000000..b4027383916599 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * logical jdbc table sink for insert command + */ +public class LogicalJdbcTableSink extends LogicalTableSink + implements Sink, PropagateFuncDeps { + // bound data sink + private final JdbcExternalDatabase database; + private final JdbcExternalTable targetTable; + private final DMLCommandType dmlCommandType; + + /** + * constructor + */ + public LogicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List cols, + List outputExprs, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_JDBC_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); + this.database = Objects.requireNonNull(database, "database != null in LogicalJdbcTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalJdbcTableSink"); + this.dmlCommandType = dmlCommandType; + } + + public Plan withChildAndUpdateOutput(Plan child) { + List output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalJdbcTableSink<>(database, targetTable, cols, output, + dmlCommandType, Optional.empty(), Optional.empty(), child); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, "LogicalJdbcTableSink only accepts one child"); + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public LogicalSink withOutputExprs(List outputExprs) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), child()); + } + + public JdbcExternalDatabase getDatabase() { + return database; + } + + public JdbcExternalTable getTargetTable() { + return targetTable; + } + + public DMLCommandType getDmlCommandType() { + return dmlCommandType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LogicalJdbcTableSink)) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalJdbcTableSink that = (LogicalJdbcTableSink) o; + return dmlCommandType == that.dmlCommandType + && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, targetTable, dmlCommandType); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalJdbcTableSink[" + id.asInt() + "]", + "outputExprs", outputExprs, + "database", database.getFullName(), + "targetTable", targetTable.getName(), + "cols", cols, + "dmlCommandType", dmlCommandType + ); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java index 82483c63a40412..7c99886f06dffe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java @@ -71,6 +71,10 @@ public ExternalTable getTargetTable() { return targetTable; } + public List getCols() { + return cols; + } + @Override public List getExpressions() { return ImmutableList.of(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java new file mode 100644 index 00000000000000..2b0f12c1dea62a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import java.util.List; +import java.util.Optional; + +/** physical jdbc sink */ +public class PhysicalJdbcTableSink extends PhysicalBaseExternalTableSink { + + /** + * constructor + */ + public PhysicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child); + } + + /** + * constructor + */ + public PhysicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_JDBC_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + } + + @Override + public Plan withChildren(List children) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, + cols, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public PhysicalProperties getRequirePhysicalProperties() { + // Since JDBC tables do not have partitioning, return a default physical property. + // GATHER implies that all data is gathered to a single location, which is a common requirement for JDBC sinks. + return PhysicalProperties.GATHER; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index e0b8a1dddc1706..289687476b2cf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; @@ -26,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -34,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; @@ -68,6 +71,10 @@ default R visitUnboundIcebergTableSink(UnboundIcebergTableSink u return visitLogicalSink(unboundTableSink, context); } + default R visitUnboundJdbcTableSink(UnboundJdbcTableSink unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + } + default R visitUnboundResultSink(UnboundResultSink unboundResultSink, C context) { return visitLogicalSink(unboundResultSink, context); } @@ -96,6 +103,10 @@ default R visitLogicalIcebergTableSink(LogicalIcebergTableSink i return visitLogicalTableSink(icebergTableSink, context); } + default R visitLogicalJdbcTableSink(LogicalJdbcTableSink jdbcTableSink, C context) { + return visitLogicalTableSink(jdbcTableSink, context); + } + default R visitLogicalResultSink(LogicalResultSink logicalResultSink, C context) { return visitLogicalSink(logicalResultSink, context); } @@ -129,6 +140,10 @@ default R visitPhysicalIcebergTableSink(PhysicalIcebergTableSink return visitPhysicalTableSink(icebergTableSink, context); } + default R visitPhysicalJdbcTableSink(PhysicalJdbcTableSink jdbcTableSink, C context) { + return visitPhysicalTableSink(jdbcTableSink, context); + } + default R visitPhysicalResultSink(PhysicalResultSink physicalResultSink, C context) { return visitPhysicalSink(physicalResultSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java new file mode 100644 index 00000000000000..a0a1cc28803d4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.common.UserException; + +public class JdbcTransactionManager implements TransactionManager { + @Override + public long begin() { + return 0; + } + + @Override + public void commit(long id) throws UserException { + + } + + @Override + public void rollback(long id) { + + } + + @Override + public Transaction getTransaction(long id) { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java index 2372c199738116..c83f61888901c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java @@ -20,5 +20,6 @@ public enum TransactionType { UNKNOWN, HMS, - ICEBERG + ICEBERG, + JDBC }