数据治理的王者Apache Atlas如何构建自己的API

1,405 阅读3分钟

Apache Atlas是一个优秀的服务治理组件,用于企业Hadoop集群上的数据治理和元数据管理的数据治理工具。接下来我们将讨论构建自己的Java API,这些Java API可使用Apache atlas客户端与Apache Atlas交互以在其中创建新的实体和类型。

一、Atlas客户端Maven依赖关系

以下依赖项可用于pom.xml文件

  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-client</artifactId>
      <version>0.7-incubating</version>
  </dependency>
  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-typesystem</artifactId>
      <version>0.7-incubating</version>
  </dependency>
  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-notification</artifactId>
      <version>0.7-incubating</version>
  </dependency>
  <dependency>
      <groupId>org.apache.atlas</groupId>
      <artifactId>atlas-repository</artifactId>
      <version>0.7-incubating</version>
  </dependency>

二、设置atlas-application.properties

Apache Atlas客户端使用atlas-application属性在我们的API和Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties中

#########  Security Properties  #########

# SSL config
atlas.enableTLS=false

#########  Server Properties  #########
atlas.rest.address=http://192.168.5.95:21000

atlas.hook.demo.kafka.retries=1
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas

三、创建与Atlas服务器的连接

要与Apache atlas Server,baseUrl和用户名创建连接,必须在AtlasClient构造函数中传递密码

final AtlasClient atlasClient = new AtlasClient
            (new String[]{"http://192.168.5.95:21000"},
                    new String[]{"admin",
                            "admin"});

四、关于Type相关的测试类

public class AtlasTypesTest {

    final AtlasClient atlasClient = new AtlasClient
            (new String[]{"http://192.168.5.95:21000"},
                    new String[]{"admin",
                            "admin"});

    static final String DATABASE_TYPE = "DB_Sync";
    static final String COLUMN_TYPE = "Column_Sync";
    static final String TABLE_TYPE = "Table_Sync";
    static final String VIEW_TYPE = "View_Sync";
    public static final String DB_ATTRIBUTE = "db";
    static final String STORAGE_DESC_TYPE = "StorageDesc";
    public static final String COLUMNS_ATTRIBUTE = "columns";
    public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";
    private static final String[] TYPES =
            {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess",
                    "ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"};

    /**
     * 组织定义types
     * @return
     */
    TypesDef createTypeDefinitions() {
        HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
                .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,
                        TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
                        attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()),
                        attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()));

        HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
                .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()),
                        attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName()));

        HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
                .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),
                        new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
                        new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
                        attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()),
                        attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", DataTypes.LONG_TYPE.getName()),
                        attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),
                        attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()),
                        attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),
                        new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),
                                Multiplicity.COLLECTION, true, null));

        HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
                .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),
                        new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
                        new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE),
                                Multiplicity.COLLECTION, false, null));

        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
                ImmutableList.of(),
                ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));
    }

    private void createTypes() throws Exception {
        TypesDef typesDef = createTypeDefinitions();
        String typesAsJSON =  TypesSerialization.toJson(typesDef);
        System.out.println("typesAsJSON = " + typesAsJSON);
        atlasClient.createType(typesAsJSON);
        verifyTypesCreated();
    }

    private void verifyTypesCreated() throws Exception {
        List<String> types = atlasClient.listTypes();
        for (String type : TYPES) {
            assert types.contains(type);
        }
    }

    AttributeDefinition attrDef(String name, String dT) {
        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
    }

    AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,
                                String reverseAttributeName) {
        return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);
    }

    @Test
    public void createNewTypes() throws Exception {
        createTypes();
    }
}

五、关于Entities相关的测试类

public class AtlasEntitiesTest {


    final AtlasClient atlasClient = new AtlasClient
            (new String[]{"http://192.168.5.95:21000"},
                    new String[]{"admin",
                            "admin"});

    /**
     * 创建实例并返创建的Id对象
     * @param referenceable
     * @return
     * @throws Exception
     */
    private Id createInstance(Referenceable referenceable) throws Exception {
        String typeName = referenceable.getTypeName();
        String entityJSON = InstanceSerialization.toJson(referenceable, true);
        System.out.println("Submitting new entity= " + entityJSON);
        List<String> guids = atlasClient.createEntity(entityJSON);
        System.out.println("created instance for type " + typeName + ", guid: " + guids);
        return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
                referenceable.getTypeName());
    }

    /**
     * 创建数据库实例并返创建的数据库Id对象
     * @param name
     * @param description
     * @param owner
     * @param locationUri
     * @param traitNames
     * @return
     * @throws Exception
     */
    Id database(String name, String description, String owner, String locationUri, String... traitNames)
            throws Exception {
        Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set("description", description);
        referenceable.set("owner", owner);
        referenceable.set("locationUri", locationUri);
        referenceable.set("createTime", System.currentTimeMillis());

        return createInstance(referenceable);
    }

    /**
     * 创建列的实例并返创建的列的实例对象
     * @param name
     * @param dataType
     * @param comment
     * @param traitNames
     * @return
     * @throws Exception
     */
    Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
        Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set("dataType", dataType);
        referenceable.set("comment", comment);

        return referenceable;
    }

    /**
     * 创建表的实例并返创建的表的Id对象
     * @param name
     * @param description
     * @param dbId
     * @param sd
     * @param owner
     * @param tableType
     * @param columns
     * @param traitNames
     * @return
     * @throws Exception
     */
    Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
             List<Referenceable> columns, String... traitNames) throws Exception {
        Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        referenceable.set("description", description);
        referenceable.set("owner", owner);
        referenceable.set("tableType", tableType);
        referenceable.set("createTime", System.currentTimeMillis());
        referenceable.set("lastAccessTime", System.currentTimeMillis());
        referenceable.set("retention", System.currentTimeMillis());
        referenceable.set("db", dbId);
        referenceable.set("sd", sd);
        referenceable.set("columns", columns);

        return createInstance(referenceable);
    }

    /**
     * 创建视图的实例并返创建的视图的Id对象
     * @param name
     * @param dbId
     * @param inputTables
     * @param traitNames
     * @return
     * @throws Exception
     */
    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
        referenceable.set("name", name);
        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
        referenceable.set("db", dbId);

        referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);

        return createInstance(referenceable);
    }

    /**
     * 原始存储描述符
     * @param location
     * @param inputFormat
     * @param outputFormat
     * @param compressed
     * @return
     * @throws Exception
     */
    Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
            throws Exception {
        Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
        referenceable.set("location", location);
        referenceable.set("inputFormat", inputFormat);
        referenceable.set("outputFormat", outputFormat);
        referenceable.set("compressed", compressed);

        return referenceable;
    }


    @Test
    public void createEntities() throws Exception {
        //创建数据库实例
        Id syncDB = database("sy_sync", "Sync Database", "root", "");
        //存储描述符
        Referenceable sd =
                storageDescriptor("", "TextInputFormat", "TextOutputFormat",
                        true);
        //创建列实例
        //1、数据源
        List<Referenceable> databaseColumns = ImmutableList
                .of(column("id", "long", "id"),
                        column("name", "string", "name"),
                        column("type", "string", "type"),
                        column("url", "string", "url"),
                        column("database_name", "string", "database name"),
                        column("username", "string", "username"),
                        column("password","string","password"),
                        column("description", "string", "description"),
                        column("create_time", "string", "create time"),
                        column("update_time", "string", "update time"),
                        column("create_id", "long", "user id"),
                        column("update_id", "long", "user id"));
        //2、同步文件夹
        List<Referenceable> syncFolderColumns = ImmutableList
                .of(column("id", "long", "id"),
                        column("name", "string", "name"),
                        column("description", "string", "description"),
                        column("create_time", "string", "create time"),
                        column("update_time", "string", "update time"),
                        column("create_id", "long", "user id"),
                        column("update_id", "long", "user id"));
        //创建表实例
        Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);
        Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);
        //创建视图实例

    }


    @Test
    public void getEntity() throws AtlasServiceException {
        Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");
        System.out.println(InstanceSerialization.toJson(referenceable, true));
    }

}