Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#11481] Fix DirectByteBuffer leak in ActiveThreadCount #11483

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import io.grpc.stub.ClientResponseObserver;
Expand All @@ -27,7 +28,7 @@
/**
* @author Taejin Koo
*/
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes.Builder> {
public class ActiveThreadCountStreamSocket implements GrpcProfilerStreamSocket<PCmdActiveThreadCountRes, Empty> {

private final Logger logger = LogManager.getLogger(this.getClass());

Expand All @@ -36,23 +37,27 @@
private final int streamObserverId;
private int sequenceId = 0;

private final PinpointClientResponseObserver<PCmdActiveThreadCountRes> clientResponseObserver;
private final PinpointClientResponseObserver<PCmdActiveThreadCountRes, Empty> clientResponseObserver;

public ActiveThreadCountStreamSocket(int streamObserverId, GrpcStreamService grpcStreamService) {
this.streamObserverId = streamObserverId;
this.grpcStreamService = Objects.requireNonNull(grpcStreamService, "grpcStreamService");
this.clientResponseObserver = new PinpointClientResponseObserver<>(this);
}

@Override
public void send(PCmdActiveThreadCountRes.Builder sendBuilder) {
public PCmdStreamResponse newHeader() {
PCmdStreamResponse.Builder headerResponseBuilder = PCmdStreamResponse.newBuilder();
headerResponseBuilder.setResponseId(streamObserverId);
headerResponseBuilder.setSequenceId(getSequenceId());
sendBuilder.setCommonStreamResponse(headerResponseBuilder.build());
return headerResponseBuilder.build();

Check warning on line 52 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L52

Added line #L52 was not covered by tests
}

@Override
public void send(PCmdActiveThreadCountRes activeThreadCount) {
if (clientResponseObserver.isReady()) {
clientResponseObserver.getRequestObserver().onNext(sendBuilder.build());
clientResponseObserver.sendRequest(activeThreadCount);

Check warning on line 58 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L58

Added line #L58 was not covered by tests
} else {
logger.info("Send fail. (ActiveThreadCount) client is not ready. streamObserverId:{}", streamObserverId);

Check warning on line 60 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L60

Added line #L60 was not covered by tests
}
}

Expand Down Expand Up @@ -86,18 +91,12 @@
}

private void close0(Throwable throwable) {
if (clientResponseObserver.isReady()) {
if (throwable == null) {
clientResponseObserver.getRequestObserver().onCompleted();
} else {
clientResponseObserver.getRequestObserver().onError(throwable);
}
}
clientResponseObserver.close(throwable);

Check warning on line 94 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java#L94

Added line #L94 was not covered by tests
grpcStreamService.unregister(this);
}

@Override
public ClientResponseObserver getResponseObserver() {
public ClientResponseObserver<PCmdActiveThreadCountRes, Empty> getResponseObserver() {
return clientResponseObserver;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdStreamResponse;
import com.navercorp.pinpoint.grpc.trace.PCommandType;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogramUtils;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.TimerTask;
Expand Down Expand Up @@ -58,7 +62,8 @@
@Override
public void handle(PCmdRequest request, ProfilerCommandServiceGrpc.ProfilerCommandServiceStub profilerCommandServiceStub) {
ActiveThreadCountStreamSocket activeThreadCountStreamSocket = new ActiveThreadCountStreamSocket(request.getRequestId(), grpcStreamService);
profilerCommandServiceStub.commandStreamActiveThreadCount(activeThreadCountStreamSocket.getResponseObserver());
ClientResponseObserver<PCmdActiveThreadCountRes, Empty> responseObserver = activeThreadCountStreamSocket.getResponseObserver();
profilerCommandServiceStub.commandStreamActiveThreadCount(responseObserver);

Check warning on line 66 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L65-L66

Added lines #L65 - L66 were not covered by tests

grpcStreamService.register(activeThreadCountStreamSocket, new ActiveThreadCountTimerTask());
}
Expand Down Expand Up @@ -90,15 +95,21 @@
@Override
public void run() {
if (isDebug) {
LOGGER.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", grpcStreamService.getStreamSocketList());
LOGGER.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList()));

Check warning on line 98 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L98

Added line #L98 was not covered by tests
}

PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse();
for (GrpcProfilerStreamSocket streamSocket : grpcStreamService.getStreamSocketList()) {
if (streamSocket != null) {
for (GrpcProfilerStreamSocket<?, ?> streamSocket : grpcStreamService.getStreamSocketList()) {
if (streamSocket instanceof ActiveThreadCountStreamSocket) {
try {
streamSocket.send(activeThreadCountResponseBuilder);
} catch (Exception e) {
final ActiveThreadCountStreamSocket stream = (ActiveThreadCountStreamSocket) streamSocket;

Check warning on line 105 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L105

Added line #L105 was not covered by tests

PCmdStreamResponse header = stream.newHeader();
activeThreadCountResponseBuilder.setCommonStreamResponse(header);
PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build();

Check warning on line 109 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L107-L109

Added lines #L107 - L109 were not covered by tests

stream.send(activeThreadCount);
} catch (Throwable e) {

Check warning on line 112 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L111-L112

Added lines #L111 - L112 were not covered by tests
LOGGER.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
streamSocket.close(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
/**
* @author Taejin Koo
*/
public interface GrpcProfilerStreamSocket<T> {
public interface GrpcProfilerStreamSocket<Req, Res> {

void send(T send);
void send(Req send);

void close();

Expand All @@ -33,6 +33,6 @@ public interface GrpcProfilerStreamSocket<T> {

void disconnect(Throwable throwable);

ClientResponseObserver getResponseObserver();
ClientResponseObserver<Req, Res> getResponseObserver();

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class GrpcStreamService {

private final AtomicReference<TimerTask> currentTaskReference = new AtomicReference<>();

private final List<GrpcProfilerStreamSocket<?>> grpcProfilerStreamSocketList = new CopyOnWriteArrayList<>();
private final List<GrpcProfilerStreamSocket<?, ?>> grpcProfilerStreamSocketList = new CopyOnWriteArrayList<>();

public GrpcStreamService(String name, long flushDelay) {
Objects.requireNonNull(name, "name");
Expand All @@ -50,11 +50,11 @@ public GrpcStreamService(String name, long flushDelay) {
this.flushDelay = flushDelay;
}

public GrpcProfilerStreamSocket<?>[] getStreamSocketList() {
public GrpcProfilerStreamSocket<?, ?>[] getStreamSocketList() {
return grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]);
}

public boolean register(GrpcProfilerStreamSocket<?> streamSocket, TimerTask timerTask) {
public boolean register(GrpcProfilerStreamSocket<?, ?> streamSocket, TimerTask timerTask) {
synchronized (lock) {
grpcProfilerStreamSocketList.add(streamSocket);
boolean turnOn = currentTaskReference.compareAndSet(null, timerTask);
Expand All @@ -67,7 +67,7 @@ public boolean register(GrpcProfilerStreamSocket<?> streamSocket, TimerTask time
return false;
}

public boolean unregister(GrpcProfilerStreamSocket<?> streamSocket) {
public boolean unregister(GrpcProfilerStreamSocket<?, ?> streamSocket) {
synchronized (lock) {
grpcProfilerStreamSocketList.remove(streamSocket);
// turnoff
Expand All @@ -91,8 +91,8 @@ public void close() {
timer.cancel();
}

GrpcProfilerStreamSocket<?>[] streamSockets = grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]);
for (GrpcProfilerStreamSocket<?> streamSocket : streamSockets) {
GrpcProfilerStreamSocket<?, ?>[] streamSockets = grpcProfilerStreamSocketList.toArray(new GrpcProfilerStreamSocket[0]);
for (GrpcProfilerStreamSocket<?, ?> streamSocket : streamSockets) {
if (streamSocket != null) {
streamSocket.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

Expand All @@ -25,14 +24,14 @@
/**
* @author Taejin Koo
*/
public class PinpointClientResponseObserver<ReqT> implements ClientResponseObserver<ReqT, Empty> {
public class PinpointClientResponseObserver<ReqT, ResT> implements ClientResponseObserver<ReqT, ResT> {

private final GrpcProfilerStreamSocket pinpointGrpcProfilerStreamSocket;
private final GrpcProfilerStreamSocket<ReqT, ResT> socket;

private volatile ClientCallStreamObserver<ReqT> requestStream;

public PinpointClientResponseObserver(GrpcProfilerStreamSocket pinpointGrpcProfilerStreamSocket) {
this.pinpointGrpcProfilerStreamSocket = Objects.requireNonNull(pinpointGrpcProfilerStreamSocket, "pinpointGrpcProfilerStreamSocket");
public PinpointClientResponseObserver(GrpcProfilerStreamSocket<ReqT, ResT> socket) {
this.socket = Objects.requireNonNull(socket, "socket");
}

@Override
Expand All @@ -41,26 +40,48 @@
}

@Override
public void onNext(Empty value) {
public void onNext(ResT res) {
// do nothing
}

@Override
public void onError(Throwable t) {
pinpointGrpcProfilerStreamSocket.disconnect(t);
socket.disconnect(t);

Check warning on line 49 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L49

Added line #L49 was not covered by tests
}

@Override
public void onCompleted() {
pinpointGrpcProfilerStreamSocket.disconnect();
socket.disconnect();
}

Check warning on line 55 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L54-L55

Added lines #L54 - L55 were not covered by tests

public void sendRequest(ReqT value) {
final ClientCallStreamObserver<ReqT> copy = this.requestStream;

Check warning on line 58 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L58

Added line #L58 was not covered by tests
if (copy == null) {
return;

Check warning on line 60 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L60

Added line #L60 was not covered by tests
}
copy.onNext(value);

Check warning on line 62 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L62

Added line #L62 was not covered by tests
}

public boolean isReady() {
return requestStream != null;
final ClientCallStreamObserver<ReqT> copy = this.requestStream;
if (copy == null) {
return false;
}
return copy.isReady();
}

public ClientCallStreamObserver<ReqT> getRequestObserver() {
return requestStream;

public void close(Throwable throwable) {
final ClientCallStreamObserver<ReqT> copy = requestStream;

Check warning on line 75 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L75

Added line #L75 was not covered by tests
if (copy == null) {
return;

Check warning on line 77 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L77

Added line #L77 was not covered by tests
}

if (throwable == null) {
copy.onCompleted();

Check warning on line 81 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L81

Added line #L81 was not covered by tests
} else {
copy.onError(throwable);

Check warning on line 83 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/PinpointClientResponseObserver.java#L83

Added line #L83 was not covered by tests
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import io.grpc.stub.ClientCallStreamObserver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PinpointClientResponseObserverTest {

@Test
void isReady_true() {
GrpcProfilerStreamSocket<String, Empty> socket = mock(GrpcProfilerStreamSocket.class);
PinpointClientResponseObserver<String, Empty> responseObserver = new PinpointClientResponseObserver<>(socket);

ClientCallStreamObserver<String> requestStream = mock(ClientCallStreamObserver.class);
when(requestStream.isReady()).thenReturn(true);
responseObserver.beforeStart(requestStream);

Assertions.assertTrue(responseObserver.isReady());
}

@Test
void isReady_false() {
GrpcProfilerStreamSocket<String, Empty> socket = mock(GrpcProfilerStreamSocket.class);
PinpointClientResponseObserver<String, Empty> responseObserver = new PinpointClientResponseObserver<>(socket);

Assertions.assertFalse(responseObserver.isReady());

ClientCallStreamObserver<String> requestStream = mock(ClientCallStreamObserver.class);
responseObserver.beforeStart(requestStream);
Assertions.assertFalse(responseObserver.isReady());
}
}