Data Governance

Atlas Lineage API

In the previous section we registered a spark_dataframe type. In this section we use that type to demonstrate the Atlas Lineage API.

We start by registering a spark_transformation type. As the name suggests, a spark_transformation represents a transformation of a spark_dataframe. The spark_transformation type extends the built-in Process type, so it inherits the inputs and outputs attributes that Atlas uses to build lineage.

POST /v2/types/entitydef  

{
  "category" : "ENTITY",
  "name" : "spark_transformation",
  "superTypes" : [
    "Process"
  ],
  "typeVersion" : "1.0",
  "attributeDefs" : [
    {
      "name" : "parallelism",
      "typeName" : "int",
      "cardinality" : "SINGLE",
      "isIndexable" : false,
      "isOptional" : true,
      "isUnique" : false
    }
  ]
}
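The same request can be issued from Java. The following is a minimal sketch using the JDK's java.net.http client (Java 11 or later). The base URL http://localhost:21000/api/atlas is an assumption about where the Atlas REST API is served, authentication is omitted, and the class name is hypothetical. The sketch only builds the request; sending it needs a running Atlas server.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class RegisterSparkTransformationType {
    // Assumed Atlas REST base URL; adjust for your deployment.
    static final String ATLAS_BASE = "http://localhost:21000/api/atlas";

    // The same entity definition as the JSON body above, on one line.
    static final String TYPE_DEF =
        "{\"category\":\"ENTITY\",\"name\":\"spark_transformation\","
        + "\"superTypes\":[\"Process\"],\"typeVersion\":\"1.0\","
        + "\"attributeDefs\":[{\"name\":\"parallelism\",\"typeName\":\"int\","
        + "\"cardinality\":\"SINGLE\",\"isIndexable\":false,"
        + "\"isOptional\":true,\"isUnique\":false}]}";

    // Builds (but does not send) the POST request for the type definition.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create(ATLAS_BASE + "/v2/types/entitydef"))
            .header("Content-Type", "application/json")
            .header("Accept", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(TYPE_DEF))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest();
        System.out.println(request.method() + " " + request.uri());
        // Sending it requires a reachable Atlas server, for example via
        // java.net.http.HttpClient.newHttpClient().send(request, ...).
    }
}
```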

Next we will create two spark_dataframe entities:

POST /v2/entity/  

{
  "typeName": "spark_dataframe",
  "attributes" : {
    "qualifiedName" : "source_dataframe@clusterName",
    "name" : "source_dataframe",
    "source" : "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/source",
    "destination" : "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/destination"
  },
  "classifications": [

  ]
}

POST /v2/entity/  

{
  "typeName": "spark_dataframe",
  "attributes" : {
    "qualifiedName" : "destination_dataframe@clusterName",
    "name" : "destination_dataframe",
    "source" : "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/source_2",
    "destination" : "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/destination_2"
  },
  "classifications": [

  ]
}
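Writing these bodies by hand is repetitive. The following Java sketch is a hypothetical helper (not part of any Atlas client library) that produces a spark_dataframe request body in the same shape as the two above, escaping attribute values as JSON strings. It assumes the name@cluster qualifiedName convention used here; a real client would normally let a JSON library do the escaping.

```java
class SparkDataframeEntity {
    // Wraps a value in quotes and escapes the characters JSON forbids inside strings.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                default:
                    // Control characters are written as a four-digit JSON unicode escape.
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Builds a spark_dataframe body like the two requests above.
    static String body(String name, String cluster, String source, String destination) {
        return "{\"typeName\":\"spark_dataframe\",\"attributes\":{"
            + "\"qualifiedName\":" + quote(name + "@" + cluster) + ","
            + "\"name\":" + quote(name) + ","
            + "\"source\":" + quote(source) + ","
            + "\"destination\":" + quote(destination)
            + "},\"classifications\":[]}";
    }

    public static void main(String[] args) {
        String warehouse = "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/";
        System.out.println(body("source_dataframe", "clusterName",
            warehouse + "source", warehouse + "destination"));
        System.out.println(body("destination_dataframe", "clusterName",
            warehouse + "source_2", warehouse + "destination_2"));
    }
}
```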

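Next we register a spark_transformation entity that links the two spark_dataframe entities, with source_dataframe as its input and destination_dataframe as its output. The GUIDs in inputs and outputs are the ones Atlas assigned to the two spark_dataframe entities created above: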

POST /v2/entity/  

Request Body:

{
  "typeName": "spark_transformation",
  "attributes" : {
    "qualifiedName" : "spark_process_id_24343324",
    "name" : "spark_process",
    "parallelism" : 10,
    "inputs" : [
      {
        "typeName": "spark_dataframe",
        "attributes": {
          "source": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/source",
          "description": null,
          "qualifiedName": "source_dataframe@clusterName",
          "name": "source_dataframe",
          "owner": null,
          "destination": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/destination"
        },
        "guid": "c94d8450-6d59-4cd1-8732-863286387c7d"
      }
    ],
    "outputs" : [
      {
        "typeName": "spark_dataframe",
        "attributes": {
          "source": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/source_2",
          "description": null,
          "qualifiedName": "destination_dataframe@clusterName",
          "name": "destination_dataframe",
          "owner": null,
          "destination": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/destination_2"
        },
        "guid": "86d5cb10-538a-40cd-80c9-22fc363224d0"
      }
    ]
  },
  "classifications": []
}

Response:

{
  "entitiesMutated": {
    "CREATE_OR_UPDATE": [
      {
        "guid": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76"
      }
    ]
  }
}
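The GUID under entitiesMutated identifies the new spark_transformation entity. A JDK-only Java sketch that collects every GUID in such a response with a regular expression; the class name is hypothetical, and in practice a JSON library such as Jackson is the more robust way to read the response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MutationResponseGuids {
    // Matches every "guid": "<hex-and-hyphen value>" pair anywhere in the text.
    private static final Pattern GUID =
        Pattern.compile("\"guid\"\\s*:\\s*\"([0-9a-fA-F-]+)\"");

    static List<String> guids(String responseJson) {
        List<String> result = new ArrayList<>();
        Matcher m = GUID.matcher(responseJson);
        while (m.find()) {
            result.add(m.group(1));
        }
        return result;
    }

    public static void main(String[] args) {
        String response = "{\"entitiesMutated\":{\"CREATE_OR_UPDATE\":"
            + "[{\"guid\":\"58a3ee5d-827f-4aa4-98e1-ccebd5851c76\"}]}}";
        System.out.println(guids(response)); // [58a3ee5d-827f-4aa4-98e1-ccebd5851c76]
    }
}
```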

We can now use the Lineage API to retrieve the lineage of the source_dataframe entity (GUID c94d8450-6d59-4cd1-8732-863286387c7d):

Method Signature:

@GET
@Path("/{guid}")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasLineageInfo getLineageGraph(@PathParam("guid") String guid,
        @QueryParam("direction") @DefaultValue(DEFAULT_DIRECTION) LineageDirection direction,
        @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) throws AtlasBaseException {
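On the client side, a call to this method is a GET on the entity GUID plus the optional direction and depth query parameters. A minimal Java sketch of building that URL; the base URL and class name are assumptions, and the local Direction enum merely mirrors the INPUT, OUTPUT, and BOTH values described below rather than Atlas's own LineageDirection class. GUIDs contain only hex digits and hyphens, so they need no URL encoding here.

```java
import java.net.URI;

class LineageUri {
    // Client-side mirror of the three direction values.
    enum Direction { INPUT, OUTPUT, BOTH }

    // Assumed Atlas REST base URL; adjust for your deployment.
    static final String ATLAS_BASE = "http://localhost:21000/api/atlas";

    // Builds v2/lineage/{guid}; null arguments are omitted so the server defaults apply.
    static URI lineageUri(String guid, Direction direction, Integer depth) {
        StringBuilder sb = new StringBuilder(ATLAS_BASE).append("/v2/lineage/").append(guid);
        String sep = "?";
        if (direction != null) {
            sb.append(sep).append("direction=").append(direction.name());
            sep = "&";
        }
        if (depth != null) {
            sb.append(sep).append("depth=").append(depth);
        }
        return URI.create(sb.toString());
    }

    public static void main(String[] args) {
        String guid = "c94d8450-6d59-4cd1-8732-863286387c7d";
        System.out.println(lineageUri(guid, null, null));
        System.out.println(lineageUri(guid, Direction.INPUT, 2));
    }
}
```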

Example Request:

GET /v2/lineage/c94d8450-6d59-4cd1-8732-863286387c7d

Example Response:

{
  "baseEntityGuid": "c94d8450-6d59-4cd1-8732-863286387c7d",
  "lineageDirection": "BOTH",
  "lineageDepth": 3,
  "guidEntityMap": {
    "58a3ee5d-827f-4aa4-98e1-ccebd5851c76": {
      "typeName": "spark_transformation",
      "guid": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76",
      "status": "STATUS_ACTIVE",
      "displayText": "spark_process_id_24343324"
    },
    "c94d8450-6d59-4cd1-8732-863286387c7d": {
      "typeName": "spark_dataframe",
      "guid": "c94d8450-6d59-4cd1-8732-863286387c7d",
      "status": "STATUS_ACTIVE",
      "displayText": "source_dataframe@clusterName"
    },
    "86d5cb10-538a-40cd-80c9-22fc363224d0": {
      "typeName": "spark_dataframe",
      "guid": "86d5cb10-538a-40cd-80c9-22fc363224d0",
      "status": "STATUS_ACTIVE",
      "displayText": "destination_dataframe@clusterName"
    }
  },
  "relations": [
    {
      "fromEntityId": "c94d8450-6d59-4cd1-8732-863286387c7d",
      "toEntityId": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76"
    },
    {
      "fromEntityId": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76",
      "toEntityId": "86d5cb10-538a-40cd-80c9-22fc363224d0"
    }
  ]
}
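Each entry in relations is a directed edge from fromEntityId to toEntityId, and guidEntityMap resolves each GUID to a display name. A small Java sketch that turns the relations of the response above into a readable chain; the data is copied from that response, the class name is hypothetical, and the walk assumes a single linear path, which holds here but not for lineage graphs in general.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class LineageChain {
    // Renders relations as "a => b => c", starting at the entity with no incoming edge.
    // Assumes a linear path: at most one outgoing edge per entity.
    static String chain(Map<String, String> displayText, String[][] relations) {
        Map<String, String> next = new HashMap<>();
        Set<String> hasIncoming = new HashSet<>();
        for (String[] r : relations) {
            next.put(r[0], r[1]); // fromEntityId -> toEntityId
            hasIncoming.add(r[1]);
        }
        String start = null;
        for (String guid : next.keySet()) {
            if (!hasIncoming.contains(guid)) {
                start = guid;
            }
        }
        StringBuilder sb = new StringBuilder();
        Set<String> visited = new HashSet<>(); // guards against cycles
        for (String cur = start; cur != null && visited.add(cur); cur = next.get(cur)) {
            if (sb.length() > 0) {
                sb.append(" => ");
            }
            sb.append(displayText.getOrDefault(cur, cur));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> displayText = new HashMap<>();
        displayText.put("c94d8450-6d59-4cd1-8732-863286387c7d", "source_dataframe@clusterName");
        displayText.put("58a3ee5d-827f-4aa4-98e1-ccebd5851c76", "spark_process_id_24343324");
        displayText.put("86d5cb10-538a-40cd-80c9-22fc363224d0", "destination_dataframe@clusterName");
        String[][] relations = {
            {"c94d8450-6d59-4cd1-8732-863286387c7d", "58a3ee5d-827f-4aa4-98e1-ccebd5851c76"},
            {"58a3ee5d-827f-4aa4-98e1-ccebd5851c76", "86d5cb10-538a-40cd-80c9-22fc363224d0"}
        };
        // Prints: source_dataframe@clusterName => spark_process_id_24343324 => destination_dataframe@clusterName
        System.out.println(chain(displayText, relations));
    }
}
```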

We can specify the direction of the lineage traversal with the direction query parameter, which accepts three values: INPUT, OUTPUT, and BOTH. The default is BOTH. With INPUT, the result contains the given entity and the upstream entities from which it was derived; with OUTPUT, it contains the given entity and all of the entities derived from it. In either case the result also includes the process entities that connect them, as the INPUT example below shows.

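We can also limit the lineage results with the depth query parameter: Atlas then returns only the entities that lie within the given depth of the base entity in the lineage graph. The example responses in this section, which do not set depth, report the default lineageDepth of 3.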
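The following Java sketch models the effect of both parameters locally, over the relations returned for destination_dataframe@clusterName later in this section. It is an illustration, not Atlas's implementation: it counts each relation (edge) as one level of depth, which is an assumption, and Atlas may count depth differently. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class LineageModel {
    enum Direction { INPUT, OUTPUT, BOTH }

    private final Map<String, List<String>> downstream = new HashMap<>(); // from -> to
    private final Map<String, List<String>> upstream = new HashMap<>();   // to -> from

    void addRelation(String from, String to) {
        downstream.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        upstream.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    // INPUT walks upstream, OUTPUT walks downstream, BOTH takes the union.
    Set<String> lineage(String guid, Direction direction, int depth) {
        Set<String> result = new LinkedHashSet<>();
        if (direction != Direction.OUTPUT) {
            result.addAll(walk(guid, upstream, depth));
        }
        if (direction != Direction.INPUT) {
            result.addAll(walk(guid, downstream, depth));
        }
        return result;
    }

    // Breadth-first search that stops after `depth` levels (one level per edge, by assumption).
    private static Set<String> walk(String start, Map<String, List<String>> edges, int depth) {
        Set<String> seen = new LinkedHashSet<>();
        seen.add(start);
        Deque<String> frontier = new ArrayDeque<>();
        frontier.add(start);
        for (int level = 0; level < depth && !frontier.isEmpty(); level++) {
            Deque<String> nextFrontier = new ArrayDeque<>();
            for (String node : frontier) {
                for (String n : edges.getOrDefault(node, List.of())) {
                    if (seen.add(n)) {
                        nextFrontier.add(n);
                    }
                }
            }
            frontier = nextFrontier;
        }
        return seen;
    }

    public static void main(String[] args) {
        LineageModel g = new LineageModel();
        // Relations from the BOTH response for destination_dataframe@clusterName.
        g.addRelation("c94d8450-6d59-4cd1-8732-863286387c7d", "58a3ee5d-827f-4aa4-98e1-ccebd5851c76");
        g.addRelation("58a3ee5d-827f-4aa4-98e1-ccebd5851c76", "86d5cb10-538a-40cd-80c9-22fc363224d0");
        g.addRelation("86d5cb10-538a-40cd-80c9-22fc363224d0", "7cedee1c-c75a-4be8-b383-6079013ee095");
        g.addRelation("7cedee1c-c75a-4be8-b383-6079013ee095", "17a657d3-e72a-4501-8bde-a2843bed84ac");
        String base = "86d5cb10-538a-40cd-80c9-22fc363224d0";
        System.out.println(g.lineage(base, Direction.INPUT, 3));
        System.out.println(g.lineage(base, Direction.BOTH, 1));
    }
}
```

With these relations, INPUT from destination_dataframe@clusterName yields the same three entities as the INPUT example response below, and BOTH at depth 3 yields all five.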

To demonstrate these query parameters, we first extend the lineage graph by registering an entity of type hdfs_path:

Request:

POST /v2/entity

Request Body:

{
  "typeName" : "hdfs_path",
  "attributes" : {
    "path" : "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/result",
    "qualifiedName" : "result_directory@clusterName",
    "name" : "spark_transformaion_result"
  }
}

Response:

{
  "typeName": "hdfs_path",
  "attributes": {
    "clusterName": null,
    "createTime": null,
    "qualifiedName": "result_directory@clusterName",
    "modifiedTime": null,
    "posixPermissions": null,
    "fileSize": 0,
    "numberOfReplicas": 0,
    "description": null,
    "isFile": false,
    "name": "spark_transformaion_result",
    "owner": null,
    "path": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/result",
    "group": null,
    "extendedAttributes": null,
    "isSymlink": false
  },
  "guid": "17a657d3-e72a-4501-8bde-a2843bed84ac"
}

Next we register a second spark_transformation entity, which reads destination_dataframe@clusterName and writes to result_directory@clusterName:

Request:

POST /v2/entity

Request Body:

{
  "typeName": "spark_transformation",
  "attributes" : {
    "qualifiedName" : "spark_process_id_32454545",
    "name" : "spark_process_2",
    "parallelism" : 10,
    "inputs" : [
      {
        "typeName": "spark_dataframe",
        "attributes": {
          "source": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/source_2",
          "description": null,
          "qualifiedName": "destination_dataframe@clusterName",
          "name": "destination_dataframe",
          "owner": null,
          "destination": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/destination_2"
        },
        "guid": "86d5cb10-538a-40cd-80c9-22fc363224d0"
      }
    ],
    "outputs" : [
      {
        "typeName": "hdfs_path",
        "attributes": {
          "clusterName": null,
          "createTime": null,
          "qualifiedName": "result_directory@clusterName",
          "modifiedTime": null,
          "posixPermissions": null,
          "fileSize": 0,
          "numberOfReplicas": 0,
          "description": null,
          "isFile": false,
          "name": "spark_transformaion_result",
          "owner": null,
          "path": "hdfs://vimal-fenton-4-1.openstacklocal:8020/apps/hive/warehouse/result",
          "group": null,
          "extendedAttributes": null,
          "isSymlink": false
        },
        "guid": "17a657d3-e72a-4501-8bde-a2843bed84ac"
      }
    ]
  },
  "classifications": []
}

Response:

{
  "typeName": "spark_transformation",
  "attributes": {
    "inputs": [
      {
        "typeName": "DataSet",
        "guid": "86d5cb10-538a-40cd-80c9-22fc363224d0"
      }
    ],
    "description": null,
    "qualifiedName": "spark_process_id_32454545",
    "name": "spark_process_2",
    "owner": null,
    "outputs": [
      {
        "typeName": "DataSet",
        "guid": "17a657d3-e72a-4501-8bde-a2843bed84ac"
      }
    ],
    "parallelism": 10
  },
  "guid": "7cedee1c-c75a-4be8-b383-6079013ee095"
}

We now have the following lineage chain:

source_dataframe@clusterName ⇒ spark_process_id_24343324 ⇒ destination_dataframe@clusterName ⇒ spark_process_id_32454545 ⇒ result_directory@clusterName

With this chain in place, we can experiment with the query parameters of the Lineage API. The GUID of destination_dataframe@clusterName is 86d5cb10-538a-40cd-80c9-22fc363224d0.

Example Request:

GET /v2/lineage/86d5cb10-538a-40cd-80c9-22fc363224d0

Example Response:

{
  "baseEntityGuid": "86d5cb10-538a-40cd-80c9-22fc363224d0",
  "lineageDirection": "BOTH",
  "lineageDepth": 3,
  "guidEntityMap": {
    "7cedee1c-c75a-4be8-b383-6079013ee095": {
      "typeName": "spark_transformation",
      "guid": "7cedee1c-c75a-4be8-b383-6079013ee095",
      "status": "STATUS_ACTIVE",
      "displayText": "spark_process_id_32454545"
    },
    "58a3ee5d-827f-4aa4-98e1-ccebd5851c76": {
      "typeName": "spark_transformation",
      "guid": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76",
      "status": "STATUS_ACTIVE",
      "displayText": "spark_process_id_24343324"
    },
    "c94d8450-6d59-4cd1-8732-863286387c7d": {
      "typeName": "spark_dataframe",
      "guid": "c94d8450-6d59-4cd1-8732-863286387c7d",
      "status": "STATUS_ACTIVE",
      "displayText": "source_dataframe@clusterName"
    },
    "17a657d3-e72a-4501-8bde-a2843bed84ac": {
      "typeName": "hdfs_path",
      "guid": "17a657d3-e72a-4501-8bde-a2843bed84ac",
      "status": "STATUS_ACTIVE",
      "displayText": "result_directory@clusterName"
    },
    "86d5cb10-538a-40cd-80c9-22fc363224d0": {
      "typeName": "spark_dataframe",
      "guid": "86d5cb10-538a-40cd-80c9-22fc363224d0",
      "status": "STATUS_ACTIVE",
      "displayText": "destination_dataframe@clusterName"
    }
  },
  "relations": [
    {
      "fromEntityId": "c94d8450-6d59-4cd1-8732-863286387c7d",
      "toEntityId": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76"
    },
    {
      "fromEntityId": "86d5cb10-538a-40cd-80c9-22fc363224d0",
      "toEntityId": "7cedee1c-c75a-4be8-b383-6079013ee095"
    },
    {
      "fromEntityId": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76",
      "toEntityId": "86d5cb10-538a-40cd-80c9-22fc363224d0"
    },
    {
      "fromEntityId": "7cedee1c-c75a-4be8-b383-6079013ee095",
      "toEntityId": "17a657d3-e72a-4501-8bde-a2843bed84ac"
    }
  ]
}

To find only the entities from which destination_dataframe@clusterName is derived, set the direction query parameter to INPUT:

Example Request:

GET /v2/lineage/86d5cb10-538a-40cd-80c9-22fc363224d0?direction=INPUT

Example Response:

{
  "baseEntityGuid": "86d5cb10-538a-40cd-80c9-22fc363224d0",
  "lineageDirection": "INPUT",
  "lineageDepth": 3,
  "guidEntityMap": {
    "58a3ee5d-827f-4aa4-98e1-ccebd5851c76": {
      "typeName": "spark_transformation",
      "guid": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76",
      "status": "STATUS_ACTIVE",
      "displayText": "spark_process_id_24343324"
    },
    "c94d8450-6d59-4cd1-8732-863286387c7d": {
      "typeName": "spark_dataframe",
      "guid": "c94d8450-6d59-4cd1-8732-863286387c7d",
      "status": "STATUS_ACTIVE",
      "displayText": "source_dataframe@clusterName"
    },
    "86d5cb10-538a-40cd-80c9-22fc363224d0": {
      "typeName": "spark_dataframe",
      "guid": "86d5cb10-538a-40cd-80c9-22fc363224d0",
      "status": "STATUS_ACTIVE",
      "displayText": "destination_dataframe@clusterName"
    }
  },
  "relations": [
    {
      "fromEntityId": "c94d8450-6d59-4cd1-8732-863286387c7d",
      "toEntityId": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76"
    },
    {
      "fromEntityId": "58a3ee5d-827f-4aa4-98e1-ccebd5851c76",
      "toEntityId": "86d5cb10-538a-40cd-80c9-22fc363224d0"
    }
  ]
}

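Similarly, we can set the depth query parameter to fetch only the entities that lie within a given depth of the base entity, for example GET /v2/lineage/86d5cb10-538a-40cd-80c9-22fc363224d0?depth=1.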