diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp index c9fd3553d94df9..0c9d3cc3d0a28f 100644 --- a/be/src/runtime/export_sink.cpp +++ b/be/src/runtime/export_sink.cpp @@ -243,8 +243,13 @@ Status ExportSink::open_file_writer() { // TODO(lingbin): add some other info to file name, like partition std::string ExportSink::gen_file_name() { const TUniqueId& id = _state->fragment_instance_id(); + + struct timeval tv; + gettimeofday(&tv, NULL); + std::stringstream file_name; - file_name << "export_data_" << id.hi << "_" << id.lo; + file_name << "export_data_" << id.hi << "_" << id.lo << "_" + << (tv.tv_sec * 1000 + tv.tv_usec / 1000); return file_name.str(); } diff --git a/fe/src/com/baidu/palo/load/ExportJob.java b/fe/src/com/baidu/palo/load/ExportJob.java index 05c1dbc10f6a01..817b1395101999 100644 --- a/fe/src/com/baidu/palo/load/ExportJob.java +++ b/fe/src/com/baidu/palo/load/ExportJob.java @@ -38,6 +38,7 @@ import com.baidu.palo.common.io.Text; import com.baidu.palo.common.io.Writable; import com.baidu.palo.common.Pair; +import com.baidu.palo.common.Status; import com.baidu.palo.common.util.TimeUtils; import com.baidu.palo.planner.DataPartition; import com.baidu.palo.planner.ExportSink; @@ -49,7 +50,11 @@ import com.baidu.palo.planner.PlanNodeId; import com.baidu.palo.planner.ScanNode; import com.baidu.palo.qe.Coordinator; +import com.baidu.palo.system.Backend; +import com.baidu.palo.task.AgentClient; +import com.baidu.palo.thrift.TAgentResult; import com.baidu.palo.thrift.TNetworkAddress; +import com.baidu.palo.thrift.TStatusCode; import com.baidu.palo.thrift.TScanRangeLocation; import com.baidu.palo.thrift.TScanRangeLocations; import com.baidu.palo.thrift.TUniqueId; @@ -489,8 +494,8 @@ public List> getSnapshotPaths() { return this.snapshotPaths; } - public void setSnapshotPaths(List> snapshotPaths) { - this.snapshotPaths = snapshotPaths; + public void addSnapshotPath(Pair snapshotPath) { + this.snapshotPaths.add(snapshotPath); } public String getSql() { @@ -502,6 +507,7 @@ public void setSql(String sql) { } public synchronized void cancel(ExportFailMsg.CancelType type, String msg) { + releaseSnapshotPaths(); failMsg = new ExportFailMsg(type, msg); updateState(ExportJob.JobState.CANCELLED, false); } @@ -534,6 +540,32 @@ public synchronized boolean updateState(ExportJob.JobState newState, boolean isR return true; } + public Status releaseSnapshotPaths() { + List> snapshotPaths = getSnapshotPaths(); + LOG.debug("snapshotPaths:{}", snapshotPaths); + for (Pair snapshotPath : snapshotPaths) { + TNetworkAddress address = snapshotPath.first; + String host = address.getHostname(); + int port = address.getPort(); + Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port); + if (backend == null) { + continue; + } + long backendId = backend.getId(); + if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) { + continue; + } + + AgentClient client = new AgentClient(host, port); + TAgentResult result = client.releaseSnapshot(snapshotPath.second); + if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) { + continue; + } + } + snapshotPaths.clear(); + return Status.OK; + } + @Override public String toString() { return "ExportJob [jobId=" + id diff --git a/fe/src/com/baidu/palo/qe/Coordinator.java b/fe/src/com/baidu/palo/qe/Coordinator.java index fb3e0b1db24f9a..c809307a7d2ed6 100644 --- a/fe/src/com/baidu/palo/qe/Coordinator.java +++ b/fe/src/com/baidu/palo/qe/Coordinator.java @@ -235,6 +235,21 @@ public void setTimeout(int timeout) { this.queryOptions.setQuery_timeout(timeout); } + public void clearExportStatus() { + lock.lock(); + try { + this.backendExecStates.clear(); + this.backendExecStateMap.clear(); + this.queryStatus.setStatus(new Status()); + if (this.exportFiles == null) { + this.exportFiles = Lists.newArrayList(); + } + this.exportFiles.clear(); + } finally { + lock.unlock(); + } + } + // Initiate private void prepare() { for (PlanFragment fragment : fragments) { diff --git a/fe/src/com/baidu/palo/task/ExportExportingTask.java b/fe/src/com/baidu/palo/task/ExportExportingTask.java index ee438ba60ac608..69213b43d3f4a0 100644 --- a/fe/src/com/baidu/palo/task/ExportExportingTask.java +++ b/fe/src/com/baidu/palo/task/ExportExportingTask.java @@ -54,6 +54,7 @@ public class ExportExportingTask extends MasterTask { private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class); + private static final int RETRY_NUM = 3; protected final ExportJob job; @@ -92,7 +93,6 @@ protected void exec() { } // if one instance finished, we send request to BE to exec next instance - // TODO(lingbin): add retry sending logic if send fail List coords = job.getCoordList(); int coordSize = coords.size(); for (int i = 0; i < coordSize; i++) { @@ -100,7 +100,19 @@ protected void exec() { break; } Coordinator coord = coords.get(i); - execOneCoord(coord); + for (int j = 0; j < RETRY_NUM; ++j) { + execOneCoord(coord); + if (coord.getExecStatus().ok()) { + break; + } + if (j < RETRY_NUM - 1) { + coord.clearExportStatus(); + LOG.info("export exporting job fail. job: {}. Retry.", job); + } + } + if (!coord.getExecStatus().ok()) { + onFailed(coord.getExecStatus()); + } int progress = (int) (i + 1) * 100 / coordSize; if (progress >= 100) { progress = 99; @@ -122,7 +134,7 @@ protected void exec() { } // release snapshot - Status releaseSnapshotStatus = releaseSnapshotPaths(); + Status releaseSnapshotStatus = job.releaseSnapshotPaths(); if (!releaseSnapshotStatus.ok()) { String failMsg = "release snapshot fail."; failMsg += releaseSnapshotStatus.getErrorMsg(); @@ -161,7 +173,7 @@ private Status execOneCoord(Coordinator coord) { needUnregister = true; actualExecCoord(queryId, coord); } catch (InternalException e) { - onFailed(new Status(TStatusCode.INTERNAL_ERROR, e.getMessage())); + LOG.warn("export exporting internal error. {}", e.getMessage()); } finally { if (needUnregister) { QeProcessor.unregisterQuery(queryId); @@ -181,22 +193,20 @@ private void actualExecCoord(TUniqueId queryId, Coordinator coord) { try { coord.exec(); } catch (Exception e) { - onFailed(new Status(TStatusCode.INTERNAL_ERROR, "export Coordinator execute failed.")); + LOG.warn("export Coordinator execute failed."); } if (coord.join(waitSecond)) { Status status = coord.getExecStatus(); if (status.ok()) { - onFinished(coord.getExportFiles()); - } else { - onFailed(status); + onSubTaskFinished(coord.getExportFiles()); } } else { - onTimeout(); + coord.cancel(); } } - private synchronized void onFinished(List exportFiles) { + private synchronized void onSubTaskFinished(List exportFiles) { job.addExportedFiles(exportFiles); } @@ -206,7 +216,7 @@ private synchronized void onFailed(Status failStatus) { cancelType = ExportFailMsg.CancelType.RUN_FAIL; String failMsg = "export exporting job fail. "; failMsg += failStatus.getErrorMsg(); - job.cancel(cancelType, failMsg); + job.setFailMsg(new ExportFailMsg(cancelType, failMsg)); LOG.warn("export exporting job fail. job: {}", job); } @@ -215,7 +225,6 @@ public synchronized void onTimeout() { this.failStatus = new Status(TStatusCode.TIMEOUT, "timeout"); cancelType = ExportFailMsg.CancelType.TIMEOUT; String failMsg = "export exporting job timeout"; - job.cancel(cancelType, failMsg); LOG.warn("export exporting job timeout. job: {}", job); } @@ -248,32 +257,6 @@ private void registerProfile() { ProfileManager.getInstance().pushProfile(profile); } - private Status releaseSnapshotPaths() { - List> snapshotPaths = job.getSnapshotPaths(); - LOG.debug("snapshotPaths:{}", snapshotPaths); - for (Pair snapshotPath : snapshotPaths) { - TNetworkAddress address = snapshotPath.first; - String host = address.getHostname(); - int port = address.getPort(); - Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port); - if (backend == null) { - return Status.CANCELLED; - } - long backendId = backend.getId(); - if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) { - return Status.CANCELLED; - } - - AgentClient client = new AgentClient(host, port); - TAgentResult result = client.releaseSnapshot(snapshotPath.second); - if (result.getStatus().getStatus_code() != TStatusCode.OK) { - return Status.CANCELLED; - } - } - snapshotPaths.clear(); - return Status.OK; - } - private Status moveTmpFiles() { BrokerMgr.BrokerAddress brokerAddress = null; try { diff --git a/fe/src/com/baidu/palo/task/ExportPendingTask.java b/fe/src/com/baidu/palo/task/ExportPendingTask.java index 40b883833d28da..0cdf9305a702b4 100644 --- a/fe/src/com/baidu/palo/task/ExportPendingTask.java +++ b/fe/src/com/baidu/palo/task/ExportPendingTask.java @@ -74,7 +74,6 @@ protected void exec() { // make snapshots Status snapshotStatus = makeSnapshots(); - // TODO(pengyubing): if export job fail, release snapshot if (!snapshotStatus.ok()) { String failMsg = "make snapshot failed."; failMsg += snapshotStatus.getErrorMsg(); @@ -94,7 +93,6 @@ private Status makeSnapshots() { if (tabletLocations == null) { return Status.OK; } - List> snapshotPaths = Lists.newArrayList(); for (TScanRangeLocations tablet : tabletLocations) { TScanRange scanRange = tablet.getScan_range(); if (!scanRange.isSetPalo_scan_range()) { @@ -125,11 +123,10 @@ private Status makeSnapshots() { if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) { return Status.CANCELLED; } - snapshotPaths.add(new Pair(address, result.getSnapshot_path())); + job.addSnapshotPath(new Pair(address, result.getSnapshot_path())); LOG.debug("snapshot address:{}, path:{}", address, result.getSnapshot_path()); } } - job.setSnapshotPaths(snapshotPaths); return Status.OK; } }