Skip to content

Commit

Permalink
[pinpoint-apm#11431] Striping span connection
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Sep 3, 2024
1 parent c59fe6f commit 19f989b
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.navercorp.pinpoint.collector.util.DurabilityApplier;
import com.navercorp.pinpoint.common.hbase.config.DistributorConfiguration;
import com.navercorp.pinpoint.common.hbase.config.HbaseNamespaceConfiguration;
import com.navercorp.pinpoint.common.hbase.config.HbasePutWriterConfiguration;
import com.navercorp.pinpoint.common.hbase.config.HbaseTemplateConfiguration;
import com.navercorp.pinpoint.common.server.CommonsHbaseConfiguration;
import com.navercorp.pinpoint.common.server.bo.SpanBo;
Expand All @@ -33,6 +34,8 @@

HbaseClientConfiguration.class,
HbaseTemplateConfiguration.class,
HbasePutWriterConfiguration.class,

BatchHbaseClientConfiguration.class,

HbaseAsyncConfiguration.class,
Expand Down
1 change: 1 addition & 0 deletions collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ hbase.client.put-writer.async-poller.span.minCpuCore=4
hbase.client.put-writer.async-poller.span.queueSize=10000
hbase.client.put-writer.async-poller.span.writeBufferSize=100
hbase.client.put-writer.async-poller.span.writeBufferPeriodicFlush=100
hbase.client.put-writer.async-poller.span.connectionSize=1

# parallelism=0 : auto detect cpu core
hbase.client.put-writer.async-poller.default.parallelism=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class AsyncConnectionFactoryBean implements FactoryBean<AsyncConnection>,
private final Configuration configuration;
private AsyncConnection connection;

private AsyncConnectionCleaner cleaner = new AsyncConnectionCleaner();
private final AsyncConnectionCleaner cleaner = new AsyncConnectionCleaner();
private Consumer<AsyncConnection> postProcessor;

public AsyncConnectionFactoryBean(Configuration configuration, User user) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class AsyncPollerOption {
private int cpuRatio = 1;
private int minCpuCore = 2;

private int connectionSize = 1;


public int getQueueSize() {
return queueSize;
Expand Down Expand Up @@ -74,6 +76,14 @@ public void setMinCpuCore(int minCpuCore) {
this.minCpuCore = minCpuCore;
}

public int getConnectionSize() {
return connectionSize;
}

public void setConnectionSize(int connectionSize) {
this.connectionSize = connectionSize;
}

@Override
public String toString() {
return "AsyncPollerOption{" +
Expand All @@ -83,6 +93,7 @@ public String toString() {
", parallelism=" + parallelism +
", cpuRatio=" + cpuRatio +
", minCpuCore=" + minCpuCore +
", connectionSize=" + connectionSize +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;

import java.util.Objects;

public class AsyncTableWriterSelectorFactory implements TableWriterFactory {

private final ConnectionSelector selector;

public AsyncTableWriterSelectorFactory(ConnectionSelector selector) {
this.selector = Objects.requireNonNull(selector, "selector");
}

@Override
public Writer writer(TableName tableName) {
AsyncConnection connection = selector.getConnection();
final AsyncTable<?> table = connection.getTable(tableName);
return table::put;
}

@Override
public String toString() {
return "AsyncTableWriterSelectorFactory{" +
"selector=" + selector +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncConnection;

public interface ConnectionSelector {
AsyncConnection getConnection();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.util.IOUtils;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSelector implements ConnectionSelector, Closeable {
private final Logger logger = LogManager.getLogger(this.getClass());

private final AtomicInteger mod = new AtomicInteger(0);

private final AsyncConnection[] connections;

public RoundRobinSelector(List<AsyncConnection> connections) {
Objects.requireNonNull(connections, "connections");
this.connections = connections.toArray(new AsyncConnection[0]);
}

void setModKey(int mod) {
this.mod.set(mod);
}

@Override
public AsyncConnection getConnection() {
final int index = getIndex();
return connections[index];
}

private int getIndex() {
final long next = mod.getAndIncrement();
return Math.floorMod(next, connections.length);
}

@Override
public void close() {
logger.info("Closing connections {}", connections.length);
for (AsyncConnection connection : connections) {
IOUtils.closeQuietly(connection, (ioe) -> logger.warn("Failed to close connection", ioe));
}
}

@Override
public String toString() {
return "RoundRobinSelector{" +
"connections=" + Arrays.toString(connections) +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.navercorp.pinpoint.common.hbase.async;

import com.navercorp.pinpoint.common.util.IOUtils;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.util.Objects;

public class SimpleConnectionSelector implements ConnectionSelector, Closeable {
private final Logger logger = LogManager.getLogger(this.getClass());

private final AsyncConnection connection;

public SimpleConnectionSelector(AsyncConnection connection) {
this.connection = Objects.requireNonNull(connection, "connection");

}


@Override
public AsyncConnection getConnection() {
return connection;
}

@Override
public void close() {
IOUtils.closeQuietly(connection, (ioe) -> logger.warn("Failed to close connection", ioe));
}

@Override
public String toString() {
return "SimpleConnectionSelector{" +
"connection=" + connection +
'}';
}
}
Loading

0 comments on commit 19f989b

Please sign in to comment.