Skip to content

Commit

Permalink
Add meta table into system store
Browse files Browse the repository at this point in the history
Change-Id: Idb4d3bffcb1cd941bd8a533981d5ab3d479f4069
  • Loading branch information
Linary committed Jul 6, 2021
1 parent 6a39098 commit 9541494
Show file tree
Hide file tree
Showing 42 changed files with 724 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,6 @@ public String backend() {
return this.hugegraph.backend();
}

@Override
public String backendVersion() {
this.verifyAnyPermission();
return this.hugegraph.backendVersion();
}

@Override
public BackendStoreInfo backendStoreInfo() {
this.verifyAdminPermission();
Expand Down Expand Up @@ -727,12 +721,6 @@ public void initSystemInfo() {
this.hugegraph.initSystemInfo();
}

@Override
public void initBackendInfo() {
this.verifyAdminPermission();
this.hugegraph.initBackendInfo();
}

@Override
public void createSnapshot() {
this.verifyPermission(HugePermission.WRITE, ResourceType.STATUS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import org.apache.tinkerpop.gremlin.util.NumberHelper;

import com.baidu.hugegraph.backend.store.BackendMetrics;
import com.baidu.hugegraph.backend.store.BackendStoreProvider;
import com.baidu.hugegraph.backend.store.BackendTable;
import com.baidu.hugegraph.backend.store.cassandra.CassandraTables.Edge;
import com.baidu.hugegraph.backend.store.cassandra.CassandraTables.Vertex;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -70,7 +70,7 @@ public CassandraMetrics(HugeConfig conf,
assert this.username != null && this.password != null;

this.keyspace = keyspace;
String g = conf.get(CoreOptions.STORE_GRAPH);
String g = BackendStoreProvider.GRAPH_STORE;
String v = BackendTable.joinTableName(g, Vertex.TABLE);
String oe = BackendTable.joinTableName(g, "o" + Edge.TABLE_SUFFIX);
String ie = BackendTable.joinTableName(g, "i" + Edge.TABLE_SUFFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,5 +670,45 @@ public long getCounter(HugeType type) {
public boolean isSchemaStore() {
return false;
}

public CassandraSessionPool.Session getSession() {
return super.sessions.session();
}
}

public static class CassandraSystemStore extends CassandraGraphStore {

private final CassandraTables.Meta meta;

public CassandraSystemStore(BackendStoreProvider provider,
String keyspace, String store) {
super(provider, keyspace, store);

this.meta = new CassandraTables.Meta();
}

@Override
public void init() {
super.init();
this.checkOpened();
CassandraSessionPool.Session session = this.getSession();
String driverVersion = this.provider().driverVersion();
this.meta.writeVersion(session, driverVersion);
LOG.info("Write down the backend version: {}", driverVersion);
}

@Override
public String storedVersion() {
this.checkOpened();
CassandraSessionPool.Session session = this.getSession();
return this.meta.readVersion(session);
}

@Override
protected Collection<CassandraTable> tables() {
List<CassandraTable> tables = new ArrayList<>(super.tables());
tables.add(this.meta);
return tables;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.cassandra.CassandraStore.CassandraGraphStore;
import com.baidu.hugegraph.backend.store.cassandra.CassandraStore.CassandraSchemaStore;
import com.baidu.hugegraph.backend.store.cassandra.CassandraStore.CassandraSystemStore;

public class CassandraStoreProvider extends AbstractBackendStoreProvider {

Expand All @@ -40,13 +41,18 @@ protected BackendStore newGraphStore(String store) {
return new CassandraGraphStore(this, this.keyspace(), store);
}

@Override
protected BackendStore newSystemStore(String store) {
return new CassandraSystemStore(this, this.keyspace(), store);
}

@Override
public String type() {
return "cassandra";
}

@Override
public String version() {
public String driverVersion() {
/*
* Versions history:
* [1.0] HugeGraph-1328: supports backend table version checking
Expand All @@ -62,7 +68,8 @@ public String version() {
* [1.8] #746: support userdata for indexlabel
* [1.9] #295: support ttl for vertex and edge
* [1.10] #1333: support read frequency for property key
* [1.11] #1533: add meta table in system store
*/
return "1.10";
return "1.11";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,48 @@ public class CassandraTables {

private static final long COMMIT_DELETE_BATCH = Query.COMMIT_BATCH;

public static class Meta extends CassandraTable {

public static final String TABLE = HugeType.META.string();

public Meta() {
super(TABLE);
}

@Override
public void init(CassandraSessionPool.Session session) {
ImmutableMap<HugeKeys, DataType> pkeys = ImmutableMap.of(
HugeKeys.NAME, DataType.text()
);
ImmutableMap<HugeKeys, DataType> ckeys = ImmutableMap.of();
ImmutableMap<HugeKeys, DataType> columns = ImmutableMap.of(
HugeKeys.VALUE, DataType.text()
);

this.createTable(session, pkeys, ckeys, columns);
}

public void writeVersion(CassandraSessionPool.Session session,
String version) {
Insert insert = QueryBuilder.insertInto(TABLE);
insert.value(formatKey(HugeKeys.NAME), formatKey(HugeKeys.VERSION));
insert.value(formatKey(HugeKeys.VALUE), version);
session.execute(insert);
}

public String readVersion(CassandraSessionPool.Session session) {
Clause where = formatEQ(HugeKeys.NAME, formatKey(HugeKeys.VERSION));
Select select = QueryBuilder.select(formatKey(HugeKeys.VALUE))
.from(TABLE);
select.where(where);
Row row = session.execute(select).one();
if (row == null) {
return null;
}
return row.getString(formatKey(HugeKeys.VALUE));
}
}

public static class Counters extends CassandraTable {

public static final String TABLE = HugeType.COUNTER.string();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public interface HugeGraph extends Graph {

public String name();
public String backend();
public String backendVersion();
public BackendStoreInfo backendStoreInfo();
public BackendFeatures backendStoreFeatures();

Expand All @@ -154,7 +153,6 @@ public interface HugeGraph extends Graph {
public void truncateBackend();

public void initSystemInfo();
public void initBackendInfo();

public void createSnapshot();
public void resumeSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.baidu.hugegraph;

import com.baidu.hugegraph.analyzer.Analyzer;
import com.baidu.hugegraph.backend.LocalCounter;
import com.baidu.hugegraph.backend.serializer.AbstractSerializer;
import com.baidu.hugegraph.backend.store.BackendFeatures;
import com.baidu.hugegraph.backend.store.BackendStore;
Expand Down Expand Up @@ -67,6 +68,7 @@ public interface HugeGraphParams {

public ServerInfoManager serverManager();

public LocalCounter counter();
public AbstractSerializer serializer();
public Analyzer analyzer();
public RateLimiter writeRateLimiter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.baidu.hugegraph.auth.AuthManager;
import com.baidu.hugegraph.auth.StandardAuthManager;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.LocalCounter;
import com.baidu.hugegraph.backend.cache.Cache;
import com.baidu.hugegraph.backend.cache.CacheNotifier;
import com.baidu.hugegraph.backend.cache.CacheNotifier.GraphCacheNotifier;
Expand Down Expand Up @@ -147,6 +148,7 @@ public class StandardHugeGraph implements HugeGraph {
private final EventHub graphEventHub;
private final EventHub indexEventHub;

private final LocalCounter localCounter;
private final RateLimiter writeRateLimiter;
private final RateLimiter readRateLimiter;
private final TaskManager taskManager;
Expand All @@ -167,6 +169,8 @@ public StandardHugeGraph(HugeConfig config) {
this.graphEventHub = new EventHub("graph");
this.indexEventHub = new EventHub("index");

this.localCounter = new LocalCounter();

final int writeLimit = config.get(CoreOptions.RATE_LIMIT_WRITE);
this.writeRateLimiter = writeLimit > 0 ?
RateLimiter.create(writeLimit) : null;
Expand Down Expand Up @@ -234,14 +238,11 @@ public String backend() {
return this.storeProvider.type();
}

@Override
public String backendVersion() {
return this.storeProvider.version();
}

@Override
public BackendStoreInfo backendStoreInfo() {
return new BackendStoreInfo(this.schemaTransaction());
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.systemTransaction();
return new BackendStoreInfo(this.storeProvider);
}

@Override
Expand Down Expand Up @@ -323,7 +324,12 @@ public void initBackend() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.init();
this.initBackendInfo();
/*
* NOTE: The main goal is to write the serverInfo to the central
* node, such as etcd, and also create the system schema in memory,
* which has no side effects
*/
this.initSystemInfo();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
this.loadGraphStore().close();
Expand Down Expand Up @@ -362,6 +368,7 @@ public void truncateBackend() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.truncate();
// TOOD: remove this after serverinfo saved in etcd
this.serverStarted(this.serverInfoManager().selfServerId(),
this.serverInfoManager().selfServerRole());
} finally {
Expand All @@ -373,24 +380,16 @@ public void truncateBackend() {

@Override
public void initSystemInfo() {
// Initialize user schema
try {
this.serverInfoManager().init();
this.taskScheduler().init();
this.serverInfoManager().init();
this.authManager().init();
} finally {
this.closeTx();
}
LOG.debug("Graph '{}' system info has been initialized", this);
}

@Override
public void initBackendInfo() {
BackendStoreInfo info = this.backendStoreInfo();
info.init();
LOG.debug("Graph '{}' backend info has been initialized", this);
}

@Override
public void createSnapshot() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
Expand Down Expand Up @@ -452,18 +451,15 @@ private void checkGraphNotClosed() {
}

private BackendStore loadSchemaStore() {
String name = this.configuration.get(CoreOptions.STORE_SCHEMA);
return this.storeProvider.loadSchemaStore(name);
return this.storeProvider.loadSchemaStore();
}

private BackendStore loadGraphStore() {
String graph = this.configuration.get(CoreOptions.STORE_GRAPH);
return this.storeProvider.loadGraphStore(graph);
return this.storeProvider.loadGraphStore();
}

private BackendStore loadSystemStore() {
String name = this.configuration.get(CoreOptions.STORE_SYSTEM);
return this.storeProvider.loadSystemStore(name);
return this.storeProvider.loadSystemStore();
}

@Watched
Expand Down Expand Up @@ -1155,6 +1151,11 @@ public ServerInfoManager serverManager() {
return StandardHugeGraph.this.serverInfoManager();
}

@Override
public LocalCounter counter() {
return StandardHugeGraph.this.localCounter;
}

@Override
public AbstractSerializer serializer() {
return StandardHugeGraph.this.serializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public String store() {
return this.store.store();
}

@Override
public String storedVersion() {
return this.store.storedVersion();
}

@Override
public String database() {
return this.store.database();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public void registerMetaHandler(String name, MetaHandler<Session> handler) {
this.dispatcher.registerMetaHandler(name, handler);
}

@Override
public String storedVersion() {
throw new UnsupportedOperationException();
}

@Override
public SystemSchemaStore systemSchemaStore() {
return this.systemSchemaStore;
Expand All @@ -50,7 +55,7 @@ public SystemSchemaStore systemSchemaStore() {
@Override
public <R> R metadata(HugeType type, String meta, Object[] args) {
Session session = this.session(type);
MetaDispatcher<Session> dispatcher = null;
MetaDispatcher<Session> dispatcher;
if (type == null) {
dispatcher = this.metaDispatcher();
} else {
Expand Down
Loading

0 comments on commit 9541494

Please sign in to comment.