From 7c77ec13d884669ac7ad059efa8da292e203f550 Mon Sep 17 00:00:00 2001
From: yubingpeng <yubingpenguestc@163.com>
Date: Thu, 14 Dec 2017 10:42:18 +0800
Subject: [PATCH 1/5] report the real available disk capacity to FE

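Previously the BE reported a root path's "available" capacity as the
configured capacity minus the size of the data stored under the path,
so the FE never saw the real free space left on the disk. Report three
values instead: disk_total_capacity and disk_available_capacity taken
from the filesystem, plus data_used_capacity summed from the files
under the root path.

On the FE side, persist the two new fields starting with meta version
VERSION_36. Images written before VERSION_36 only carry
availableCapacityB, from which dataUsedCapacityB is derived as
totalCapacityB - availableCapacityB on load.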
---
 be/src/agent/task_worker_pool.cpp             |  5 +--
 be/src/olap/olap_engine.cpp                   |  4 +--
 be/src/olap/olap_rootpath.cpp                 | 22 ++++++------
 be/src/olap/olap_rootpath.h                   | 14 +++++---
 fe/src/com/baidu/palo/catalog/DiskInfo.java   | 35 +++++++++++++-----
 fe/src/com/baidu/palo/common/FeConstants.java |  2 +-
 .../com/baidu/palo/common/FeMetaVersion.java  |  3 ++
 .../palo/common/proc/BackendProcNode.java     | 30 ++++++++++------
 fe/src/com/baidu/palo/system/Backend.java     | 22 +++++++++---
 gensrc/thrift/MasterService.thrift            |  5 +--
 10 files changed, 98 insertions(+), 44 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index c5c6afa9e037ce..725ca95d5bfdff 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1539,8 +1539,9 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
         for (auto root_path_state : root_paths_stat) {
             TDisk disk;
             disk.__set_root_path(root_path_state.root_path);
-            disk.__set_total_capacity(static_cast<double>(root_path_state.capacity));
-            disk.__set_available_capacity(static_cast<double>(root_path_state.available));
+            disk.__set_disk_total_capacity(static_cast<double>(root_path_state.disk_total_capacity));
+            disk.__set_data_used_capacity(static_cast<double>(root_path_state.data_used_capacity));
+            disk.__set_disk_available_capacity(static_cast<double>(root_path_state.disk_available_capacity));
             disk.__set_used(root_path_state.is_used);
             disks[root_path_state.root_path] = disk;
         }
diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp
index 8f244d75d2e8ea..dbc06be1aa6a38 100644
--- a/be/src/olap/olap_engine.cpp
+++ b/be/src/olap/olap_engine.cpp
@@ -1146,8 +1146,8 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
             continue;
         }
 
-        double curr_usage = (stat.capacity - stat.available)
-                / (double) stat.capacity;
+        double curr_usage = (stat.disk_total_capacity - stat.disk_available_capacity)
+                / (double) stat.disk_total_capacity;
         *usage = *usage > curr_usage ? *usage : curr_usage;
 
         OLAPStatus curr_res = OLAP_SUCCESS;
diff --git a/be/src/olap/olap_rootpath.cpp b/be/src/olap/olap_rootpath.cpp
index a102b3a56d6a7c..e38069d947408b 100644
--- a/be/src/olap/olap_rootpath.cpp
+++ b/be/src/olap/olap_rootpath.cpp
@@ -257,10 +257,11 @@ OLAPStatus OLAPRootPath::get_all_disk_stat(vector<OLAPRootPathStat>* disks_stat)
 
     for (auto& stat : *disks_stat) {
         if (stat.is_used) {
-            _get_disk_capacity(stat.root_path, &stat.capacity, &stat.available);
+            _get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
         } else {
-            stat.capacity = 0;
-            stat.available = 0;
+            stat.disk_total_capacity = 0;
+            stat.disk_available_capacity = 0;
+            stat.data_used_capacity = 0;
         }
     }
 
@@ -276,7 +277,7 @@ OLAPStatus OLAPRootPath::get_all_root_path_stat(vector<OLAPRootPathStat>* root_p
         OLAPRootPathStat stat;
         stat.root_path = it->first;
         stat.is_used = it->second.is_used;
-        stat.capacity = it->second.capacity;
+        stat.disk_total_capacity = it->second.capacity;
 
         root_paths_stat->push_back(stat);
     }
@@ -284,10 +285,12 @@ OLAPStatus OLAPRootPath::get_all_root_path_stat(vector<OLAPRootPathStat>* root_p
 
     for (auto& stat : *root_paths_stat) {
         if (stat.is_used) {
-            _get_root_path_capacity(stat.root_path, stat.capacity, &stat.available);
+            _get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
+            _get_root_path_capacity(stat.root_path, &stat.data_used_capacity);
         } else {
-            stat.capacity = 0;
-            stat.available = 0;
+            stat.disk_total_capacity = 0;
+            stat.data_used_capacity = 0;
+            stat.disk_available_capacity = 0;
         }
     }
 
@@ -693,8 +696,7 @@ OLAPStatus OLAPRootPath::_parse_root_paths_from_string(
 
 OLAPStatus OLAPRootPath::_get_root_path_capacity(
         const string& root_path,
-        int64_t capacity,
-        int64_t* available) {
+        int64_t* data_used) {
     OLAPStatus res = OLAP_SUCCESS;
     int64_t used = 0;
 
@@ -706,7 +708,7 @@ OLAPStatus OLAPRootPath::_get_root_path_capacity(
                 used += file_size(*it);
             }
         }
-        *available = capacity - used;
+        *data_used = used;
     } catch (boost::filesystem::filesystem_error& e) {
         OLAP_LOG_WARNING("get space info failed. [path: %s, erro:%s]", root_path.c_str(), e.what());
         return OLAP_ERR_STL_ERROR;
diff --git a/be/src/olap/olap_rootpath.h b/be/src/olap/olap_rootpath.h
index a94016deab4d18..6ecc1cfd16e3ed 100644
--- a/be/src/olap/olap_rootpath.h
+++ b/be/src/olap/olap_rootpath.h
@@ -31,11 +31,16 @@
 namespace palo {
 
 struct OLAPRootPathStat {
-    OLAPRootPathStat(): capacity(0), available(0), is_used(false) {}
+    OLAPRootPathStat():
+          disk_total_capacity(0),
+          data_used_capacity(0),
+          disk_available_capacity(0),
+          is_used(false) {}
 
     std::string root_path;
-    int64_t capacity;
-    int64_t available;
+    int64_t disk_total_capacity;
+    int64_t data_used_capacity;
+    int64_t disk_available_capacity;
     bool is_used;
 };
 
@@ -166,8 +171,7 @@ class OLAPRootPath {
 
     OLAPStatus _get_root_path_capacity(
             const std::string& root_path,
-            int64_t capacity,
-            int64_t* available);
+            int64_t* data_used);
 
     OLAPStatus _get_disk_capacity(
             const std::string& root_path,
diff --git a/fe/src/com/baidu/palo/catalog/DiskInfo.java b/fe/src/com/baidu/palo/catalog/DiskInfo.java
index 6b7e01fe0b7faf..951ae15f785575 100644
--- a/fe/src/com/baidu/palo/catalog/DiskInfo.java
+++ b/fe/src/com/baidu/palo/catalog/DiskInfo.java
@@ -15,6 +15,7 @@
 
 package com.baidu.palo.catalog;
 
+import com.baidu.palo.common.FeMetaVersion;
 import com.baidu.palo.common.io.Text;
 import com.baidu.palo.common.io.Writable;
 
@@ -32,7 +33,8 @@ public enum DiskState {
 
     private String rootPath;
     private long totalCapacityB;
-    private long availableCapacityB;
+    private long dataUsedCapacityB;
+    private long diskAvailableCapacityB;
     private DiskState state;
 
     private DiskInfo() {
@@ -42,7 +44,8 @@ private DiskInfo() {
     public DiskInfo(String rootPath) {
         this.rootPath = rootPath;
         this.totalCapacityB = DEFAULT_CAPACITY_B;
-        this.availableCapacityB = DEFAULT_CAPACITY_B;
+        this.dataUsedCapacityB = 0;
+        this.diskAvailableCapacityB = DEFAULT_CAPACITY_B;
         this.state = DiskState.ONLINE;
     }
 
@@ -58,14 +61,22 @@ public void setTotalCapacityB(long totalCapacityB) {
         this.totalCapacityB = totalCapacityB;
     }
 
+    public long getDataUsedCapacityB() {
+        return dataUsedCapacityB;
+    }
+
+    public void setDataUsedCapacityB(long dataUsedCapacityB) {
+        this.dataUsedCapacityB = dataUsedCapacityB;
+    }
+
     public long getAvailableCapacityB() {
-        return availableCapacityB;
+        return diskAvailableCapacityB;
     }
 
     public void setAvailableCapacityB(long availableCapacityB) {
-        this.availableCapacityB = availableCapacityB;
+        this.diskAvailableCapacityB = availableCapacityB;
     }
 
     public DiskState getState() {
         return state;
     }
@@ -76,15 +87,16 @@ public void setState(DiskState state) {
 
     @Override
     public String toString() {
-        return "DiskInfo [rootPath=" + rootPath + ", totalCapacityB=" + totalCapacityB + ", availableCapacityB="
-                + availableCapacityB + ", state=" + state + "]";
+        return "DiskInfo [rootPath=" + rootPath + ", totalCapacityB=" + totalCapacityB + ", dataUsedCapacityB="
+                + dataUsedCapacityB + ", diskAvailableCapacityB=" + diskAvailableCapacityB + ", state=" + state + "]";
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, rootPath);
         out.writeLong(totalCapacityB);
-        out.writeLong(availableCapacityB);
+        out.writeLong(dataUsedCapacityB);
+        out.writeLong(diskAvailableCapacityB);
         Text.writeString(out, state.name());
     }
 
@@ -92,7 +104,14 @@ public void write(DataOutput out) throws IOException {
     public void readFields(DataInput in) throws IOException {
         this.rootPath = Text.readString(in);
         this.totalCapacityB = in.readLong();
-        this.availableCapacityB = in.readLong();
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_36) {
+            this.dataUsedCapacityB = in.readLong();
+            this.diskAvailableCapacityB = in.readLong();
+        } else {
+            long availableCapacityB = in.readLong();
+            this.dataUsedCapacityB = this.totalCapacityB - availableCapacityB;
+            this.diskAvailableCapacityB = availableCapacityB;
+        }
         this.state = DiskState.valueOf(Text.readString(in));
     }
 
diff --git a/fe/src/com/baidu/palo/common/FeConstants.java b/fe/src/com/baidu/palo/common/FeConstants.java
index 1ed0fd520cf0ee..e8d32b8c5db583 100644
--- a/fe/src/com/baidu/palo/common/FeConstants.java
+++ b/fe/src/com/baidu/palo/common/FeConstants.java
@@ -38,5 +38,5 @@ public class FeConstants {
 
     // general model
     // Current meta data version. Use this version to write journals and image
-    public static int meta_version = FeMetaVersion.VERSION_35;
+    public static int meta_version = FeMetaVersion.VERSION_36;
 }
diff --git a/fe/src/com/baidu/palo/common/FeMetaVersion.java b/fe/src/com/baidu/palo/common/FeMetaVersion.java
index 4a7a86357edc46..dc1d1e97504027 100644
--- a/fe/src/com/baidu/palo/common/FeMetaVersion.java
+++ b/fe/src/com/baidu/palo/common/FeMetaVersion.java
@@ -73,4 +73,7 @@ public final class FeMetaVersion {
     // to remove backend in cluster when drop backend or 
     // decommission in latest versions.
     public static final int VERSION_35= 35;
+
+    // persist dataUsedCapacityB and diskAvailableCapacityB in DiskInfo
+    public static final int VERSION_36 = 36;
 }
diff --git a/fe/src/com/baidu/palo/common/proc/BackendProcNode.java b/fe/src/com/baidu/palo/common/proc/BackendProcNode.java
index 12d081dc550b99..3a4dd6c0c5431f 100644
--- a/fe/src/com/baidu/palo/common/proc/BackendProcNode.java
+++ b/fe/src/com/baidu/palo/common/proc/BackendProcNode.java
@@ -30,7 +30,7 @@
 
 public class BackendProcNode implements ProcNodeInterface {
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("RootPath").add("TotalCapacity").add("AvailableCapacity").add("State")
+            .add("RootPath").add("TotalCapacity").add("DataUsedCapacity").add("DiskAvailableCapacity").add("State")
             .build();
 
     private Backend backend;
@@ -49,28 +49,38 @@ public ProcResult fetchResult() throws AnalysisException {
 
         for (String infoString : backend.getDiskInfosAsString()) {
             String[] infos = infoString.split("\\|");
-            Preconditions.checkState(infos.length == 4);
+            Preconditions.checkState(infos.length == 5);
 
             Pair<Double, String> totalUnitPair = DebugUtil.getByteUint(Long.valueOf(infos[1]));
-            Pair<Double, String> availableUnitPair = DebugUtil.getByteUint(Long.valueOf(infos[2]));
+            Pair<Double, String> dataUsedUnitPair = DebugUtil.getByteUint(Long.valueOf(infos[2]));
+            Pair<Double, String> diskAvailableUnitPair = DebugUtil.getByteUint(Long.valueOf(infos[3]));
 
             String readableTotalCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(totalUnitPair.first) + " "
                     + totalUnitPair.second;
-            String readableAvailableCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(availableUnitPair.first) + " "
-                    + availableUnitPair.second;
+            String readableDataUsedCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(dataUsedUnitPair.first) + " "
+                    + dataUsedUnitPair.second;
+            String readableDiskAvailableCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(
+                    diskAvailableUnitPair.first) + " " + diskAvailableUnitPair.second;
 
-            result.addRow(Lists.newArrayList(infos[0], readableTotalCapacity, readableAvailableCapacity, infos[3]));
+            result.addRow(Lists.newArrayList(infos[0], readableTotalCapacity, readableDataUsedCapacity,
+                  readableDiskAvailableCapacity, infos[4]));
         }
 
         long totalCapacityB = backend.getTotalCapacityB();
         Pair<Double, String> unitPair = DebugUtil.getByteUint(totalCapacityB);
         String readableTotalCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(unitPair.first) + " " + unitPair.second;
 
-        long availableCapacityB = backend.getAvailableCapacityB();
-        unitPair = DebugUtil.getByteUint(availableCapacityB);
-        String readableAvailableCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(unitPair.first) + " "
+        long dataUsedCapacityB = backend.getDataUsedCapacityB();
+        unitPair = DebugUtil.getByteUint(dataUsedCapacityB);
+        String readableDataUsedCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(unitPair.first) + " "
                 + unitPair.second;
-        result.addRow(Lists.newArrayList("Total", readableTotalCapacity, readableAvailableCapacity, ""));
+
+        long diskAvailableCapacityB = backend.getAvailableCapacityB();
+        unitPair = DebugUtil.getByteUint(diskAvailableCapacityB);
+        String readableDiskAvailableCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(unitPair.first) + " "
+                + unitPair.second;
+        result.addRow(Lists.newArrayList("Total", readableTotalCapacity, readableDataUsedCapacity,
+              readableDiskAvailableCapacity, ""));
 
         return result;
     }
diff --git a/fe/src/com/baidu/palo/system/Backend.java b/fe/src/com/baidu/palo/system/Backend.java
index c7cc40cd6f8017..efb61e3bb22fcc 100644
--- a/fe/src/com/baidu/palo/system/Backend.java
+++ b/fe/src/com/baidu/palo/system/Backend.java
@@ -276,7 +276,8 @@ public List<String> getDiskInfosAsString() {
         List<String> diskInfoStrings = new LinkedList<String>();
         for (DiskInfo diskInfo : disks.values()) {
             diskInfoStrings.add(diskInfo.getRootPath() + "|" + diskInfo.getTotalCapacityB() + "|"
-                    + diskInfo.getAvailableCapacityB() + "|" + diskInfo.getState().name());
+                    + diskInfo.getDataUsedCapacityB() + "|" + diskInfo.getAvailableCapacityB() + "|"
+                    + diskInfo.getState().name());
         }
         return diskInfoStrings;
     }
@@ -303,6 +304,17 @@ public long getAvailableCapacityB() {
         }
         return availableCapacityB;
     }
+
+    public long getDataUsedCapacityB() {
+        ImmutableMap<String, DiskInfo> disks = disksRef.get();
+        long dataUsedCapacityB = 0L;
+        for (DiskInfo diskInfo : disks.values()) {
+            if (diskInfo.getState() == DiskState.ONLINE) {
+                dataUsedCapacityB += diskInfo.getDataUsedCapacityB();
+            }
+        }
+        return dataUsedCapacityB;
+    }
 
     public void updateDisks(Map<String, TDisk> backendDisks) {
         // update status or add new diskInfo
@@ -310,8 +322,9 @@ public void updateDisks(Map<String, TDisk> backendDisks) {
         Map<String, DiskInfo> newDisks = Maps.newHashMap();
         for (TDisk tDisk : backendDisks.values()) {
             String rootPath = tDisk.getRoot_path();
-            long totalCapacityB = tDisk.getTotal_capacity();
-            long availableCapacityB = tDisk.getAvailable_capacity();
+            long totalCapacityB = tDisk.getDisk_total_capacity();
+            long dataUsedCapacityB = tDisk.getData_used_capacity();
+            long diskAvailableCapacityB = tDisk.getDisk_available_capacity();
             boolean isUsed = tDisk.isUsed();
 
             DiskInfo diskInfo = disks.get(rootPath);
@@ -322,7 +335,8 @@ public void updateDisks(Map<String, TDisk> backendDisks) {
             newDisks.put(rootPath, diskInfo);
 
             diskInfo.setTotalCapacityB(totalCapacityB);
-            diskInfo.setAvailableCapacityB(availableCapacityB);
+            diskInfo.setDataUsedCapacityB(dataUsedCapacityB);
+            diskInfo.setAvailableCapacityB(diskAvailableCapacityB);
             if (isUsed) {
                 diskInfo.setState(DiskState.ONLINE);
             } else {
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index 8333584c14bd9e..5ea9d7bee497aa 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -55,9 +55,10 @@ struct TTablet {
 
 struct TDisk {
     1: required string root_path
-    2: required Types.TSize total_capacity
-    3: required Types.TSize available_capacity
+    2: required Types.TSize disk_total_capacity
+    3: required Types.TSize data_used_capacity
     4: required bool used
+    5: optional Types.TSize disk_available_capacity
 }
 
 struct TReportRequest {

From cdf0caad77e82554c6955ebdfe86e7b18a6f6161 Mon Sep 17 00:00:00 2001
From: yubingpeng <yubingpenguestc@163.com>
Date: Fri, 15 Dec 2017 14:08:18 +0800
Subject: [PATCH 2/5] fix microseconds_add and microseconds_sub registration bugs

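Both builtin entries were registered under the name microseconds_sub.
The first entry maps to the mangled symbol of
TimestampFunctions::micros_add and must be registered as
microseconds_add; the second entry declared a BIGINT argument although
its symbol takes an IntVal, so its argument type must be INT. Before
this fix, a call such as microseconds_add(dt, 10) (illustrative) could
not resolve to the intended function.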
---
 gensrc/script/palo_builtins_functions.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/gensrc/script/palo_builtins_functions.py b/gensrc/script/palo_builtins_functions.py
index 405fbb668b1d58..9678bb1fda1982 100644
--- a/gensrc/script/palo_builtins_functions.py
+++ b/gensrc/script/palo_builtins_functions.py
@@ -187,10 +187,10 @@
     [['seconds_sub'], 'DATETIME', ['DATETIME', 'INT'],
         '_ZN4palo18TimestampFunctions11seconds_subEPN8palo_udf'
         '15FunctionContextERKNS1_11DateTimeValERKNS1_6IntValE'],
-    [['microseconds_sub'], 'DATETIME', ['DATETIME', 'INT'],
+    [['microseconds_add'], 'DATETIME', ['DATETIME', 'INT'],
         '_ZN4palo18TimestampFunctions10micros_addEPN8palo_udf'
         '15FunctionContextERKNS1_11DateTimeValERKNS1_6IntValE'],
-    [['microseconds_sub'], 'DATETIME', ['DATETIME', 'BIGINT'],
+    [['microseconds_sub'], 'DATETIME', ['DATETIME', 'INT'],
         '_ZN4palo18TimestampFunctions10micros_subEPN8palo_udf'
         '15FunctionContextERKNS1_11DateTimeValERKNS1_6IntValE'],
 

From bc001f1fa3248b72fa60dfb8c6a70e3c3e1319a2 Mon Sep 17 00:00:00 2001
From: yubingpeng <yubingpenguestc@163.com>
Date: Mon, 18 Dec 2017 19:43:42 +0800
Subject: [PATCH 3/5] fix NULL cast to CHAR bug

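NULL_TYPE had implicit cast entries for the other scalar types,
including VARCHAR, but not for CHAR, so casting a NULL literal to CHAR
failed analysis. Add the missing NULL_TYPE -> CHAR entry. An
illustrative statement that this fixes:

    SELECT CAST(NULL AS CHAR(1));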
---
 fe/src/com/baidu/palo/catalog/PrimitiveType.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fe/src/com/baidu/palo/catalog/PrimitiveType.java b/fe/src/com/baidu/palo/catalog/PrimitiveType.java
index 094433214c1cd6..c71fbceb8fdf99 100644
--- a/fe/src/com/baidu/palo/catalog/PrimitiveType.java
+++ b/fe/src/com/baidu/palo/catalog/PrimitiveType.java
@@ -78,6 +78,7 @@ public enum PrimitiveType {
         builder.put(NULL_TYPE, DATE);
         builder.put(NULL_TYPE, DATETIME);
         builder.put(NULL_TYPE, DECIMAL);
+        builder.put(NULL_TYPE, CHAR);
         builder.put(NULL_TYPE, VARCHAR);
         // Boolean
         builder.put(BOOLEAN, BOOLEAN);

From fd96d7092f9e872cf7f127066a2a3917ec848ea1 Mon Sep 17 00:00:00 2001
From: yubingpeng <yubingpenguestc@163.com>
Date: Wed, 20 Dec 2017 19:53:53 +0800
Subject: [PATCH 4/5] fix count_distinct bug with multiple columns

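count_distinct with more than one argument looked up the builtin using
only the first child's type, which breaks when the arguments have
different types. Compute an assignment-compatible type across all
children, falling back to VARCHAR when no common type exists, and use
that type for the lookup. An illustrative query that this fixes, for a
hypothetical table t with an INT column k1 and a VARCHAR column k2:

    SELECT COUNT(DISTINCT k1, k2) FROM t;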
---
 fe/src/com/baidu/palo/analysis/FunctionCallExpr.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/fe/src/com/baidu/palo/analysis/FunctionCallExpr.java b/fe/src/com/baidu/palo/analysis/FunctionCallExpr.java
index 883c944d682416..43a491f8021f11 100644
--- a/fe/src/com/baidu/palo/analysis/FunctionCallExpr.java
+++ b/fe/src/com/baidu/palo/analysis/FunctionCallExpr.java
@@ -435,7 +435,17 @@ public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
             fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{type},
                 Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
         }  else if (fnName.getFunction().equalsIgnoreCase("count_distinct")) {
-            fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{children.get(0).getType()},
+            Type compatibleType = this.children.get(0).getType();
+            for (int i = 1; i < this.children.size(); ++i) {
+                Type type = this.children.get(i).getType();
+                compatibleType = Type.getAssignmentCompatibleType(compatibleType, type, true);
+                if (compatibleType.isInvalid()) {
+                    compatibleType = Type.VARCHAR;
+                    break;
+                }
+            }
+
+            fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{compatibleType},
                     Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
         } else {
             fn = getBuiltinFunction(analyzer, fnName.getFunction(), collectChildReturnTypes(),

From 7b74d687a6185e662cfc75472174707fe889b201 Mon Sep 17 00:00:00 2001
From: yubingpeng <yubingpenguestc@163.com>
Date: Thu, 11 Jan 2018 14:14:36 +0800
Subject: [PATCH 5/5] add export retry

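An export job failed permanently on the first BE error and leaked its
snapshots when cancelled. With this patch:

- each coordinator is retried up to RETRY_NUM (3) times, and the new
  Coordinator.clearExportStatus() resets its exec states, query status
  and exported-file list between attempts;
- a millisecond timestamp is appended to the exported file name so a
  retried instance does not collide with files written by a previous
  attempt;
- releaseSnapshotPaths() moves from ExportExportingTask into ExportJob,
  is called on cancel(), and keeps releasing the remaining snapshots
  even when one release fails;
- snapshot paths are recorded on the job as soon as they are made
  (addSnapshotPath) instead of being set in one batch at the end, so
  snapshots taken before a pending-phase failure can still be released.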
---
 be/src/runtime/export_sink.cpp                |  7 ++-
 fe/src/com/baidu/palo/load/ExportJob.java     | 36 ++++++++++-
 fe/src/com/baidu/palo/qe/Coordinator.java     | 15 +++++
 .../baidu/palo/task/ExportExportingTask.java  | 59 +++++++------------
 .../baidu/palo/task/ExportPendingTask.java    |  5 +-
 5 files changed, 77 insertions(+), 45 deletions(-)

diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp
index c9fd3553d94df9..0c9d3cc3d0a28f 100644
--- a/be/src/runtime/export_sink.cpp
+++ b/be/src/runtime/export_sink.cpp
@@ -243,8 +243,13 @@ Status ExportSink::open_file_writer() {
 // TODO(lingbin): add some other info to file name, like partition
 std::string ExportSink::gen_file_name() {
     const TUniqueId& id = _state->fragment_instance_id();
+
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+
     std::stringstream file_name;
-    file_name << "export_data_" << id.hi << "_" << id.lo;
+    file_name << "export_data_" << id.hi << "_" << id.lo << "_"
+            << (tv.tv_sec * 1000 + tv.tv_usec / 1000);
     return file_name.str();
 }
 
diff --git a/fe/src/com/baidu/palo/load/ExportJob.java b/fe/src/com/baidu/palo/load/ExportJob.java
index 05c1dbc10f6a01..817b1395101999 100644
--- a/fe/src/com/baidu/palo/load/ExportJob.java
+++ b/fe/src/com/baidu/palo/load/ExportJob.java
@@ -38,6 +38,7 @@
 import com.baidu.palo.common.io.Text;
 import com.baidu.palo.common.io.Writable;
 import com.baidu.palo.common.Pair;
+import com.baidu.palo.common.Status;
 import com.baidu.palo.common.util.TimeUtils;
 import com.baidu.palo.planner.DataPartition;
 import com.baidu.palo.planner.ExportSink;
@@ -49,7 +50,11 @@
 import com.baidu.palo.planner.PlanNodeId;
 import com.baidu.palo.planner.ScanNode;
 import com.baidu.palo.qe.Coordinator;
+import com.baidu.palo.system.Backend;
+import com.baidu.palo.task.AgentClient;
+import com.baidu.palo.thrift.TAgentResult;
 import com.baidu.palo.thrift.TNetworkAddress;
+import com.baidu.palo.thrift.TStatusCode;
 import com.baidu.palo.thrift.TScanRangeLocation;
 import com.baidu.palo.thrift.TScanRangeLocations;
 import com.baidu.palo.thrift.TUniqueId;
@@ -489,8 +494,8 @@ public List<Pair<TNetworkAddress, String>> getSnapshotPaths() {
         return this.snapshotPaths;
     }
 
-    public void setSnapshotPaths(List<Pair<TNetworkAddress, String>> snapshotPaths) {
-        this.snapshotPaths = snapshotPaths;
+    public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) {
+        this.snapshotPaths.add(snapshotPath);
     }
 
     public String getSql() {
@@ -502,6 +507,7 @@ public void setSql(String sql) {
     }
 
     public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
+        releaseSnapshotPaths();
         failMsg = new ExportFailMsg(type, msg);
         updateState(ExportJob.JobState.CANCELLED, false);
     }
@@ -534,6 +540,32 @@ public synchronized boolean updateState(ExportJob.JobState newState, boolean isR
         return true;
     }
 
+    public Status releaseSnapshotPaths() {
+        List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
+        LOG.debug("snapshotPaths:{}", snapshotPaths);
+        for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
+            TNetworkAddress address = snapshotPath.first;
+            String host = address.getHostname();
+            int port = address.getPort();
+            Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
+            if (backend == null) {
+                continue;
+            }
+            long backendId = backend.getId();
+            if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
+                continue;
+            }
+
+            AgentClient client = new AgentClient(host, port);
+            TAgentResult result = client.releaseSnapshot(snapshotPath.second);
+            if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
+                continue;
+            }
+        }
+        snapshotPaths.clear();
+        return Status.OK;
+    }
+
     @Override
     public String toString() {
         return "ExportJob [jobId=" + id
diff --git a/fe/src/com/baidu/palo/qe/Coordinator.java b/fe/src/com/baidu/palo/qe/Coordinator.java
index fb3e0b1db24f9a..c809307a7d2ed6 100644
--- a/fe/src/com/baidu/palo/qe/Coordinator.java
+++ b/fe/src/com/baidu/palo/qe/Coordinator.java
@@ -235,6 +235,21 @@ public void setTimeout(int timeout) {
         this.queryOptions.setQuery_timeout(timeout);
     }
 
+    public void clearExportStatus() {
+        lock.lock();
+        try {
+            this.backendExecStates.clear();
+            this.backendExecStateMap.clear();
+            this.queryStatus.setStatus(new Status());
+            if (this.exportFiles == null) {
+                this.exportFiles = Lists.newArrayList();
+            }
+            this.exportFiles.clear();
+        } finally {
+            lock.unlock();
+        }
+    }
+
     // Initiate
     private void prepare() {
         for (PlanFragment fragment : fragments) {
diff --git a/fe/src/com/baidu/palo/task/ExportExportingTask.java b/fe/src/com/baidu/palo/task/ExportExportingTask.java
index ee438ba60ac608..69213b43d3f4a0 100644
--- a/fe/src/com/baidu/palo/task/ExportExportingTask.java
+++ b/fe/src/com/baidu/palo/task/ExportExportingTask.java
@@ -54,6 +54,7 @@
 
 public class ExportExportingTask extends MasterTask {
     private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class);
+    private static final int RETRY_NUM = 3;
 
     protected final ExportJob job;
 
@@ -92,7 +93,6 @@ protected void exec() {
         }
 
         // if one instance finished, we send request to BE to exec next instance
-        // TODO(lingbin): add retry sending logic if send fail
         List<Coordinator> coords = job.getCoordList();
         int coordSize = coords.size();
         for (int i = 0; i < coordSize; i++) {
@@ -100,7 +100,19 @@ protected void exec() {
                 break;
             }
             Coordinator coord = coords.get(i);
-            execOneCoord(coord);
+            for (int j = 0; j < RETRY_NUM; ++j) {
+                execOneCoord(coord);
+                if (coord.getExecStatus().ok()) {
+                    break;
+                }
+                if (j < RETRY_NUM - 1) {
+                    coord.clearExportStatus();
+                    LOG.info("export exporting job fail. job: {}. Retry.", job);
+                }
+            }
+            if (!coord.getExecStatus().ok()) {
+                onFailed(coord.getExecStatus());
+            }
             int progress = (int) (i + 1) * 100 / coordSize;
             if (progress >= 100) {
                 progress = 99;
@@ -122,7 +134,7 @@ protected void exec() {
         }
 
         // release snapshot
-        Status releaseSnapshotStatus = releaseSnapshotPaths();
+        Status releaseSnapshotStatus = job.releaseSnapshotPaths();
         if (!releaseSnapshotStatus.ok()) {
             String failMsg = "release snapshot fail.";
             failMsg += releaseSnapshotStatus.getErrorMsg();
@@ -161,7 +173,7 @@ private Status execOneCoord(Coordinator coord) {
             needUnregister = true;
             actualExecCoord(queryId, coord);
         } catch (InternalException e) {
-            onFailed(new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
+            LOG.warn("export exporting internal error. {}", e.getMessage());
         } finally {
             if (needUnregister) {
                 QeProcessor.unregisterQuery(queryId);
@@ -181,22 +193,20 @@ private void actualExecCoord(TUniqueId queryId, Coordinator coord) {
         try {
             coord.exec();
         } catch (Exception e) {
-            onFailed(new Status(TStatusCode.INTERNAL_ERROR, "export Coordinator execute failed."));
+            LOG.warn("export Coordinator execute failed. {}", e.getMessage());
         }
 
         if (coord.join(waitSecond)) {
             Status status = coord.getExecStatus();
             if (status.ok()) {
-                onFinished(coord.getExportFiles());
-            } else {
-                onFailed(status);
+                onSubTaskFinished(coord.getExportFiles());
             }
         } else {
-            onTimeout();
+            coord.cancel();
         }
     }
 
-    private synchronized void onFinished(List<String> exportFiles) {
+    private synchronized void onSubTaskFinished(List<String> exportFiles) {
         job.addExportedFiles(exportFiles);
     }
 
@@ -206,7 +216,7 @@ private synchronized void onFailed(Status failStatus) {
         cancelType = ExportFailMsg.CancelType.RUN_FAIL;
         String failMsg = "export exporting job fail. ";
         failMsg += failStatus.getErrorMsg();
-        job.cancel(cancelType, failMsg);
+        job.setFailMsg(new ExportFailMsg(cancelType, failMsg));
         LOG.warn("export exporting job fail. job: {}", job);
     }
 
@@ -215,7 +225,6 @@ public synchronized void onTimeout() {
         this.failStatus = new Status(TStatusCode.TIMEOUT, "timeout");
         cancelType = ExportFailMsg.CancelType.TIMEOUT;
         String failMsg = "export exporting job timeout";
-        job.cancel(cancelType, failMsg);
         LOG.warn("export exporting job timeout. job: {}", job);
     }
 
@@ -248,32 +257,6 @@ private void registerProfile() {
         ProfileManager.getInstance().pushProfile(profile);
     }
 
-    private Status releaseSnapshotPaths() {
-        List<Pair<TNetworkAddress, String>> snapshotPaths = job.getSnapshotPaths();
-        LOG.debug("snapshotPaths:{}", snapshotPaths);
-        for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
-            TNetworkAddress address = snapshotPath.first;
-            String host = address.getHostname();
-            int port = address.getPort();
-            Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
-            if (backend == null) {
-                return Status.CANCELLED;
-            }
-            long backendId = backend.getId();
-            if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
-                return Status.CANCELLED;
-            }
-
-            AgentClient client = new AgentClient(host, port);
-            TAgentResult result = client.releaseSnapshot(snapshotPath.second);
-            if (result.getStatus().getStatus_code() != TStatusCode.OK) {
-                return Status.CANCELLED;
-            }
-        }
-        snapshotPaths.clear();
-        return Status.OK;
-    }
-
     private Status moveTmpFiles() {
         BrokerMgr.BrokerAddress brokerAddress = null;
         try {
diff --git a/fe/src/com/baidu/palo/task/ExportPendingTask.java b/fe/src/com/baidu/palo/task/ExportPendingTask.java
index 40b883833d28da..0cdf9305a702b4 100644
--- a/fe/src/com/baidu/palo/task/ExportPendingTask.java
+++ b/fe/src/com/baidu/palo/task/ExportPendingTask.java
@@ -74,7 +74,6 @@ protected void exec() {
 
         // make snapshots
         Status snapshotStatus = makeSnapshots();
-        // TODO(pengyubing): if export job fail, release snapshot
         if (!snapshotStatus.ok()) {
             String failMsg = "make snapshot failed.";
             failMsg += snapshotStatus.getErrorMsg();
@@ -94,7 +93,6 @@ private Status makeSnapshots() {
         if (tabletLocations == null) {
             return Status.OK;
         }
-        List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
         for (TScanRangeLocations tablet : tabletLocations) {
             TScanRange scanRange = tablet.getScan_range();
             if (!scanRange.isSetPalo_scan_range()) {
@@ -125,11 +123,10 @@ private Status makeSnapshots() {
                 if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
                     return Status.CANCELLED;
                 }
-                snapshotPaths.add(new Pair<TNetworkAddress, String>(address, result.getSnapshot_path()));
+                job.addSnapshotPath(new Pair<TNetworkAddress, String>(address, result.getSnapshot_path()));
                 LOG.debug("snapshot address:{}, path:{}", address, result.getSnapshot_path());
             }
         }
-        job.setSnapshotPaths(snapshotPaths);
         return Status.OK;
     }
 }