Skip to content

Commit

Permalink
enable streamable blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
postables committed Mar 27, 2020
1 parent 9502419 commit a64fa2a
Show file tree
Hide file tree
Showing 13 changed files with 921 additions and 618 deletions.
2 changes: 2 additions & 0 deletions doc/PROTO.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ BSREQTYPE is a particular blockstore request type
| BS_PUT_MANY | 2 | BS_PUT_MANY is used to put many blocks in the store |
| BS_GET | 3 | BS_GET is used to get a block from the store |
| BS_GET_MANY | 4 | BS_GET_MANY is used to get many blocks from the store |
| BS_GET_ALL | 5 | BS_GET_ALL is used to retrieve all blocks from the store It is the gRPC equivalent of Blockstore::AllKeysChan |



Expand Down Expand Up @@ -1009,6 +1010,7 @@ NodeAPI provide an API to control the underlying custom ipfs node
| Extras | [ExtrasRequest](#pb.ExtrasRequest) | [Empty](#pb.Empty) | Extras provide control over node extras capabilities |
| P2P | [P2PRequest](#pb.P2PRequest) | [P2PResponse](#pb.P2PResponse) | P2P allows control of generalized p2p streams for tcp/udp based protocol. By using this RPC, we can tunnel traffic similar to ssh tunneling except using libp2p as the transport layer, and and tcp/udp port. |
| Blockstore | [BlockstoreRequest](#pb.BlockstoreRequest) | [BlockstoreResponse](#pb.BlockstoreResponse) | Blockstore allows low-level management of the underlying blockstore |
| BlockstoreStream | [BlockstoreRequest](#pb.BlockstoreRequest) stream | [BlockstoreResponse](#pb.BlockstoreResponse) stream | BlockstoreStream is akin to Blockstore, except streamable Once v4 is out, condense this + blockstore into a single call |
| Dag | [DagRequest](#pb.DagRequest) | [DagResponse](#pb.DagResponse) | Dag is a unidirectional rpc allowing manipulation of low-level ipld objects |
| Keystore | [KeystoreRequest](#pb.KeystoreRequest) | [KeystoreResponse](#pb.KeystoreResponse) | Keystore is a unidirectional RPC allowing management of ipfs keystores |
| Persist | [PersistRequest](#pb.PersistRequest) | [PersistResponse](#pb.PersistResponse) | Persist is used to retrieve data from the network and make it available locally |
Expand Down
280 changes: 180 additions & 100 deletions go/node.pb.go

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions java/pb/NodeAPIGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,37 @@ pb.Node.BlockstoreResponse> getBlockstoreMethod() {
return getBlockstoreMethod;
}

private static volatile io.grpc.MethodDescriptor<pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse> getBlockstoreStreamMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "BlockstoreStream",
requestType = pb.Node.BlockstoreRequest.class,
responseType = pb.Node.BlockstoreResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
public static io.grpc.MethodDescriptor<pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse> getBlockstoreStreamMethod() {
io.grpc.MethodDescriptor<pb.Node.BlockstoreRequest, pb.Node.BlockstoreResponse> getBlockstoreStreamMethod;
if ((getBlockstoreStreamMethod = NodeAPIGrpc.getBlockstoreStreamMethod) == null) {
synchronized (NodeAPIGrpc.class) {
if ((getBlockstoreStreamMethod = NodeAPIGrpc.getBlockstoreStreamMethod) == null) {
NodeAPIGrpc.getBlockstoreStreamMethod = getBlockstoreStreamMethod =
io.grpc.MethodDescriptor.<pb.Node.BlockstoreRequest, pb.Node.BlockstoreResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "BlockstoreStream"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
pb.Node.BlockstoreRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
pb.Node.BlockstoreResponse.getDefaultInstance()))
.setSchemaDescriptor(new NodeAPIMethodDescriptorSupplier("BlockstoreStream"))
.build();
}
}
}
return getBlockstoreStreamMethod;
}

private static volatile io.grpc.MethodDescriptor<pb.Node.DagRequest,
pb.Node.DagResponse> getDagMethod;

Expand Down Expand Up @@ -340,6 +371,17 @@ public void blockstore(pb.Node.BlockstoreRequest request,
asyncUnimplementedUnaryCall(getBlockstoreMethod(), responseObserver);
}

/**
* <pre>
* BlockstoreStream is akin to Blockstore, except streamable
* Once v4 is out, condense this + blockstore into a single call
* </pre>
*/
public io.grpc.stub.StreamObserver<pb.Node.BlockstoreRequest> blockstoreStream(
io.grpc.stub.StreamObserver<pb.Node.BlockstoreResponse> responseObserver) {
return asyncUnimplementedStreamingCall(getBlockstoreStreamMethod(), responseObserver);
}

/**
* <pre>
* Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
Expand Down Expand Up @@ -400,6 +442,13 @@ public void persist(pb.Node.PersistRequest request,
pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse>(
this, METHODID_BLOCKSTORE)))
.addMethod(
getBlockstoreStreamMethod(),
asyncBidiStreamingCall(
new MethodHandlers<
pb.Node.BlockstoreRequest,
pb.Node.BlockstoreResponse>(
this, METHODID_BLOCKSTORE_STREAM)))
.addMethod(
getDagMethod(),
asyncUnaryCall(
Expand Down Expand Up @@ -488,6 +537,18 @@ public void blockstore(pb.Node.BlockstoreRequest request,
getChannel().newCall(getBlockstoreMethod(), getCallOptions()), request, responseObserver);
}

/**
* <pre>
* BlockstoreStream is akin to Blockstore, except streamable
* Once v4 is out, condense this + blockstore into a single call
* </pre>
*/
public io.grpc.stub.StreamObserver<pb.Node.BlockstoreRequest> blockstoreStream(
io.grpc.stub.StreamObserver<pb.Node.BlockstoreResponse> responseObserver) {
return asyncBidiStreamingCall(
getChannel().newCall(getBlockstoreStreamMethod(), getCallOptions()), responseObserver);
}

/**
* <pre>
* Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
Expand Down Expand Up @@ -716,6 +777,7 @@ public com.google.common.util.concurrent.ListenableFuture<pb.Node.PersistRespons
private static final int METHODID_DAG = 4;
private static final int METHODID_KEYSTORE = 5;
private static final int METHODID_PERSIST = 6;
private static final int METHODID_BLOCKSTORE_STREAM = 7;

private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
Expand Down Expand Up @@ -772,6 +834,9 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_BLOCKSTORE_STREAM:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.blockstoreStream(
(io.grpc.stub.StreamObserver<pb.Node.BlockstoreResponse>) responseObserver);
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -827,6 +892,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
.addMethod(getExtrasMethod())
.addMethod(getP2PMethod())
.addMethod(getBlockstoreMethod())
.addMethod(getBlockstoreStreamMethod())
.addMethod(getDagMethod())
.addMethod(getKeystoreMethod())
.addMethod(getPersistMethod())
Expand Down
13 changes: 13 additions & 0 deletions js/node_grpc_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,19 @@ blockstore: {
responseSerialize: serialize_pb_BlockstoreResponse,
responseDeserialize: deserialize_pb_BlockstoreResponse,
},
// BlockstoreStream is akin to Blockstore, except streamable
// Once v4 is out, condense this + blockstore into a single call
blockstoreStream: {
path: '/pb.NodeAPI/BlockstoreStream',
requestStream: true,
responseStream: true,
requestType: node_pb.BlockstoreRequest,
responseType: node_pb.BlockstoreResponse,
requestSerialize: serialize_pb_BlockstoreRequest,
requestDeserialize: deserialize_pb_BlockstoreRequest,
responseSerialize: serialize_pb_BlockstoreResponse,
responseDeserialize: deserialize_pb_BlockstoreResponse,
},
// Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
dag: {
path: '/pb.NodeAPI/Dag',
Expand Down
3 changes: 2 additions & 1 deletion js/node_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -4568,7 +4568,8 @@ proto.pb.BSREQTYPE = {
BS_PUT: 1,
BS_PUT_MANY: 2,
BS_GET: 3,
BS_GET_MANY: 4
BS_GET_MANY: 4,
BS_GET_ALL: 5
};

/**
Expand Down
6 changes: 6 additions & 0 deletions pb/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ service NodeAPI {
rpc P2P(P2PRequest) returns (P2PResponse) { };
// Blockstore allows low-level management of the underlying blockstore
rpc Blockstore(BlockstoreRequest) returns (BlockstoreResponse) { };
// BlockstoreStream is akin to Blockstore, except streamable
// Once v4 is out, condense this + blockstore into a single call
rpc BlockstoreStream(stream BlockstoreRequest) returns (stream BlockstoreResponse) { };
// Dag is a unidirectional rpc allowing manipulation of low-level ipld objects
rpc Dag(DagRequest) returns (DagResponse) { };
// Keystore is a unidirectional RPC allowing management of ipfs keystores
Expand Down Expand Up @@ -166,6 +169,9 @@ enum BSREQTYPE {
BS_GET = 3;
// BS_GET_MANY is used to get many blocks from the store
BS_GET_MANY = 4;
// BS_GET_ALL is used to retrieve all blocks from the store
// It is the gRPC equivalent of Blockstore::AllKeysChan
BS_GET_ALL = 5;
}

// BSREQOPTS are options for blockstore requests
Expand Down
Loading

0 comments on commit a64fa2a

Please sign in to comment.