Nebula Graph 是一款開源的、分散式的、易擴充套件的原生圖資料庫,能夠承載包含數千億個點和數萬億條邊的超大規模資料集,並且提供毫秒級查詢。
當前Nebula Graph的最新版本是3.2.1, 根據官方的檔案進行設定
TAG和EDGE都會對應一組的屬性(map, 或者說dict)
一個點可以對多個TAG, 每個TAG一組屬性, 多組屬性. 專案中建議一開始不要用多TAG, 在整個圖結構穩定後, 再做合併
一個邊只對應一個EDGE, 一組屬性
-- 列出圖空間 SHOW SPACES; -- 列出tag(點型別)和edge(邊型別), 需要先 USE 一個圖空間 SHOW TAGS; SHOW EDGES;
MATCH ()-[e:follow]-() RETURN e MATCH (v:player) RETURN v
帶條件的查詢, 在結果數量較多時必須帶limit, 否則Nebula會報錯
match (v:ADDRESS)-[e]-() where id(v)=="ADD:82388116" return v,e limit 100
在上面的連結中, 提供了最小的設定和測試程式碼
對於Nebula Graph 3.2.1, 需要使用3.0.0的版本. client的每個版本只能對應特定的一兩個伺服器端版本
<dependency> <groupId>com.vesoft</groupId> <artifactId>client</artifactId> <version>3.0.0</version> </dependency>
Java呼叫主要是三部分, 建立連線池, 建立對談, 執行查詢
連線到地址127.0.0.1, 埠9669, 連線池大小100. 注意地址和埠是一個列表, Nebula是支援叢集的. 連線時不需要使用者和密碼
NebulaPool pool = new NebulaPool(); try { NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); nebulaPoolConfig.setMaxConnSize(100); List<HostAddress> addresses = Arrays.asList(new HostAddress("", 9669)); Boolean initResult = pool.init(addresses, nebulaPoolConfig); if (!initResult) { log.error("pool init failed."); return; } } catch () //...
Session session = pool.getSession("root", "nebula", false);
建立一個SPACE, 然後使用這個SPACE, 建立一個TAG person, 建立一個EDGE like
String createSchema = "CREATE SPACE IF NOT EXISTS test(vid_type=fixed_string(20)); " + "USE test;" + "CREATE TAG IF NOT EXISTS person(name string, age int);" + "CREATE EDGE IF NOT EXISTS like(likeness double)"; ResultSet resp = session.execute(createSchema); if (!resp.isSucceeded()) { log.error(String.format("Execute: `%s', failed: %s", createSchema, resp.getErrorMessage())); System.exit(1); }
String insertVertexes = "INSERT VERTEX person(name, age) VALUES " + "'Bob':('Bob', 10), " + "'Lily':('Lily', 9), " + "'Tom':('Tom', 10), " + "'Jerry':('Jerry', 13), " + "'John':('John', 11);"; ResultSet resp = session.execute(insertVertexes); if (!resp.isSucceeded()) { log.error(String.format("Execute: `%s', failed: %s", insertVertexes, resp.getErrorMessage())); System.exit(1); }
String query = "GO FROM "Bob" OVER like " + "YIELD $^.person.name, $^.person.age, like.likeness"; ResultSet resp = session.execute(query); if (!resp.isSucceeded()) { log.error(String.format("Execute: `%s', failed: %s", query, resp.getErrorMessage())); System.exit(1); } printResult(resp);
配合@Bean(destroyMethod = "close")
, 建立一個工廠類, 接收pool並實現close()方法
public class NebulaSessionFactory { private final NebulaPool pool; private final String username; private final String password; public NebulaSessionFactory(NebulaPool pool, String username, String password) { this.pool = pool; this.username = username; this.password = password; } public Session getSession() { try { return pool.getSession(username, password, false); } catch (NotValidConnectionException|IOErrorException|AuthFailedException|ClientServerIncompatibleException e) { throw new RuntimeException("Nebula session exception", e); } } public void close() { pool.close(); } }
為什麼不直接將 NebulaPool 設定為Bean? 因為 Session 每次建立時需要帶使用者名稱密碼, 將密碼作為config注入到每個Service中肯定是大家都不願意看到的.,
myapp: nebula: hosts: @nebula.hosts@ username: @nebula.username@ password: @nebula.password@ max-conn: @nebula.max-conn@
應用啟動時讀取設定, 建立 NebulaPool, 並範例化 NebulaSessionFactory, destroyMethod = "close"
, 這個表示在專案shutdown時會呼叫Bean的close方法釋放資源.
@Configuration public class NebulaGraphConfig { @Value("${myapp.nebula.hosts}") private String hosts; @Value("${myapp.nebula.max-conn}") private int maxConn; @Value("${myapp.nebula.username}") private String username; @Value("${myapp.nebula.password}") private String password; @Bean(destroyMethod = "close") public NebulaSessionFactory nebulaSessionFactory() { List<HostAddress> hostAddresses = new ArrayList<>(); String[] hostList = hosts.split(",[ ]*"); for (String host : hostList) { String[] hostParts = host.split(":"); if (hostParts.length != 2 || !hostParts[1].matches("\d+")) { throw new RuntimeException("Invalid host name set for Nebula: " + host); } hostAddresses.add(new HostAddress(hostParts[0], Integer.parseInt(hostParts[1]))); } NebulaPoolConfig poolConfig = new NebulaPoolConfig(); poolConfig.setMaxConnSize(maxConn); NebulaPool pool = new NebulaPool(); try { pool.init(hostAddresses, poolConfig); } catch (UnknownHostException e) { throw new RuntimeException("Unknown Nebula hosts"); } return new NebulaSessionFactory(pool, username, password); } }
在 Service 中進行呼叫
@Service @Slf4j public class GraphServiceImpl implements GraphService { @Autowired private NebulaSessionFactory sessionFactory; @Override public <T> NebulaResult<T> query(String graphSpace, String gql) { Session session = null; try { log.info("GQL: {}", gql); session = sessionFactory.getSession(); NebulaResult<Void> res = query(session, "USE " + graphSpace); if (!res.isSuccess() || res.getResults() == null || res.getResults().size() == 0) { log.error("Failed to use space:{}", graphSpace); return null; } if (!graphSpace.equals(res.getResults().get(0).getSpaceName())) { log.error("Failed to use space:{}, result:{}", graphSpace, res.getResults().get(0).getSpaceName()); return null; } return query(session, gql); } catch (IOErrorException e) { log.error(e.getMessage(), e); return null; } finally { if (session != null) { session.release(); } } } private <T> NebulaResult<T> query(Session session, String gql) throws IOErrorException { String json = session.executeJson(gql); return JacksonUtil.extractByType(json, new TypeReference<>() {}); } }
這裡定義了 json 格式響應的外層結構
@Data public class NebulaResult<T> implements Serializable { private List<Error> errors; private List<Result<T>> results; @JsonIgnore public boolean isSuccess() { return (errors != null && errors.size() == 1 && errors.get(0).getCode() == 0); } @Data public static class Error implements Serializable { private int code; } @Data @JsonIgnoreProperties(ignoreUnknown = true) @JsonInclude(JsonInclude.Include.NON_NULL) public static class Result<T> implements Serializable { private String spaceName; private List<Element<T>> data; private List<String> columns; private Error errors; private long latencyInUs; } @Data public static class Element<T> implements Serializable { private List<Meta<T>> meta; private List<Serializable> row; } @Data public static class Meta<T> implements Serializable { private String type; private T id; } }
內層因為區分Edge和Vertex, 結構不一樣. 如果是混合返回的結果, 可以用 Serializable
String gql = "match (v:ADDR)-[e]-() where id(v)=="ADD:123123" return v,e limit 100"; NebulaResult<Serializable> res = graphService.query("insurance", gql); log.info(JacksonUtil.compress(res)); Assertions.assertThat(res).isNotNull();
對於邊, 需要使用結構化的ID
@Data @JsonIgnoreProperties(ignoreUnknown = true) @JsonInclude(JsonInclude.Include.NON_NULL) public class EdgeId implements Serializable { private int ranking; private int type; private String dst; private String src; private String name; }
NebulaResult<EdgeId> res3 = graphService.query("t_test1", "MATCH ()-[e:follow]-() RETURN e");
對於點, ID就是String
NebulaResult<String> res2 = graphService.query("t_test1", "MATCH (v:player) RETURN v");
