Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to disable connection pool. #992

9 changes: 9 additions & 0 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ jobs:
with:
SINK_CONNECTOR_IMAGE: altinityinfra/clickhouse-sink-connector:${{ github.event.number }}-${{ github.sha }}-lt

testflows-lightweight-hikari-pool:
needs: [build-kafka-lightweight]
uses: ./.github/workflows/testflows-sink-connector-lightweight.yml
secrets: inherit
with:
SINK_CONNECTOR_IMAGE: altinityinfra/clickhouse-sink-connector:${{ github.event.number }}-${{ github.sha }}-lt
extra_args: --hikari-pool
artifact_suffix: hikari-pool

java-tests-kafka:
needs: [build-kafka-lightweight]
uses: ./.github/workflows/sink-connector-kafka-tests.yml
Expand Down
25 changes: 20 additions & 5 deletions .github/workflows/testflows-sink-connector-lightweight.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ on:
description: "Testflows output style."
type: string
default: new-fails
extra_args:
description: "Extra arguments for the tests."
required: false
type: string
default: ""
artifact_suffix:
description: "Artifact suffix"
required: false
type: string
default: ""
secrets:
DOCKERHUB_USERNAME:
required: false
Expand All @@ -35,10 +45,15 @@ on:
description: "Package either 'docker://' or 'https://'. Example: 'https://s3.amazonaws.com/clickhouse-builds/23.3/.../package_release/clickhouse-common-static_23.3.1.64_amd64.deb', or 'docker://altinity/clickhouse-server:23.8.8'"
type: string
default: docker://clickhouse/clickhouse-server:23.8
extra_args:
suite:
description: "Specific Suite To Run (Default * to run everything)."
required: false
type: string
extra_args:
description: "Extra arguments for the tests."
required: false
type: string
default: ""
custom_run_name:
description: 'Custom run name (optional)'
required: false
Expand Down Expand Up @@ -115,7 +130,7 @@ jobs:

- name: Run testflows tests
working-directory: sink-connector-lightweight/tests/integration
run: python3 -u regression.py --only "/mysql to clickhouse replication/auto table creation/${{ inputs.extra_args != '' && inputs.extra_args || '*' }}" --clickhouse-binary-path="${{inputs.package}}" --test-to-end --output ${{ inputs.output_format }} --collect-service-logs --attr project="${GITHUB_REPOSITORY}" project.id="$GITHUB_RUN_NUMBER" user.name="$GITHUB_ACTOR" github_actions_run="$GITHUB_SERVER_URL/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" sink_version="registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest" s3_url="https://altinity-test-reports.s3.amazonaws.com/index.html#altinity-sink-connector/testflows/${{ steps.date.outputs.date }}_${{github.run.number}}/" --log logs/raw.log
run: python3 -u regression.py --only "/mysql to clickhouse replication/auto table creation/${{ inputs.suite != '' && inputs.suite || '*' }}" --clickhouse-binary-path="${{inputs.package}}" --test-to-end ${{ inputs.extra_args != '' && inputs.extra_args || '' }} --output ${{ inputs.output_format }} --collect-service-logs --attr project="${GITHUB_REPOSITORY}" project.id="$GITHUB_RUN_NUMBER" user.name="$GITHUB_ACTOR" github_actions_run="$GITHUB_SERVER_URL/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" sink_version="registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest" s3_url="https://altinity-test-reports.s3.amazonaws.com/index.html#altinity-sink-connector/testflows/${{ steps.date.outputs.date }}_${{github.run.number}}/" --log logs/raw.log

- name: Create tfs results report
if: always()
Expand All @@ -139,7 +154,7 @@ jobs:
- uses: actions/upload-artifact@v4
if: always()
with:
name: testflows-sink-connector-lightweight-artefacts
name: testflows-sink-connector-lightweight-artefacts${{ inputs.artifact_suffix != '' && '-' || '' }}${{ inputs.artifact_suffix }}
path: |
sink-connector-lightweight/tests/integration/logs/*.log
sink-connector-lightweight/tests/integration/env/auto/configs/*.yml
Expand Down Expand Up @@ -193,7 +208,7 @@ jobs:

- name: Run testflows tests
working-directory: sink-connector-lightweight/tests/integration
run: python3 -u regression.py --only "/mysql to clickhouse replication/auto replicated table creation/${{ inputs.extra_args != '' && inputs.extra_args || '*' }}" --clickhouse-binary-path="${{inputs.package}}" --test-to-end --output ${{ inputs.output_format }} --collect-service-logs --attr project="${GITHUB_REPOSITORY}" project.id="$GITHUB_RUN_NUMBER" user.name="$GITHUB_ACTOR" github_actions_run="$GITHUB_SERVER_URL/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" sink_version="registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest" s3_url="https://altinity-test-reports.s3.amazonaws.com/index.html#altinity-sink-connector/testflows/${{ steps.date.outputs.date }}_${{github.run.number}}/" --log logs/raw.log
run: python3 -u regression.py --only "/mysql to clickhouse replication/auto replicated table creation/${{ inputs.suite != '' && inputs.suite || '*' }}" --clickhouse-binary-path="${{inputs.package}}" --test-to-end ${{ inputs.extra_args != '' && inputs.extra_args || '' }} --output ${{ inputs.output_format }} --collect-service-logs --attr project="${GITHUB_REPOSITORY}" project.id="$GITHUB_RUN_NUMBER" user.name="$GITHUB_ACTOR" github_actions_run="$GITHUB_SERVER_URL/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" sink_version="registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest" s3_url="https://altinity-test-reports.s3.amazonaws.com/index.html#altinity-sink-connector/testflows/${{ steps.date.outputs.date }}_${{github.run.number}}/" --log logs/raw.log

- name: Create tfs results report
if: always()
Expand All @@ -217,7 +232,7 @@ jobs:
- uses: actions/upload-artifact@v4
if: always()
with:
name: testflows-sink-connector-lightweight-replicated-artefacts
name: testflows-sink-connector-lightweight-replicated-artefacts${{ inputs.artifact_suffix != '' && '-' || '' }}${{ inputs.artifact_suffix }}
path: |
sink-connector-lightweight/tests/integration/logs/*.log
sink-connector-lightweight/tests/integration/env/auto_replicated/configs/*.yml
Expand Down
3 changes: 2 additions & 1 deletion sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ clickhouse.jdbc.params: "keepalive.timeout=3,max_buffer_size=1000000,socket_time
#Metrics (Prometheus target), required for Grafana Dashboard
metrics.enable: "true"

connection.pool.max.size: 1000
#connection.pool.disable: "true"
#connection.pool.max.size: 500

# Skip schema history capturing, use the following configuration
# to reduce slow startup when replicating dbs with large number of tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ def argparser(parser):
default=False,
help="enable stress testing (might take a long time)",
)

parser.add_argument(
"--hikari-pool",
action="store_true",
default=False,
help="enable hikari connection pool. Default: False",
)
parser.add_argument(
"--thread-fuzzer",
action="store_true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@
"clickhouse.datetime.timezone": "UTC",
"auto.create.tables": "true",
"ddl.retry": "true",
# "connection.pool.max.size": "2300"
}
3 changes: 3 additions & 0 deletions sink-connector-lightweight/tests/integration/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ffails = {}



@TestModule
@Name("mysql to clickhouse replication")
@FFails(ffails)
Expand All @@ -27,6 +28,7 @@ def regression(
local,
clickhouse_binary_path,
clickhouse_version,
hikari_pool,
stress=None,
thread_fuzzer=None,
collect_service_logs=None,
Expand All @@ -38,6 +40,7 @@ def regression(
"clickhouse_version": clickhouse_version,
"stress": stress,
"collect_service_logs": collect_service_logs,
"hikari_pool": hikari_pool,
}

self.context.stress = stress
Expand Down
18 changes: 13 additions & 5 deletions sink-connector-lightweight/tests/integration/regression_auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def regression(
local,
clickhouse_binary_path,
clickhouse_version,
hikari_pool,
env="env/auto",
stress=None,
thread_fuzzer=None,
Expand All @@ -173,6 +174,13 @@ def regression(
"zookeeper": ("zookeeper",),
}

if not hikari_pool:
default_config["connection.pool.disable"] = "true"
default_config["clickhouse.jdbc.params"] = "max_open_connections=100,keepalive.timeout=3,max_buffer_size=1000000,socket_timeout=30000,connection_timeout=30000"
else:
default_config["connection.pool.disable"] = "false"


self.context.nodes = nodes
self.context.clickhouse_version = clickhouse_version
self.context.config = SinkConfig()
Expand Down Expand Up @@ -274,11 +282,11 @@ def regression(
parallel=True,
executor=executor,
)
# Feature(
# run=load("tests.snowflake_id", "module"),
# parallel=True,
# executor=executor,
# )
Feature(
run=load("tests.snowflake_id", "module"),
parallel=True,
executor=executor,
)
Feature(
run=load("tests.table_names", "module"),
parallel=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def regression(
local,
clickhouse_binary_path,
clickhouse_version,
hikari_pool,
env="env/auto_replicated",
stress=None,
thread_fuzzer=None,
Expand All @@ -171,6 +172,12 @@ def regression(
"zookeeper": ("zookeeper",),
}

if not hikari_pool:
default_config["connection.pool.disable"] = "true"
default_config["clickhouse.jdbc.params"] = "max_open_connections=100,keepalive.timeout=3,max_buffer_size=1000000,socket_timeout=30000,connection_timeout=30000"
else:
default_config["connection.pool.disable"] = "false"

self.context.nodes = nodes
self.context.clickhouse_version = clickhouse_version
self.context.config = SinkConfig()
Expand Down
2 changes: 1 addition & 1 deletion sink-connector/deploy/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ services:
clickhouse:
# clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test
container_name: clickhouse
image: clickhouse/clickhouse-server:latest
image: clickhouse/clickhouse-server:23.8.5
restart: "no"
depends_on:
zookeeper:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,26 @@ static ConfigDef newConfigDef() {
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MIN_IDLE.toString())
.define(
ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_LIFETIME.toString(),
Type.LONG,
300000,
Importance.HIGH,
"The maximum lifetime of the connection pool",
CONFIG_GROUP_CONNECTOR_CONFIG,
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_LIFETIME.toString())
.define(
ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_DISABLE.toString(),
Type.BOOLEAN,
false,
Importance.HIGH,
"If set to true, the connection pool is disabled",
CONFIG_GROUP_CONNECTOR_CONFIG,
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_DISABLE.toString())
.define(
ClickHouseSinkConnectorConfigVariables.OFFSET_STORAGE_TABLE_NAME.toString(),
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public enum ClickHouseSinkConnectorConfigVariables {
CONNECTION_POOL_MAX_SIZE("connection.pool.max.size"),
CONNECTION_POOL_TIMEOUT("connection.pool.timeout"),
CONNECTION_POOL_MIN_IDLE("connection.pool.min.idle"),
CONNECTION_POOL_MAX_LIFETIME("connection.pool.max.lifetime"),

CONNECTION_POOL_DISABLE("connection.pool.disable"),

OFFSET_STORAGE_TABLE_NAME("offset.storage.jdbc.offset.table.name"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected void createDestinationDatabase(String databaseName) {
}
// if maxRetries exceeded, throw runtime exception.
if(createDatabaseFailed == false) {
throw new RuntimeException("Error creating Database: " + databaseName);
throw new RuntimeException("Error creating Database: " + databaseName, e);
}
}
}
Expand Down Expand Up @@ -142,8 +142,11 @@ public static Connection createConnection(String url, String clientName, String
Properties properties = new Properties();
properties.setProperty("client_name", clientName);
properties.setProperty("custom_settings", "allow_experimental_object_type=1,insert_allow_materialized_columns=1");
properties.setProperty("http_connection_provider", "HTTP_URL_CONNECTION");
//properties.setProperty("max_open_connections", "100");
boolean connectionPoolDisable = config.getBoolean(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_DISABLE.toString());
// Set the http connection provider to HTTP_URL_CONNECTION if connection pool is enabled.
if(!connectionPoolDisable) {
properties.setProperty("http_connection_provider", "HTTP_URL_CONNECTION");
}
if(!jdbcParams.isEmpty()) {
log.info("**** JDBC PARAMS from configuration:" + jdbcParams);
Properties userProps = splitJdbcProperties(jdbcParams);
Expand All @@ -154,11 +157,15 @@ public static Connection createConnection(String url, String clientName, String

SinkConnectorDataSource dataSource = new SinkConnectorDataSource(url, properties, null);
// Get connection from the pool.
HikariDataSource hikariDbSource = HikariDbSource.getInstance(dataSource, databaseName, config);
// Create a new ClickHouseConnection object with the connection from the pool.
// Convert Connection to ClickHouseConnection.

conn = hikariDbSource.getConnection();
if(connectionPoolDisable) {
log.info("Connection pool is disabled, creating a new connection");
conn = dataSource.getConnection();
} else {
HikariDataSource hikariDbSource = HikariDbSource.getInstance(dataSource, databaseName, config);
// Create a new ClickHouseConnection object with the connection from the pool.
// Convert Connection to ClickHouseConnection.
conn = hikariDbSource.getConnection();
}
} catch (Exception e) {
log.error("Error creating ClickHouse connection" + e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private static HikariDataSource createConnectionPool(SinkConnectorDataSource chD
int maxPoolSize = config.getInt(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_SIZE.toString());
long poolConnectionTimeout = config.getLong(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_TIMEOUT.toString());
int minIdle = config.getInt(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MIN_IDLE.toString());

long maxLifetime = config.getLong(ClickHouseSinkConnectorConfigVariables.CONNECTION_POOL_MAX_LIFETIME.toString());

HikariConfig poolConfig = new HikariConfig();
poolConfig.setPoolName("clickhouse" + "-" + databaseName);
String jdbcUrl = String.format("jdbc:ch:{hostname}:{port}/%s?insert_quorum=auto&server_time_zone&http_connection_provider=HTTP_URL_CONNECTION&server_version=22.13.1.24495", databaseName);
Expand All @@ -76,7 +77,7 @@ private static HikariDataSource createConnectionPool(SinkConnectorDataSource chD
poolConfig.setMaximumPoolSize(maxPoolSize);
//poolConfig.setMinimumIdle(minIdle);
//poolConfig.setIdleTimeout(2_000L);
poolConfig.setMaxLifetime(300_000L);
poolConfig.setMaxLifetime(maxLifetime);
poolConfig.setDataSource(chDataSource);

HikariDataSource dataSource = new HikariDataSource(poolConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

Expand All @@ -34,7 +36,8 @@ public class ClickHouseAutoCreateTableTest {

@Container
private ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.withInitScript("./init_clickhouse.sql");
.withInitScript("./init_clickhouse.sql").waitingFor(new HttpWaitStrategy().forPort(8123));

@BeforeAll
static void initialize() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.altinity.clickhouse.sink.connector.db.operations;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseCreateDatabase;

import org.junit.After;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.junit.jupiter.Container;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -31,7 +35,8 @@ public class ClickHouseCreateDatabaseTest {
static String dbName;

@Container
private static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest");
private static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.waitingFor(new HttpWaitStrategy().forPort(8123));
@BeforeAll
static void initialize() {

Expand All @@ -42,7 +47,9 @@ static void initialize() {
String systemDb = "system";
dbName = "test_create_db";

ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
HashMap<String, String> options = new HashMap<>();
options.put(ClickHouseSinkConnectorConfigVariables.ERRORS_MAX_RETRIES.toString(), "5");
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(options );
String jdbcUrl = BaseDbWriter.getConnectionString(hostName, port, systemDb);
Connection conn = DbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, userName, password,
DbWriter.SYSTEM_DB, config);
Expand All @@ -56,10 +63,17 @@ void dropTestDatabase() throws SQLException {
// drop.executeQuery(String.format("DROP DATABASE IF EXISTS %s", dbName));
}

@AfterAll
public static void cleanup() {
clickHouseContainer.stop();
}

@Test
public void testCreateNewDatabase() throws SQLException {
public void testCreateNewDatabase() throws SQLException, InterruptedException {
Thread.sleep(10000);
ClickHouseCreateDatabase act = new ClickHouseCreateDatabase();
Connection conn = dbWriter.getConnection();

try {
act.createNewDatabase(conn, dbName);
} catch(SQLException se) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ services:
- JAVA_DEBUG_PORT=*:5005
- DEFAULT_JAVA_DEBUG_PORT=*:5005
- KAFKA_DEBUG=true
- JMX_PORT=39999
- JMX_PORT=39999
- KAFKA_HEAP_OPTS=-Xms1G -Xmx5G
Loading
Loading