[feature](iceberg) iceberg write support insert overwrite and optimize hive write transaction statistics and #37191

Merged · 4 commits · Jul 18, 2024
==== File 1 (file name not shown in this view) ====

@@ -32,6 +32,7 @@
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
@@ -297,25 +298,23 @@ public List<String> listDatabaseNames() {
     }

     public void updateTableStatistics(
-            String dbName,
-            String tableName,
+            SimpleTableInfo tableInfo,
             Function<HivePartitionStatistics, HivePartitionStatistics> update) {
-        client.updateTableStatistics(dbName, tableName, update);
+        client.updateTableStatistics(tableInfo.getDbName(), tableInfo.getTbName(), update);
     }

     void updatePartitionStatistics(
-            String dbName,
-            String tableName,
+            SimpleTableInfo tableInfo,
             String partitionName,
             Function<HivePartitionStatistics, HivePartitionStatistics> update) {
-        client.updatePartitionStatistics(dbName, tableName, partitionName, update);
+        client.updatePartitionStatistics(tableInfo.getDbName(), tableInfo.getTbName(), partitionName, update);
     }

-    public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
-        client.addPartitions(dbName, tableName, partitions);
+    public void addPartitions(SimpleTableInfo tableInfo, List<HivePartitionWithStatistics> partitions) {
+        client.addPartitions(tableInfo.getDbName(), tableInfo.getTbName(), partitions);
     }

-    public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
-        client.dropPartition(dbName, tableName, partitionValues, deleteData);
+    public void dropPartition(SimpleTableInfo tableInfo, List<String> partitionValues, boolean deleteData) {
+        client.dropPartition(tableInfo.getDbName(), tableInfo.getTbName(), partitionValues, deleteData);
     }
 }
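
Note on SimpleTableInfo: the diff above replaces each loose (dbName, tableName) string pair with the new SimpleTableInfo value object from org.apache.doris.common.info. The class itself is not included in this view, so the following is only a minimal sketch inferred from its usage in this PR (the two-argument constructor and the getDbName()/getTbName() accessors); the equals/hashCode overrides are assumptions about what a value object like this would carry, not code from the diff.

package org.apache.doris.common.info;

import java.util.Objects;

// Minimal sketch inferred from usage in this PR; not the actual class.
public class SimpleTableInfo {
    private final String dbName;
    private final String tbName;

    public SimpleTableInfo(String dbName, String tbName) {
        this.dbName = dbName;
        this.tbName = tbName;
    }

    public String getDbName() {
        return dbName;
    }

    public String getTbName() {
        return tbName;
    }

    // Assumed: value semantics, so two infos for the same table compare equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleTableInfo)) {
            return false;
        }
        SimpleTableInfo that = (SimpleTableInfo) o;
        return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(dbName, tbName);
    }
}

Bundling the identity into one object shortens the signatures above and makes it harder to swap database and table arguments by accident.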
==== HivePartition.java ====

@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;

 import org.apache.doris.catalog.Column;
+import org.apache.doris.common.info.SimpleTableInfo;

 import com.google.common.base.Preconditions;
 import lombok.Data;
@@ -31,8 +32,7 @@ public class HivePartition {
     public static final String LAST_MODIFY_TIME_KEY = "transient_lastDdlTime";
     public static final String FILE_NUM_KEY = "numFiles";

-    private String dbName;
-    private String tblName;
+    private SimpleTableInfo tableInfo;
     private String inputFormat;
     private String path;
     private List<String> partitionValues;
@@ -43,10 +43,9 @@ public class HivePartition {
     private List<FieldSchema> columns;

     // If you want to read the data under a partition, you can use this constructor
-    public HivePartition(String dbName, String tblName, boolean isDummyPartition,
+    public HivePartition(SimpleTableInfo tableInfo, boolean isDummyPartition,
             String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
-        this.dbName = dbName;
-        this.tblName = tblName;
+        this.tableInfo = tableInfo;
         this.isDummyPartition = isDummyPartition;
         // eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
         this.inputFormat = inputFormat;
@@ -57,17 +56,31 @@ public HivePartition(String dbName, String tblName, boolean isDummyPartition,
         this.parameters = parameters;
     }

+    public HivePartition(String database, String tableName, boolean isDummyPartition,
+            String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
+        this(new SimpleTableInfo(database, tableName), isDummyPartition, inputFormat, path, partitionValues,
+                parameters);
+    }
+
     // If you want to update hms with partition, then you can use this constructor,
     // as updating hms requires some additional information, such as outputFormat and so on
-    public HivePartition(String dbName, String tblName, boolean isDummyPartition,
-            String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
-            String outputFormat, String serde, List<FieldSchema> columns) {
-        this(dbName, tblName, isDummyPartition, inputFormat, path, partitionValues, parameters);
+    public HivePartition(SimpleTableInfo tableInfo, boolean isDummyPartition,
+            String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
+            String outputFormat, String serde, List<FieldSchema> columns) {
+        this(tableInfo, isDummyPartition, inputFormat, path, partitionValues, parameters);
         this.outputFormat = outputFormat;
         this.serde = serde;
         this.columns = columns;
     }
+
+    public String getDbName() {
+        return tableInfo.getDbName();
+    }
+
+    public String getTblName() {
+        return tableInfo.getTbName();
+    }

     // return partition name like: nation=cn/city=beijing
     public String getPartitionName(List<Column> partColumns) {
         Preconditions.checkState(partColumns.size() == partitionValues.size());
@@ -94,6 +107,7 @@ public long getLastModifiedTime() {

     /**
      * If there are no files, it proves that there is no data under the partition, we return 0
+     *
      * @return
      */
     public long getLastModifiedTimeIgnoreInit() {
@@ -112,12 +126,17 @@ public long getFileNum() {

     @Override
     public String toString() {
-        return "HivePartition{"
-                + "dbName='" + dbName + '\''
-                + ", tblName='" + tblName + '\''
-                + ", isDummyPartition='" + isDummyPartition + '\''
-                + ", inputFormat='" + inputFormat + '\''
-                + ", path='" + path + '\''
-                + ", partitionValues=" + partitionValues + '}';
+        final StringBuilder sb = new StringBuilder("HivePartition{");
+        sb.append("tableInfo=").append(tableInfo);
+        sb.append(", inputFormat='").append(inputFormat).append('\'');
+        sb.append(", path='").append(path).append('\'');
+        sb.append(", partitionValues=").append(partitionValues);
+        sb.append(", isDummyPartition=").append(isDummyPartition);
+        sb.append(", parameters=").append(parameters);
+        sb.append(", outputFormat='").append(outputFormat).append('\'');
+        sb.append(", serde='").append(serde).append('\'');
+        sb.append(", columns=").append(columns);
+        sb.append('}');
+        return sb.toString();
     }
 }
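
A short usage sketch of the reworked HivePartition constructors (the table, format, and path values below are made up for illustration, and the usual java.util imports are assumed): the SimpleTableInfo overload is now the primary constructor, the (String, String) overload keeps existing call sites source-compatible by wrapping its arguments, and the new getDbName()/getTblName() getters delegate to tableInfo so older callers keep compiling.

// Hypothetical values; assumes the Doris classes shown in this diff.
SimpleTableInfo tableInfo = new SimpleTableInfo("tpch", "nation");
String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
String path = "hdfs://nn:8020/warehouse/tpch.db/nation/region=cn";

// Read path: pass the table identity as one object.
HivePartition partition = new HivePartition(
        tableInfo, false, inputFormat, path,
        Arrays.asList("cn"), new HashMap<>());

// Legacy call sites still compile; this overload wraps the two strings
// in a SimpleTableInfo and delegates to the constructor above.
HivePartition legacy = new HivePartition(
        "tpch", "nation", false, inputFormat, path,
        Arrays.asList("cn"), new HashMap<>());

// The delegating getters preserve the old accessor surface.
String db = partition.getDbName();   // "tpch"
String tbl = partition.getTblName(); // "nation"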
==== HivePartitionStatistics.java ====

@@ -20,25 +20,28 @@

 package org.apache.doris.datasource.hive;

+import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.datasource.statistics.CommonStatistics.ReduceOperator;
+
 import com.google.common.collect.ImmutableMap;

 import java.util.Map;

 public class HivePartitionStatistics {
     public static final HivePartitionStatistics EMPTY =
-            new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of());
+            new HivePartitionStatistics(CommonStatistics.EMPTY, ImmutableMap.of());

-    private final HiveCommonStatistics commonStatistics;
+    private final CommonStatistics commonStatistics;
     private final Map<String, HiveColumnStatistics> columnStatisticsMap;

     public HivePartitionStatistics(
-            HiveCommonStatistics commonStatistics,
+            CommonStatistics commonStatistics,
             Map<String, HiveColumnStatistics> columnStatisticsMap) {
         this.commonStatistics = commonStatistics;
         this.columnStatisticsMap = columnStatisticsMap;
     }

-    public HiveCommonStatistics getCommonStatistics() {
+    public CommonStatistics getCommonStatistics() {
         return commonStatistics;
     }
@@ -48,7 +51,7 @@ public Map<String, HiveColumnStatistics> getColumnStatisticsMap() {

     public static HivePartitionStatistics fromCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
         return new HivePartitionStatistics(
-                new HiveCommonStatistics(rowCount, fileCount, totalFileBytes),
+                new CommonStatistics(rowCount, fileCount, totalFileBytes),
                 ImmutableMap.of()
         );
     }
@@ -62,56 +65,32 @@ public static HivePartitionStatistics merge(HivePartitionStatistics current, HivePartitionStatistics update) {
         }

         return new HivePartitionStatistics(
-                reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
-                // TODO merge columnStatisticsMap
-                current.getColumnStatisticsMap());
+                CommonStatistics
+                        .reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
+                // TODO merge columnStatisticsMap
+                current.getColumnStatisticsMap());
     }

     public static HivePartitionStatistics reduce(
             HivePartitionStatistics first,
             HivePartitionStatistics second,
             ReduceOperator operator) {
-        HiveCommonStatistics left = first.getCommonStatistics();
-        HiveCommonStatistics right = second.getCommonStatistics();
+        CommonStatistics left = first.getCommonStatistics();
+        CommonStatistics right = second.getCommonStatistics();
         return HivePartitionStatistics.fromCommonStatistics(
-                reduce(left.getRowCount(), right.getRowCount(), operator),
-                reduce(left.getFileCount(), right.getFileCount(), operator),
-                reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
+                CommonStatistics.reduce(left.getRowCount(), right.getRowCount(), operator),
+                CommonStatistics.reduce(left.getFileCount(), right.getFileCount(), operator),
+                CommonStatistics.reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
     }

-    public static HiveCommonStatistics reduce(
-            HiveCommonStatistics current,
-            HiveCommonStatistics update,
+    public static CommonStatistics reduce(
+            CommonStatistics current,
+            CommonStatistics update,
             ReduceOperator operator) {
-        return new HiveCommonStatistics(
-                reduce(current.getRowCount(), update.getRowCount(), operator),
-                reduce(current.getFileCount(), update.getFileCount(), operator),
-                reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
-    }
-
-    public static long reduce(long current, long update, ReduceOperator operator) {
-        if (current >= 0 && update >= 0) {
-            switch (operator) {
-                case ADD:
-                    return current + update;
-                case SUBTRACT:
-                    return current - update;
-                case MAX:
-                    return Math.max(current, update);
-                case MIN:
-                    return Math.min(current, update);
-                default:
-                    throw new IllegalArgumentException("Unexpected operator: " + operator);
-            }
-        }
-
-        return 0;
+        return new CommonStatistics(
+                CommonStatistics.reduce(current.getRowCount(), update.getRowCount(), operator),
+                CommonStatistics.reduce(current.getFileCount(), update.getFileCount(), operator),
+                CommonStatistics.reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
     }

-    public enum ReduceOperator {
-        ADD,
-        SUBTRACT,
-        MIN,
-        MAX,
-    }
 }
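
The arithmetic deleted above does not vanish: judging by the new imports, the per-field reduce logic and the ReduceOperator enum move to org.apache.doris.datasource.statistics.CommonStatistics, which also takes over HiveCommonStatistics' role as the (rowCount, fileCount, totalFileBytes) triple. That file is not part of this view; the sketch below reconstructs it from the deleted code and the call sites above, so the EMPTY constant's values and any details beyond these members are assumptions.

package org.apache.doris.datasource.statistics;

// Sketch reconstructed from the code deleted in this diff; the real class
// in the PR may differ in details beyond these members.
public class CommonStatistics {
    // Assumed to be all zeros, by analogy with the old HiveCommonStatistics.EMPTY.
    public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 0L);

    private final long rowCount;
    private final long fileCount;
    private final long totalFileBytes;

    public CommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
        this.rowCount = rowCount;
        this.fileCount = fileCount;
        this.totalFileBytes = totalFileBytes;
    }

    public long getRowCount() {
        return rowCount;
    }

    public long getFileCount() {
        return fileCount;
    }

    public long getTotalFileBytes() {
        return totalFileBytes;
    }

    // Field-by-field reduction, as called from HivePartitionStatistics above.
    public static CommonStatistics reduce(CommonStatistics current, CommonStatistics update,
            ReduceOperator operator) {
        return new CommonStatistics(
                reduce(current.getRowCount(), update.getRowCount(), operator),
                reduce(current.getFileCount(), update.getFileCount(), operator),
                reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
    }

    // Scalar reduction, verbatim from the logic deleted in HivePartitionStatistics:
    // negative inputs (unknown statistics) collapse to 0.
    public static long reduce(long current, long update, ReduceOperator operator) {
        if (current >= 0 && update >= 0) {
            switch (operator) {
                case ADD:
                    return current + update;
                case SUBTRACT:
                    return current - update;
                case MAX:
                    return Math.max(current, update);
                case MIN:
                    return Math.min(current, update);
                default:
                    throw new IllegalArgumentException("Unexpected operator: " + operator);
            }
        }
        return 0;
    }

    public enum ReduceOperator {
        ADD,
        SUBTRACT,
        MIN,
        MAX,
    }
}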
==== HiveUtil.java ====

@@ -21,6 +21,7 @@
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.statistics.CommonStatistics;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -75,14 +76,14 @@ private HiveUtil() {
     /**
      * get input format class from inputFormatName.
      *
-     * @param jobConf jobConf used when getInputFormatClass
+     * @param jobConf         jobConf used when getInputFormatClass
      * @param inputFormatName inputFormat class name
-     * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
+     * @param symlinkTarget   use target inputFormat class when inputFormat is SymlinkTextInputFormat
      * @return a class of inputFormat.
      * @throws UserException when class not found.
      */
     public static InputFormat<?, ?> getInputFormat(JobConf jobConf,
-            String inputFormatName, boolean symlinkTarget) throws UserException {
+                                                   String inputFormatName, boolean symlinkTarget) throws UserException {
         try {
             Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
             if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
@@ -167,12 +168,12 @@ public static Map<String, Partition> convertToNamePartitionMap(

         Map<String, List<String>> partitionNameToPartitionValues =
                 partitionNames
-                        .stream()
-                        .collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
+                    .stream()
+                    .collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));

         Map<List<String>, Partition> partitionValuesToPartition =
                 partitions.stream()
-                        .collect(Collectors.toMap(Partition::getValues, partition -> partition));
+                    .collect(Collectors.toMap(Partition::getValues, partition -> partition));

         ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
         for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
@@ -312,7 +313,7 @@ public static Database toHiveDatabase(HiveDatabaseMetadata hiveDb) {

     public static Map<String, String> updateStatisticsParameters(
             Map<String, String> parameters,
-            HiveCommonStatistics statistics) {
+            CommonStatistics statistics) {
         HashMap<String, String> result = new HashMap<>(parameters);

         result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
@@ -345,8 +346,8 @@ public static Partition toMetastoreApiPartition(HivePartitionWithStatistics part

     public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
         Partition result = new Partition();
-        result.setDbName(hivePartition.getDbName());
-        result.setTableName(hivePartition.getTblName());
+        result.setDbName(hivePartition.getTableInfo().getDbName());
+        result.setTableName(hivePartition.getTableInfo().getTbName());
         result.setValues(hivePartition.getPartitionValues());
         result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
         result.setParameters(hivePartition.getParameters());
@@ -355,7 +356,7 @@ public static Partition toMetastoreApiPartition(HivePartition hivePartition) {

     public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
         SerDeInfo serdeInfo = new SerDeInfo();
-        serdeInfo.setName(partition.getTblName());
+        serdeInfo.setName(partition.getTableInfo().getTbName());
         serdeInfo.setSerializationLib(partition.getSerde());

         StorageDescriptor sd = new StorageDescriptor();
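
Taken together, the relocated pieces compose like this at commit time (a worked sketch with made-up numbers, assuming the CommonStatistics sketch above, the usual java.util imports, and a pre-existing oldPartition, which is hypothetical): fromCommonStatistics wraps each write's counters, merge folds them with ReduceOperator.ADD, and HiveUtil.updateStatisticsParameters writes the totals back into the partition's HMS parameter map.

// Counters from two successive writes into the same partition (made up).
HivePartitionStatistics first = HivePartitionStatistics.fromCommonStatistics(100L, 2L, 4096L);
HivePartitionStatistics second = HivePartitionStatistics.fromCommonStatistics(50L, 1L, 1024L);

// merge() ADDs field by field: 150 rows, 3 files, 5120 bytes.
HivePartitionStatistics merged = HivePartitionStatistics.merge(first, second);
CommonStatistics totals = merged.getCommonStatistics();

// Fold the totals into the partition's HMS parameters; per the diff this
// sets at least StatsSetupConst.NUM_FILES ("numFiles" -> "3").
Map<String, String> updated =
        HiveUtil.updateStatisticsParameters(oldPartition.getParameters(), totals);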