From 3abd13b0cf77e78dc8c1b37280053979afef9064 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 9 Jan 2025 15:47:00 +0530 Subject: [PATCH 1/3] Initial commit --- data-loader/build.gradle | 1 + .../dataimport/log/AbstractImportLogger.java | 169 +++++++++++ .../dataimport/log/ImportLoggerConfig.java | 13 + .../dataimport/log/ImportLoggerException.java | 12 + .../core/dataimport/log/LogConstants.java | 3 + .../dataimport/log/LogStorageLocation.java | 7 + .../log/SingleFileImportLogger.java | 139 +++++++++ .../log/SplitByDataChunkImportLogger.java | 187 ++++++++++++ .../dataimport/log/writer/AwsS3LogWriter.java | 29 ++ .../log/writer/DefaultLogWriterFactory.java | 36 +++ .../log/writer/LocalFileLogWriter.java | 62 ++++ .../dataimport/log/writer/LogFileType.java | 8 + .../core/dataimport/log/writer/LogWriter.java | 14 + .../log/writer/LogWriterFactory.java | 8 + .../log/writer/LogWriterFactoryConfig.java | 15 + .../log/SingleFileImportLoggerTest.java | 268 ++++++++++++++++++ .../log/SplitByDataChunkImportLoggerTest.java | 239 ++++++++++++++++ .../writer/DefaultLogWriterFactoryTest.java | 66 +++++ 18 files changed, 1276 insertions(+) create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java create mode 100644 data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java create mode 100644 data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java diff --git a/data-loader/build.gradle b/data-loader/build.gradle index 87a057933..836151e92 100644 --- a/data-loader/build.gradle +++ b/data-loader/build.gradle @@ -17,6 +17,7 @@ subprojects { implementation("org.apache.commons:commons-lang3:${commonsLangVersion}") 
implementation("commons-io:commons-io:${commonsIoVersion}") implementation("org.slf4j:slf4j-simple:${slf4jVersion}") + implementation("software.amazon.awssdk:s3:2.25.31") // Mockito testImplementation "org.mockito:mockito-core:${mockitoVersion}" diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java new file mode 100644 index 000000000..11a7493ca --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java @@ -0,0 +1,169 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.Constants; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public abstract class AbstractImportLogger implements ImportEventListener { + + protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + protected final ImportLoggerConfig config; + protected final LogWriterFactory logWriterFactory; + protected final List listeners = new ArrayList<>(); + + public void addListener(ImportEventListener listener) { + listeners.add(listener); + } + + public void removeListener(ImportEventListener listener) { + listeners.remove(listener); + } + + @Override + public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) { + // Currently we are not logging the start of a data chunk + } + + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + // Currently we are not logging the start of a transaction batch + notifyTransactionBatchStarted(batchStatus); + } + + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + // skip logging success records if the configuration is set to skip + if (shouldSkipLoggingSuccess(batchResult)) { + return; + } + + logTransactionBatch(batchResult); + notifyTransactionBatchCompleted(batchResult); + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + // TODO: we can remove this event if it's current not being used in the import Manager as well + } + + protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult); + + protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) { + return batchResult.isSuccess() && !config.isLogSuccessRecords(); + } + + protected JsonNode createFilteredTransactionBatchLogJsonNode( + ImportTransactionBatchResult batchResult) { + + // If the 
batch result does not contain any records, return the batch result as is + if (batchResult.getRecords() == null) { + return OBJECT_MAPPER.valueToTree(batchResult); + } + + // Create a new list to store the modified import task results + List<ImportTaskResult> modifiedRecords = new ArrayList<>(); + + // Loop over the records in the batchResult + for (ImportTaskResult taskResult : batchResult.getRecords()) { + // Create a new ImportTaskResult without adding the raw record yet + List<ImportTargetResult> targetResults = + batchResult.isSuccess() + ? taskResult.getTargets() + : updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets()); + ImportTaskResult.ImportTaskResultBuilder builder = + ImportTaskResult.builder() + .rowNumber(taskResult.getRowNumber()) + .targets(targetResults) + .dataChunkId(taskResult.getDataChunkId()); + + // Only add the raw record if the configuration is set to log raw source data + if (config.isLogRawSourceRecords()) { + builder.rawRecord(taskResult.getRawRecord()); + } + ImportTaskResult modifiedTaskResult = builder.build(); + + // Add the modified task result to the list + modifiedRecords.add(modifiedTaskResult); + } + + // Create a new transaction batch result with the modified import task results + ImportTransactionBatchResult modifiedBatchResult = + ImportTransactionBatchResult.builder() + .dataChunkId(batchResult.getDataChunkId()) + .transactionBatchId(batchResult.getTransactionBatchId()) + .transactionId(batchResult.getTransactionId()) + .records(modifiedRecords) + .errors(batchResult.getErrors()) + .success(batchResult.isSuccess()) + .build(); + + // Convert the modified batch result to a JsonNode + return OBJECT_MAPPER.valueToTree(modifiedBatchResult); + } + + protected void closeLogWriter(LogWriter logWriter) { + if (logWriter != null) { + try { + logWriter.close(); + } catch (IOException e) { + logError("Failed to close a log writer", e); + } + } + } + + protected abstract void logError(String errorMessage, Exception e); + + protected LogWriter createLogWriter(String logFilePath) throws IOException { + return logWriterFactory.createLogWriter(logFilePath); + } + + private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(status); + } + } + + private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + private List<ImportTargetResult> updateTargetStatusForAbortedTransactionBatch( + List<ImportTargetResult> targetResults) { + for (int i = 0; i < targetResults.size(); i++) { + ImportTargetResult target = targetResults.get(i); + if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + ImportTargetResult newTarget = + ImportTargetResult.builder() + .importAction(target.getImportAction()) + .status(ImportTargetResultStatus.ABORTED) + .importedRecord(target.getImportedRecord()) + .namespace(target.getNamespace()) + .tableName(target.getTableName()) + .dataMapped(target.isDataMapped()) + .errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS)) + .build(); + targetResults.set(i, newTarget); + } + } + return targetResults; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java new file mode 100644 index 000000000..fc0039bf9 --- /dev/null +++
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java @@ -0,0 +1,13 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class ImportLoggerConfig { + String logDirectoryPath; + boolean logSuccessRecords; + boolean logRawSourceRecords; + boolean prettyPrint; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java new file mode 100644 index 000000000..52424c997 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerException.java @@ -0,0 +1,12 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +public class ImportLoggerException extends Exception { + + public ImportLoggerException(String message) { + super(message); + } + + public ImportLoggerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java new file mode 100644 index 000000000..379896bba --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogConstants.java @@ -0,0 +1,3 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +public class LogConstants {} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java new file mode 100644 index 000000000..396cb3d8e --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/LogStorageLocation.java @@ -0,0 +1,7 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +/** The location where the logs are stored. 
*/ +public enum LogStorageLocation { + LOCAL_FILE_STORAGE, + AWS_S3 +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java new file mode 100644 index 000000000..fc7077076 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java @@ -0,0 +1,139 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SingleFileImportLogger extends AbstractImportLogger { + + protected static final String SUMMARY_LOG_FILE_NAME = "summary.log"; + protected static final String SUCCESS_LOG_FILE_NAME = "success.json"; + protected static final String FAILURE_LOG_FILE_NAME = "failure.json"; + private static final Logger LOGGER = LoggerFactory.getLogger(SingleFileImportLogger.class); + private LogWriter summaryLogWriter; + private LogWriter successLogWriter; + private LogWriter failureLogWriter; + + public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory) + throws IOException { + super(config, logWriterFactory); + successLogWriter = createLogWriter(config.getLogDirectoryPath() + SUCCESS_LOG_FILE_NAME); + failureLogWriter = createLogWriter(config.getLogDirectoryPath() + FAILURE_LOG_FILE_NAME); + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return; + try { + writeImportTaskResultDetailToLogs(taskResult); + } catch (Exception e) { + logError("Failed to write success/failure logs", e); + } + } + + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) { + try { + logDataChunkSummary(dataChunkStatus); + } catch (IOException e) { + logError("Failed to log the data chunk summary", e); + } + } + + @Override + public void onAllDataChunksCompleted() { + closeAllLogWriters(); + } + + @Override + protected void logTransactionBatch(ImportTransactionBatchResult batchResult) { + try { + LogWriter logWriter = getLogWriterForTransactionBatch(batchResult); + JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult); + writeToLogWriter(logWriter, jsonNode); + } catch (IOException e) { + logError("Failed to write a transaction batch record to the log file", e); + } + } + + @Override + protected void logError(String errorMessage, Exception exception) { + LOGGER.error(errorMessage, exception); + } + + private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException { + if (summaryLogWriter == null) { + summaryLogWriter = createLogWriter(config.getLogDirectoryPath() + SUMMARY_LOG_FILE_NAME); + } + 
writeImportDataChunkSummary(dataChunkStatus, summaryLogWriter); + } + + private void writeImportDataChunkSummary( + ImportDataChunkStatus dataChunkStatus, LogWriter logWriter) throws IOException { + JsonNode jsonNode = OBJECT_MAPPER.valueToTree(dataChunkStatus); + writeToLogWriter(logWriter, jsonNode); + } + + private LogWriter getLogWriterForTransactionBatch(ImportTransactionBatchResult batchResult) + throws IOException { + String logFileName = batchResult.isSuccess() ? SUCCESS_LOG_FILE_NAME : FAILURE_LOG_FILE_NAME; + LogWriter logWriter = batchResult.isSuccess() ? successLogWriter : failureLogWriter; + if (logWriter == null) { + logWriter = createLogWriter(config.getLogDirectoryPath() + logFileName); + if (batchResult.isSuccess()) { + successLogWriter = logWriter; + } else { + failureLogWriter = logWriter; + } + } + return logWriter; + } + + private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult) + throws IOException { + JsonNode jsonNode; + for (ImportTargetResult target : importTaskResult.getTargets()) { + if (config.isLogSuccessRecords() + && target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + synchronized (successLogWriter) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + successLogWriter.write(jsonNode); + successLogWriter.flush(); + } + } + if (config.isLogRawSourceRecords() + && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + synchronized (failureLogWriter) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + failureLogWriter.write(jsonNode); + failureLogWriter.flush(); + } + } + } + } + + private void writeToLogWriter(LogWriter logWriter, JsonNode jsonNode) throws IOException { + logWriter.write(jsonNode); + logWriter.flush(); + } + + private void closeAllLogWriters() { + closeLogWriter(summaryLogWriter); + closeLogWriter(successLogWriter); + closeLogWriter(failureLogWriter); + summaryLogWriter = null; + successLogWriter = null; + failureLogWriter = null; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java new file mode 100644 index 000000000..ff775ea77 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java @@ -0,0 +1,187 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogFileType; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SplitByDataChunkImportLogger extends AbstractImportLogger { + + protected static final String SUMMARY_LOG_FILE_NAME_FORMAT = "data_chunk_%s_summary.json"; + protected static final String FAILURE_LOG_FILE_NAME_FORMAT = "data_chunk_%s_failure.json"; + 
protected static final String SUCCESS_LOG_FILE_NAME_FORMAT = "data_chunk_%s_success.json"; + + private static final Logger LOGGER = LoggerFactory.getLogger(SplitByDataChunkImportLogger.class); + private final Map<Integer, LogWriter> summaryLogWriters = new HashMap<>(); + private final Map<Integer, LogWriter> successLogWriters = new HashMap<>(); + private final Map<Integer, LogWriter> failureLogWriters = new HashMap<>(); + + public SplitByDataChunkImportLogger( + ImportLoggerConfig config, LogWriterFactory logWriterFactory) { + super(config, logWriterFactory); + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return; + try { + writeImportTaskResultDetailToLogs(taskResult); + } catch (IOException e) { + LOGGER.error("Failed to write success/failure logs", e); + } + } + + private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult) + throws IOException { + JsonNode jsonNode; + for (ImportTargetResult target : importTaskResult.getTargets()) { + if (config.isLogSuccessRecords() + && target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + synchronized (successLogWriters) { + LogWriter successLogWriter = + initializeLogWriterIfNeeded(LogFileType.SUCCESS, importTaskResult.getDataChunkId()); + successLogWriter.write(jsonNode); + successLogWriter.flush(); + } + } + if (config.isLogRawSourceRecords() + && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + jsonNode = OBJECT_MAPPER.valueToTree(target); + synchronized (failureLogWriters) { + LogWriter failureLogWriter = + initializeLogWriterIfNeeded(LogFileType.FAILURE, importTaskResult.getDataChunkId()); + failureLogWriter.write(jsonNode); + failureLogWriter.flush(); + } + } + } + } + + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) { + try { + logDataChunkSummary(dataChunkStatus); + // Close the split log writers per data chunk if they exist for this data chunk id + closeLogWritersForDataChunk(dataChunkStatus.getDataChunkId()); + } catch (IOException e) { + LOGGER.error("Failed to log the data chunk summary", e); + } + } + + @Override + public void onAllDataChunksCompleted() { + closeAllDataChunkLogWriters(); + } + + @Override + protected void logTransactionBatch(ImportTransactionBatchResult batchResult) { + LogFileType logFileType = batchResult.isSuccess() ?
LogFileType.SUCCESS : LogFileType.FAILURE; + try (LogWriter logWriter = + initializeLogWriterIfNeeded(logFileType, batchResult.getDataChunkId())) { + JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult); + synchronized (logWriter) { + logWriter.write(jsonNode); + logWriter.flush(); + } + } catch (IOException e) { + LOGGER.error("Failed to write a transaction batch record to a split mode log file", e); + } + } + + @Override + protected void logError(String errorMessage, Exception exception) { + LOGGER.error(errorMessage, exception); + } + + private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException { + try (LogWriter logWriter = + initializeLogWriterIfNeeded(LogFileType.SUMMARY, dataChunkStatus.getDataChunkId())) { + logWriter.write(OBJECT_MAPPER.valueToTree(dataChunkStatus)); + logWriter.flush(); + } + } + + private void closeLogWritersForDataChunk(int dataChunkId) { + closeLogWriter(successLogWriters.remove(dataChunkId)); + closeLogWriter(failureLogWriters.remove(dataChunkId)); + closeLogWriter(summaryLogWriters.remove(dataChunkId)); + } + + private void closeAllDataChunkLogWriters() { + summaryLogWriters.values().forEach(this::closeLogWriter); + successLogWriters.values().forEach(this::closeLogWriter); + failureLogWriters.values().forEach(this::closeLogWriter); + summaryLogWriters.clear(); + successLogWriters.clear(); + failureLogWriters.clear(); + } + + private String getLogFilePath(long batchId, LogFileType logFileType) { + String logfilePath; + switch (logFileType) { + case SUCCESS: + logfilePath = + config.getLogDirectoryPath() + String.format(SUCCESS_LOG_FILE_NAME_FORMAT, batchId); + break; + case FAILURE: + logfilePath = + config.getLogDirectoryPath() + String.format(FAILURE_LOG_FILE_NAME_FORMAT, batchId); + break; + case SUMMARY: + logfilePath = + config.getLogDirectoryPath() + String.format(SUMMARY_LOG_FILE_NAME_FORMAT, batchId); + break; + default: + logfilePath = ""; + } + ; + + return logfilePath; + } + + private LogWriter initializeLogWriterIfNeeded(LogFileType logFileType, int dataChunkId) + throws IOException { + Map logWriters = getLogWriters(logFileType); + if (!logWriters.containsKey(dataChunkId)) { + LogWriter logWriter = createLogWriter(logFileType, dataChunkId); + logWriters.put(dataChunkId, logWriter); + } + return logWriters.get(dataChunkId); + } + + private LogWriter createLogWriter(LogFileType logFileType, int dataChunkId) throws IOException { + String logFilePath = getLogFilePath(dataChunkId, logFileType); + return createLogWriter(logFilePath); + } + + private Map getLogWriters(LogFileType logFileType) { + Map logWriterMap = null; + switch (logFileType) { + case SUCCESS: + logWriterMap = successLogWriters; + break; + case FAILURE: + logWriterMap = failureLogWriters; + break; + case SUMMARY: + logWriterMap = summaryLogWriters; + break; + } + ; + return logWriterMap; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java new file mode 100644 index 000000000..c11fab0b2 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/AwsS3LogWriter.java @@ -0,0 +1,29 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import lombok.AllArgsConstructor; +import software.amazon.awssdk.services.s3.S3AsyncClient; + 
+@AllArgsConstructor +public class AwsS3LogWriter implements LogWriter { + + private final S3AsyncClient s3AsyncClient; + private final String bucketName; + private final String objectKey; + + @Override + public void write(JsonNode sourceRecord) throws IOException { + // Implementation to write content to cloud storage + } + + @Override + public void flush() throws IOException { + // Implementation to flush content to cloud storage + } + + @Override + public void close() throws IOException { + // Implementation to close the cloud storage connection + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java new file mode 100644 index 000000000..5ced1e804 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java @@ -0,0 +1,36 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.IOException; +import lombok.AllArgsConstructor; + +/** A factory class to create log writers. */ +@AllArgsConstructor +public class DefaultLogWriterFactory implements LogWriterFactory { + + private final LogWriterFactoryConfig config; + private final ImportLoggerConfig importLoggerConfig; + + /** + * Creates a log writer based on the configuration. + * + * @param logFilePath the path of the log file + * @return the log writer + * @throws IOException if an I/O error occurs + */ + @Override + public LogWriter createLogWriter(String logFilePath) throws IOException { + LogWriter logWriter = null; + switch (config.getLogStorageLocation()) { + case LOCAL_FILE_STORAGE: + logWriter = new LocalFileLogWriter(logFilePath, importLoggerConfig); + break; + case AWS_S3: + logWriter = + new AwsS3LogWriter( + config.getS3AsyncClient(), config.getBucketName(), config.getObjectKey()); + break; + } + return logWriter; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java new file mode 100644 index 000000000..d251a9953 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java @@ -0,0 +1,62 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class LocalFileLogWriter implements LogWriter { + private final JsonGenerator logWriter; + private final DataLoaderObjectMapper objectMapper; + + /** + * Creates an instance of LocalFileLogWriter with the specified file path and import logger configuration.
+ * + * @param filePath the file path + * @param importLoggerConfig the import logger configuration + * @throws IOException if an I/O error occurs + */ + public LocalFileLogWriter(String filePath, ImportLoggerConfig importLoggerConfig) + throws IOException { + Path path = Path.of(filePath); + this.objectMapper = new DataLoaderObjectMapper(); + this.logWriter = + objectMapper + .getFactory() + .createGenerator( + Files.newBufferedWriter( + path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)); + // Start the JSON array + if (importLoggerConfig.isPrettyPrint()) this.logWriter.useDefaultPrettyPrinter(); + this.logWriter.writeStartArray(); + this.logWriter.flush(); + } + + @Override + public void write(JsonNode sourceRecord) throws IOException { + if (sourceRecord == null) { + return; + } + synchronized (logWriter) { + objectMapper.writeValue(logWriter, sourceRecord); + } + } + + @Override + public void flush() throws IOException { + logWriter.flush(); + } + + @Override + public void close() throws IOException { + if (logWriter.isClosed()) { + return; + } + logWriter.writeEndArray(); + logWriter.flush(); + logWriter.close(); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java new file mode 100644 index 000000000..5483aefc9 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java @@ -0,0 +1,8 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +/** The type of the log file. */ +public enum LogFileType { + SUCCESS, + FAILURE, + SUMMARY +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java new file mode 100644 index 000000000..f10917901 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java @@ -0,0 +1,14 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; + +public interface LogWriter extends AutoCloseable { + + void write(JsonNode sourceRecord) throws IOException; + + void flush() throws IOException; + + @Override + void close() throws IOException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java new file mode 100644 index 000000000..b3c4dfc08 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java @@ -0,0 +1,8 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import java.io.IOException; + +public interface LogWriterFactory { + + LogWriter createLogWriter(String logFilePath) throws IOException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java new file mode 100644 index 000000000..901d0aae6 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactoryConfig.java @@ -0,0 +1,15 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import
com.scalar.db.dataloader.core.dataimport.log.LogStorageLocation; +import lombok.Builder; +import lombok.Value; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +@Builder +@Value +public class LogWriterFactoryConfig { + LogStorageLocation logStorageLocation; + S3AsyncClient s3AsyncClient; + String bucketName; + String objectKey; +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java new file mode 100644 index 000000000..0a9fa0d24 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java @@ -0,0 +1,268 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactoryConfig; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SingleFileImportLoggerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SingleFileImportLoggerTest.class); + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + @TempDir Path tempDir; + + private LogWriterFactory logWriterFactory; + + @BeforeEach + void setUp() { + LogWriterFactoryConfig logWriterFactoryConfig = + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE) + .build(); + ImportLoggerConfig importLoggerConfig = + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build(); + logWriterFactory = new DefaultLogWriterFactory(logWriterFactoryConfig, importLoggerConfig); + } + + @AfterEach + void tearDown() throws IOException { + cleanUpTempDir(); + } + + private void cleanUpTempDir() throws IOException { + try (Stream paths = Files.list(tempDir)) { + paths.forEach(this::deleteFile); + } + } + + private void deleteFile(Path file) { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + LOGGER.error("Failed to delete file: {}", file, e); + } + } + + @Test + void onTransactionBatchCompleted_NoErrors_ShouldWriteToSuccessLogFile() throws IOException { + testTransactionBatchCompleted(true, true); + } + + @Test + void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFile() throws IOException { 
+ testTransactionBatchCompleted(false, true); + } + + private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords) + throws IOException { + // Arrange + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(logSuccessRecords) + .build(); + SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory); + + List batchResults = createBatchResults(1, success); + + // Act + for (ImportTransactionBatchResult batchResult : batchResults) { + importLogger.onTransactionBatchCompleted(batchResult); + importLogger.onDataChunkCompleted( + ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build()); + } + importLogger.onAllDataChunksCompleted(); + + // Assert + assertTransactionBatchResults(batchResults, success, logSuccessRecords); + } + + private List createBatchResults(int count, boolean success) { + List batchResults = new ArrayList<>(); + + for (int i = 1; i <= count; i++) { + List records = + Collections.singletonList( + ImportTaskResult.builder() + .rowNumber(i) + .rawRecord(OBJECT_MAPPER.createObjectNode()) + .targets(Collections.EMPTY_LIST) + .build()); + ImportTransactionBatchResult result = + ImportTransactionBatchResult.builder() + .dataChunkId(i) + .transactionBatchId(1) + .records(records) + .success(success) + .build(); + batchResults.add(result); + } + + return batchResults; + } + + private void assertTransactionBatchResults( + List batchResults, boolean success, boolean logSuccessRecords) + throws IOException { + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + + // Single file log mode + Path logFileName = + tempDir.resolve( + success + ? SingleFileImportLogger.SUCCESS_LOG_FILE_NAME + : SingleFileImportLogger.FAILURE_LOG_FILE_NAME); + if (logSuccessRecords || !success) { + assertTrue(Files.exists(logFileName), "Log file should exist"); + + String logContent = Files.readString(logFileName); + List logEntries = + objectMapper.readValue( + logContent, new TypeReference>() {}); + + assertEquals( + batchResults.size(), + logEntries.size(), + "Number of log entries should match the number of batch results"); + + for (int i = 0; i < batchResults.size(); i++) { + assertTransactionBatchResult(batchResults.get(i), logEntries.get(i)); + } + } else { + assertFalse(Files.exists(logFileName), "Log file should not exist"); + } + } + + private void assertTransactionBatchResult( + ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match"); + assertEquals( + expected.getTransactionBatchId(), + actual.getTransactionBatchId(), + "Transaction batch ID should match"); + assertEquals( + expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match"); + assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match"); + + List expectedRecords = expected.getRecords(); + List actualRecords = actual.getRecords(); + assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match"); + for (int j = 0; j < expectedRecords.size(); j++) { + ImportTaskResult expectedRecord = expectedRecords.get(j); + ImportTaskResult actualRecord = actualRecords.get(j); + assertEquals( + expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match"); + assertEquals( + expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw 
record should match"); + assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match"); + } + } + + @Test + void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted(false); + } + + @Test + void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted(true); + } + + private void testDataChunkCompleted(boolean hasErrors) throws IOException { + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(true) + .build(); + SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory); + + List dataChunkStatuses = + Stream.of(1, 2) + .map(id -> createDataChunkStatus(id, hasErrors)) + .collect(Collectors.toList()); + + dataChunkStatuses.forEach(importLogger::onDataChunkCompleted); + importLogger.onAllDataChunksCompleted(); + + assertDataChunkStatusLog(SingleFileImportLogger.SUMMARY_LOG_FILE_NAME, dataChunkStatuses); + } + + private ImportDataChunkStatus createDataChunkStatus(int dataChunkId, boolean hasErrors) { + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunkId) + .startTime(Instant.now()) + .endTime(Instant.now()) + .totalRecords(100) + .successCount(hasErrors ? 90 : 100) + .failureCount(hasErrors ? 10 : 0) + .batchCount(5) + .totalDurationInMilliSeconds(1000) + .build(); + } + + private void assertDataChunkStatusLog( + String logFilePattern, List dataChunkStatuses) throws IOException { + assertSingleFileLog(tempDir, logFilePattern, dataChunkStatuses); + } + + private void assertSingleFileLog( + Path tempDir, String logFileName, List dataChunkStatuses) + throws IOException { + Path summaryLogFile = tempDir.resolve(logFileName); + assertTrue(Files.exists(summaryLogFile)); + + String logContent = Files.readString(summaryLogFile); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue(logContent, new TypeReference>() {}); + + assertEquals(dataChunkStatuses.size(), logEntries.size()); + for (int i = 0; i < dataChunkStatuses.size(); i++) { + assertDataChunkStatusEquals(dataChunkStatuses.get(i), logEntries.get(i)); + } + } + + private void assertDataChunkStatusEquals( + ImportDataChunkStatus expected, ImportDataChunkStatus actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId()); + assertEquals(expected.getStartTime(), actual.getStartTime()); + assertEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getTotalRecords(), actual.getTotalRecords()); + assertEquals(expected.getSuccessCount(), actual.getSuccessCount()); + assertEquals(expected.getFailureCount(), actual.getFailureCount()); + assertEquals(expected.getBatchCount(), actual.getBatchCount()); + assertEquals( + expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java new file mode 100644 index 000000000..70e293793 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java @@ -0,0 +1,239 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactoryConfig; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class SplitByDataChunkImportLoggerTest { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + @TempDir Path tempDir; + + private LogWriterFactory logWriterFactory; + + @BeforeEach + void setUp() { + LogWriterFactoryConfig logWriterFactoryConfig = + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE) + .build(); + ImportLoggerConfig importLoggerConfig = + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build(); + logWriterFactory = new DefaultLogWriterFactory(logWriterFactoryConfig, importLoggerConfig); + } + + @Test + void onTransactionBatchCompleted_NoErrors_ShouldWriteToDataChunkSuccessFiles() + throws IOException { + testTransactionBatchCompleted(true, true); + } + + @Test + void onTransactionBatchCompleted_HasErrors_ShouldWriteToDataChunkFailureFiles() + throws IOException { + testTransactionBatchCompleted(false, true); + } + + @Test + void onTransactionBatchCompleted_NoErrorsAndNoSuccessFileLogging_ShouldNotWriteToSuccessFiles() + throws IOException { + testTransactionBatchCompleted(true, false); + } + + private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords) + throws IOException { + // Arrange + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(logSuccessRecords) + .build(); + SplitByDataChunkImportLogger importLogger = + new SplitByDataChunkImportLogger(config, logWriterFactory); + + List batchResults = new ArrayList<>(); + + for (int i = 1; i <= 3; i++) { + List records = + Collections.singletonList( + ImportTaskResult.builder() + .rowNumber(i) + .targets(Collections.EMPTY_LIST) + .rawRecord(OBJECT_MAPPER.createObjectNode()) + .build()); + ImportTransactionBatchResult result = + ImportTransactionBatchResult.builder() + .dataChunkId(i) + .transactionBatchId(1) + .records(records) + .success(success) + .build(); + batchResults.add(result); + } + + // Act + for (ImportTransactionBatchResult batchResult : batchResults) { + importLogger.onTransactionBatchCompleted(batchResult); + importLogger.onDataChunkCompleted( + ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build()); + } + importLogger.onAllDataChunksCompleted(); + + // Assert + for (int i = 0; i 
< batchResults.size(); i++) { + ImportTransactionBatchResult batchResult = batchResults.get(i); + String logFileNameFormat = + success + ? SplitByDataChunkImportLogger.SUCCESS_LOG_FILE_NAME_FORMAT + : SplitByDataChunkImportLogger.FAILURE_LOG_FILE_NAME_FORMAT; + Path dataChunkLogFileName = tempDir.resolve(String.format(logFileNameFormat, i + 1)); + + if (success && logSuccessRecords) { + assertTrue(Files.exists(dataChunkLogFileName), "Data chunk success log file should exist"); + assertTransactionBatchResult(batchResult, dataChunkLogFileName); + } else if (!success) { + assertTrue(Files.exists(dataChunkLogFileName), "Data chunk failure log file should exist"); + assertTransactionBatchResult(batchResult, dataChunkLogFileName); + } else { + assertFalse( + Files.exists(dataChunkLogFileName), "Data chunk success log file should not exist"); + } + } + } + + private void assertTransactionBatchResult( + ImportTransactionBatchResult expected, Path dataChunkLogFileName) throws IOException { + String logContent = Files.readString(dataChunkLogFileName); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue( + logContent, new TypeReference>() {}); + ImportTransactionBatchResult actual = logEntries.get(0); + + assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match"); + assertEquals( + expected.getTransactionBatchId(), + actual.getTransactionBatchId(), + "Transaction batch ID should match"); + assertEquals( + expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match"); + assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match"); + + List expectedRecords = expected.getRecords(); + List actualRecords = actual.getRecords(); + assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match"); + for (int j = 0; j < expectedRecords.size(); j++) { + ImportTaskResult expectedRecord = expectedRecords.get(j); + ImportTaskResult actualRecord = actualRecords.get(j); + assertEquals( + expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match"); + assertEquals( + expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match"); + assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match"); + } + } + + @Test + void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted( + String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), false); + } + + @Test + void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted( + String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), true); + } + + private void testDataChunkCompleted(String logFilePattern, boolean hasErrors) throws IOException { + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .logRawSourceRecords(true) + .logSuccessRecords(true) + .build(); + SplitByDataChunkImportLogger importLogger = + new SplitByDataChunkImportLogger(config, logWriterFactory); + + List dataChunkStatuses = + IntStream.rangeClosed(1, 2) + .mapToObj(id -> createDataChunkStatus(id, hasErrors)) + .collect(Collectors.toList()); + + dataChunkStatuses.forEach(importLogger::onDataChunkCompleted); + importLogger.onAllDataChunksCompleted(); + + assertDataChunkStatusLog(logFilePattern, dataChunkStatuses); + } + + private ImportDataChunkStatus 
createDataChunkStatus(int dataChunkId, boolean hasErrors) { + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunkId) + .startTime(Instant.now()) + .endTime(Instant.now()) + .totalRecords(100) + .successCount(hasErrors ? 90 : 100) + .failureCount(hasErrors ? 10 : 0) + .batchCount(5) + .totalDurationInMilliSeconds(1000) + .build(); + } + + private void assertDataChunkStatusLog( + String logFilePattern, List dataChunkStatuses) throws IOException { + for (ImportDataChunkStatus dataChunkStatus : dataChunkStatuses) { + String logFileName = String.format(logFilePattern, dataChunkStatus.getDataChunkId()); + Path dataChunkLogFile = tempDir.resolve(logFileName); + assertTrue(Files.exists(dataChunkLogFile), "Data chunk summary log file should exist"); + + String logContent = Files.readString(dataChunkLogFile); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue(logContent, new TypeReference>() {}); + + assertEquals(1, logEntries.size()); + assertDataChunkStatusEquals(dataChunkStatus, logEntries.get(0)); + } + } + + private void assertDataChunkStatusEquals( + ImportDataChunkStatus expected, ImportDataChunkStatus actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId()); + assertEquals(expected.getStartTime(), actual.getStartTime()); + assertEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getTotalRecords(), actual.getTotalRecords()); + assertEquals(expected.getSuccessCount(), actual.getSuccessCount()); + assertEquals(expected.getFailureCount(), actual.getFailureCount()); + assertEquals(expected.getBatchCount(), actual.getBatchCount()); + assertEquals( + expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java new file mode 100644 index 000000000..e9102bca6 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java @@ -0,0 +1,66 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import com.scalar.db.dataloader.core.dataimport.log.LogStorageLocation; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.services.s3.S3AsyncClient; + +class DefaultLogWriterFactoryTest { + + String filePath = Paths.get("").toAbsolutePath() + "/sample.log"; + DefaultLogWriterFactory defaultLogWriterFactory; + + @AfterEach + void removeFileIfCreated() { + File file = new File(filePath); + if (file.exists()) { + file.deleteOnExit(); + } + } + + @Test + void createLogWriter_withValidLocalLogFilePath_shouldReturnLocalFileLogWriterObject() + throws IOException { + defaultLogWriterFactory = + new DefaultLogWriterFactory( + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE) + .build(), + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build()); + LogWriter logWriter = defaultLogWriterFactory.createLogWriter(filePath); + 
Assertions.assertEquals(LocalFileLogWriter.class, logWriter.getClass()); + logWriter.close(); + } + + @Test + void createLogWriter_withValidFilePath_shouldReturnLogWriterObject() throws IOException { + defaultLogWriterFactory = + new DefaultLogWriterFactory( + LogWriterFactoryConfig.builder() + .logStorageLocation(LogStorageLocation.AWS_S3) + .bucketName("bucket") + .objectKey("ObjectKay") + .s3AsyncClient(Mockito.mock(S3AsyncClient.class)) + .build(), + ImportLoggerConfig.builder() + .prettyPrint(false) + .logSuccessRecords(false) + .logRawSourceRecords(false) + .logDirectoryPath("path") + .build()); + LogWriter logWriter = defaultLogWriterFactory.createLogWriter(filePath); + Assertions.assertEquals(AwsS3LogWriter.class, logWriter.getClass()); + logWriter.close(); + } +} From 10409a82f78f1ebbcbb766f2ab043e30528cbb5c Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 14 Jan 2025 16:57:08 +0530 Subject: [PATCH 2/3] Change code to be compatible with java 8 --- .../core/dataimport/log/writer/LocalFileLogWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java index d251a9953..b29395e8e 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class LocalFileLogWriter implements LogWriter { @@ -21,7 +22,7 @@ public class LocalFileLogWriter implements LogWriter { */ public LocalFileLogWriter(String filePath, ImportLoggerConfig importLoggerConfig) throws IOException { - Path path = Path.of(filePath); + Path path = Paths.get(filePath); this.objectMapper = new DataLoaderObjectMapper(); this.logWriter = objectMapper From 0180a37364a191c91105f9ff0f7621138f3d7a0b Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 14 Jan 2025 17:15:41 +0530 Subject: [PATCH 3/3] Change code to be compatible with java 8 -2 --- .../core/dataimport/log/SplitByDataChunkImportLogger.java | 3 --- .../core/dataimport/log/SingleFileImportLoggerTest.java | 6 ++++-- .../dataimport/log/SplitByDataChunkImportLoggerTest.java | 8 ++++++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java index ff775ea77..bec306ef9 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java @@ -148,8 +148,6 @@ private String getLogFilePath(long batchId, LogFileType logFileType) { default: logfilePath = ""; } - ; - return logfilePath; } @@ -181,7 +179,6 @@ private Map getLogWriters(LogFileType logFileType) { logWriterMap = summaryLogWriters; break; } - ; return logWriterMap; } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java 
b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java index 0a9fa0d24..98e58109e 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java @@ -13,6 +13,7 @@ import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -145,7 +146,8 @@ private void assertTransactionBatchResults( if (logSuccessRecords || !success) { assertTrue(Files.exists(logFileName), "Log file should exist"); - String logContent = Files.readString(logFileName); + String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8); + List logEntries = objectMapper.readValue( logContent, new TypeReference>() {}); @@ -242,7 +244,7 @@ private void assertSingleFileLog( Path summaryLogFile = tempDir.resolve(logFileName); assertTrue(Files.exists(summaryLogFile)); - String logContent = Files.readString(summaryLogFile); + String logContent = new String(Files.readAllBytes(summaryLogFile), StandardCharsets.UTF_8); DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); List logEntries = objectMapper.readValue(logContent, new TypeReference>() {}); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java index 70e293793..800ae4e97 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java @@ -13,6 +13,7 @@ import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -131,7 +132,9 @@ private void testTransactionBatchCompleted(boolean success, boolean logSuccessRe private void assertTransactionBatchResult( ImportTransactionBatchResult expected, Path dataChunkLogFileName) throws IOException { - String logContent = Files.readString(dataChunkLogFileName); + // String logContent = Files.readString(dataChunkLogFileName); + String logContent = + new String(Files.readAllBytes(dataChunkLogFileName), StandardCharsets.UTF_8); DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); List logEntries = objectMapper.readValue( @@ -214,7 +217,8 @@ private void assertDataChunkStatusLog( Path dataChunkLogFile = tempDir.resolve(logFileName); assertTrue(Files.exists(dataChunkLogFile), "Data chunk summary log file should exist"); - String logContent = Files.readString(dataChunkLogFile); + // String logContent = Files.readString(dataChunkLogFile); + String logContent = new String(Files.readAllBytes(dataChunkLogFile), StandardCharsets.UTF_8); DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); List logEntries = objectMapper.readValue(logContent, new TypeReference>() {});
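A minimal usage sketch of how the classes introduced in this patch series fit together for local file logging; it is illustrative only and not part of the patches. The class names, builder fields, and constructors are taken from the diffs above, while the log directory value and the choice of SingleFileImportLogger are assumptions for the example.

// Illustrative wiring only; "logs/" is an assumed directory, not something defined by the patches.
ImportLoggerConfig loggerConfig =
    ImportLoggerConfig.builder()
        .logDirectoryPath("logs/")
        .logSuccessRecords(true)
        .logRawSourceRecords(true)
        .prettyPrint(false)
        .build();
LogWriterFactoryConfig factoryConfig =
    LogWriterFactoryConfig.builder()
        .logStorageLocation(LogStorageLocation.LOCAL_FILE_STORAGE)
        .build();
LogWriterFactory logWriterFactory = new DefaultLogWriterFactory(factoryConfig, loggerConfig);
// The logger registers as an ImportEventListener and writes success/failure/summary logs.
SingleFileImportLogger importLogger = new SingleFileImportLogger(loggerConfig, logWriterFactory);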