Skip to content

Commit 8b669df

Browse files
committed Sep 30, 2022
include column lineage in dataset resource
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 791e1bf commit 8b669df

20 files changed

+338
-13
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
77
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
88
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
9+
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
910

1011
### Fixed
1112
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)

‎api/src/main/java/marquez/api/DatasetResource.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.codahale.metrics.annotation.ResponseMetered;
1313
import com.codahale.metrics.annotation.Timed;
1414
import com.fasterxml.jackson.annotation.JsonProperty;
15+
import java.util.Arrays;
1516
import java.util.List;
1617
import java.util.Locale;
1718
import javax.validation.Valid;
@@ -85,10 +86,11 @@ public Response getDataset(
8586
@PathParam("dataset") DatasetName datasetName) {
8687
throwIfNotExists(namespaceName);
8788

88-
final Dataset dataset =
89+
Dataset dataset =
8990
datasetService
9091
.findWithTags(namespaceName.getValue(), datasetName.getValue())
9192
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
93+
columnLineageService.enrichWithColumnLineage(Arrays.asList(dataset));
9294
return Response.ok(dataset).build();
9395
}
9496

@@ -147,6 +149,7 @@ public Response list(
147149

148150
final List<Dataset> datasets =
149151
datasetService.findAllWithTags(namespaceName.getValue(), limit, offset);
152+
columnLineageService.enrichWithColumnLineage(datasets);
150153
final int totalCount = datasetService.countFor(namespaceName.getValue());
151154
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
152155
}

‎api/src/main/java/marquez/db/ColumnLineageDao.java

+43
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,47 @@ Set<ColumnLineageNodeData> getLineage(
150150
int depth,
151151
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
152152
Instant createdAtUntil);
153+
154+
/**
 * Fetches column lineage rows for the output fields of the given datasets.
 *
 * <p>A dataset is selected when any of the supplied (namespace, name) pairs overlaps its {@code
 * dataset_symlinks}, so a dataset can be matched through a symlink name as well. The reported
 * namespace and dataset name of each row come from {@code datasets_view}, not from the supplied
 * pair. Rows are grouped per output field, transformation and timestamps, with the input fields
 * aggregated into {@code inputFields} as (namespace, dataset, field) triples. Input fields are
 * LEFT JOINed, so lineage rows whose input field cannot be resolved are still returned.
 *
 * <p>NOTE(review): {@code @BindBeanList} presumably rejects an empty list; the caller visible in
 * this change returns early on empty input before invoking this method - confirm for new callers.
 *
 * @param datasets pairs bound via their {@code left} and {@code right} properties; the caller
 *     visible in this change passes the namespace as {@code left} and the dataset name as {@code
 *     right}
 * @return one row per grouped output field lineage entry
 */
@SqlQuery(
155+
"""
156+
WITH selected_column_lineage AS (
157+
SELECT cl.*
158+
FROM column_lineage cl
159+
JOIN dataset_fields df ON df.uuid = cl.output_dataset_field_uuid
160+
JOIN datasets_view dv ON dv.uuid = df.dataset_uuid
161+
WHERE ARRAY[<values>]::DATASET_NAME[] && dv.dataset_symlinks
162+
),
163+
dataset_fields_view AS (
164+
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
165+
FROM dataset_fields df
166+
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
167+
)
168+
SELECT
169+
output_fields.namespace_name,
170+
output_fields.dataset_name,
171+
output_fields.field_name,
172+
output_fields.type,
173+
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
174+
c.transformation_description,
175+
c.transformation_type,
176+
c.created_at,
177+
c.updated_at
178+
FROM selected_column_lineage c
179+
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
180+
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
181+
GROUP BY
182+
output_fields.namespace_name,
183+
output_fields.dataset_name,
184+
output_fields.field_name,
185+
output_fields.type,
186+
c.transformation_description,
187+
c.transformation_type,
188+
c.created_at,
189+
c.updated_at
190+
""")
191+
Set<ColumnLineageNodeData> getLineageRowsForDatasets(
192+
@BindBeanList(
193+
propertyNames = {"left", "right"},
194+
value = "values")
195+
List<Pair<String, String>> datasets);
153196
}

‎api/src/main/java/marquez/db/DatasetFieldDao.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@
3434
@RegisterRowMapper(FieldDataMapper.class)
3535
public interface DatasetFieldDao extends BaseDao {
3636
@SqlQuery(
37-
"SELECT EXISTS ("
38-
+ "SELECT 1 FROM dataset_fields AS df "
39-
+ "INNER JOIN datasets_view AS d "
40-
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
41-
+ "WHERE df.name = :name)")
37+
"""
38+
SELECT EXISTS (
39+
SELECT 1 FROM dataset_fields AS df
40+
INNER JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
41+
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
42+
AND df.name = :name
43+
)
44+
""")
4245
boolean exists(String namespaceName, String datasetName, String name);
4346

4447
default Dataset updateTags(
@@ -97,20 +100,20 @@ default Dataset updateTags(
97100
"""
98101
SELECT df.uuid
99102
FROM dataset_fields df
100-
INNER JOIN datasets_view AS d
101-
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
103+
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
104+
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
102105
""")
103-
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);
106+
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);
104107

105108
@SqlQuery(
106109
"""
107110
SELECT df.uuid
108111
FROM dataset_fields df
109-
INNER JOIN datasets_view AS d
110-
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
111-
WHERE df.name = :name
112+
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
113+
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
114+
AND df.name = :name
112115
""")
113-
Optional<UUID> findUuid(String namespace, String datasetName, String name);
116+
Optional<UUID> findUuid(String namespaceName, String datasetName, String name);
114117

115118
@SqlQuery(
116119
"SELECT f.*, "

‎api/src/main/java/marquez/service/ColumnLineageService.java

+51
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
import marquez.db.ColumnLineageDao;
2323
import marquez.db.DatasetFieldDao;
2424
import marquez.db.models.ColumnLineageNodeData;
25+
import marquez.service.models.ColumnLineage;
26+
import marquez.service.models.ColumnLineageInputField;
27+
import marquez.service.models.Dataset;
2528
import marquez.service.models.Edge;
2629
import marquez.service.models.Lineage;
2730
import marquez.service.models.Node;
2831
import marquez.service.models.NodeId;
32+
import org.apache.commons.lang3.tuple.Pair;
2933

3034
@Slf4j
3135
public class ColumnLineageService extends DelegatingDaos.DelegatingColumnLineageDao {
@@ -125,4 +129,51 @@ List<UUID> getColumnNodeUuids(NodeId nodeId) {
125129
}
126130
return columnNodeUuids;
127131
}
132+
133+
public void enrichWithColumnLineage(List<Dataset> datasets) {
134+
if (datasets.isEmpty()) {
135+
return;
136+
}
137+
138+
Set<ColumnLineageNodeData> lineageRowsForDatasets =
139+
getLineageRowsForDatasets(
140+
datasets.stream()
141+
.map(d -> Pair.of(d.getNamespace().getValue(), d.getName().getValue()))
142+
.collect(Collectors.toList()));
143+
144+
Map<Dataset, List<ColumnLineage>> datasetLineage = new HashMap<>();
145+
lineageRowsForDatasets.stream()
146+
.forEach(
147+
nodeData -> {
148+
Dataset dataset =
149+
datasets.stream()
150+
.filter(d -> d.getNamespace().getValue().equals(nodeData.getNamespace()))
151+
.filter(d -> d.getName().getValue().equals(nodeData.getDataset()))
152+
.findAny()
153+
.get();
154+
155+
if (!datasetLineage.containsKey(dataset)) {
156+
datasetLineage.put(dataset, new LinkedList<>());
157+
}
158+
datasetLineage
159+
.get(dataset)
160+
.add(
161+
ColumnLineage.builder()
162+
.name(nodeData.getField())
163+
.transformationDescription(nodeData.getTransformationDescription())
164+
.transformationType(nodeData.getTransformationType())
165+
.inputFields(
166+
nodeData.getInputFields().stream()
167+
.map(
168+
f ->
169+
new ColumnLineageInputField(
170+
f.getNamespace(), f.getDataset(), f.getField()))
171+
.collect(Collectors.toList()))
172+
.build());
173+
});
174+
175+
datasets.stream()
176+
.filter(dataset -> datasetLineage.containsKey(dataset))
177+
.forEach(dataset -> dataset.setColumnLineage(datasetLineage.get(dataset)));
178+
}
128179
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import java.util.List;
9+
import javax.validation.constraints.NotNull;
10+
import lombok.Builder;
11+
import lombok.EqualsAndHashCode;
12+
import lombok.Getter;
13+
import lombok.ToString;
14+
15+
/**
 * Column lineage of a single dataset field: the field's name, the input fields it is derived
 * from, and the description and type of the transformation that produced it.
 *
 * <p>Instances are created through the Lombok-generated builder; equality, hash code and string
 * form are generated from all four fields.
 */
@EqualsAndHashCode
16+
@ToString
17+
@Builder
18+
@Getter
19+
public class ColumnLineage {
20+
@NotNull private String name;
21+
@NotNull private List<ColumnLineageInputField> inputFields;
22+
@NotNull private String transformationDescription;
23+
@NotNull private String transformationType;
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import javax.validation.constraints.NotNull;
9+
import lombok.AllArgsConstructor;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.ToString;
13+
14+
/**
 * Reference to an input field of a column lineage entry, identified by the namespace and name of
 * the dataset it belongs to plus the field name.
 *
 * <p>The all-args constructor takes the values in declaration order: dataset namespace, dataset
 * name, field name.
 */
@EqualsAndHashCode
15+
@ToString
16+
@Getter
17+
@AllArgsConstructor
18+
public class ColumnLineageInputField {
19+
@NotNull private String datasetNamespace;
20+
@NotNull private String datasetName;
21+
@NotNull private String fieldName;
22+
}

‎api/src/main/java/marquez/service/models/Dataset.java

+3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public abstract class Dataset {
5353
@Nullable private final String lastLifecycleState;
5454
@Nullable private final String description;
5555
@Nullable private final UUID currentVersion;
56+
@Getter @Setter @Nullable private List<ColumnLineage> columnLineage;
5657
@Getter ImmutableMap<String, Object> facets;
5758
@Getter private final boolean isDeleted;
5859

@@ -70,6 +71,7 @@ public Dataset(
7071
@Nullable final String lastLifecycleState,
7172
@Nullable final String description,
7273
@Nullable final UUID currentVersion,
74+
@Nullable final ImmutableList<ColumnLineage> columnLineage,
7375
@Nullable final ImmutableMap<String, Object> facets,
7476
boolean isDeleted) {
7577
this.id = id;
@@ -86,6 +88,7 @@ public Dataset(
8688
this.lastLifecycleState = lastLifecycleState;
8789
this.description = description;
8890
this.currentVersion = currentVersion;
91+
this.columnLineage = columnLineage;
8992
this.facets = (facets == null) ? ImmutableMap.of() : facets;
9093
this.isDeleted = isDeleted;
9194
}

‎api/src/main/java/marquez/service/models/DbTable.java

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public DbTable(
5353
lastLifecycleState,
5454
description,
5555
currentVersion,
56+
null,
5657
facets,
5758
isDeleted);
5859
}

‎api/src/main/java/marquez/service/models/Stream.java

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public Stream(
5959
lastLifecycleState,
6060
description,
6161
currentVersion,
62+
null,
6263
facets,
6364
isDeleted);
6465
this.schemaLocation = schemaLocation;

‎api/src/test/java/marquez/DatasetIntegrationTest.java

+48
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package marquez;
77

8+
import static marquez.db.ColumnLineageTestUtils.getDatasetA;
9+
import static marquez.db.ColumnLineageTestUtils.getDatasetB;
810
import static org.assertj.core.api.Assertions.assertThat;
911

1012
import com.fasterxml.jackson.core.type.TypeReference;
@@ -22,6 +24,7 @@
2224
import java.util.Optional;
2325
import java.util.UUID;
2426
import java.util.concurrent.CompletableFuture;
27+
import marquez.client.models.ColumnLineage;
2528
import marquez.client.models.Dataset;
2629
import marquez.client.models.DatasetId;
2730
import marquez.client.models.DatasetVersion;
@@ -440,4 +443,49 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep
440443
datasets = client.listDatasets(namespace);
441444
assertThat(datasets).hasSize(1);
442445
}
446+
447+
/**
 * Verifies that dataset resources returned by the API carry column lineage.
 *
 * <p>Sends a COMPLETE lineage event for job {@code job_name} in namespace {@code namespace} with
 * dataset A as input and dataset B as output, then checks that both {@code listDatasets} and
 * {@code getDataset} report exactly one column lineage entry with two input fields for {@code
 * dataset_b}. The job and both datasets are deleted at the end of the test.
 *
 * <p>NOTE(review): the joined HTTP status code is not asserted and the cleanup calls are skipped
 * when an assertion fails - confirm whether that is intended.
 */
@Test
448+
public void testApp_getDatasetContainsColumnLineage() {
449+
LineageEvent event =
450+
new LineageEvent(
451+
"COMPLETE",
452+
Instant.now().atZone(ZoneId.systemDefault()),
453+
new LineageEvent.Run(UUID.randomUUID().toString(), null),
454+
new LineageEvent.Job("namespace", "job_name", null),
455+
List.of(getDatasetA()),
456+
List.of(getDatasetB()),
457+
"the_producer");
458+
459+
CompletableFuture<Integer> resp =
460+
this.sendLineage(Utils.toJson(event))
461+
.thenApply(HttpResponse::statusCode)
462+
.whenComplete(
463+
(val, error) -> {
464+
if (error != null) {
465+
Assertions.fail("Could not complete request");
466+
}
467+
});
468+
resp.join();
469+
470+
// verify listDatasets contains column lineage
471+
List<ColumnLineage> columnLineage;
472+
473+
columnLineage =
474+
client.listDatasets("namespace").stream()
475+
.filter(d -> d.getName().equals("dataset_b"))
476+
.findAny()
477+
.get()
478+
.getColumnLineage();
479+
assertThat(columnLineage).hasSize(1);
480+
assertThat(columnLineage.get(0).getInputFields()).hasSize(2);
481+
482+
// verify getDataset returns non-empty column lineage
483+
columnLineage = client.getDataset("namespace", "dataset_b").getColumnLineage();
484+
assertThat(columnLineage).hasSize(1);
485+
assertThat(columnLineage.get(0).getInputFields()).hasSize(2);
486+
487+
client.deleteJob("namespace", "job_name");
488+
client.deleteDataset("namespace", "dataset_a");
489+
client.deleteDataset("namespace", "dataset_b");
490+
}
443491
}

‎api/src/test/java/marquez/db/ColumnLineageTestUtils.java

+9
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public static LineageEvent.Dataset getDatasetA() {
5555
Arrays.asList(
5656
new LineageEvent.SchemaField("col_a", "STRING", ""),
5757
new LineageEvent.SchemaField("col_b", "STRING", ""))))
58+
.dataSource(
59+
new LineageEvent.DatasourceDatasetFacet(
60+
PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com"))
5861
.build());
5962
}
6063

@@ -69,6 +72,9 @@ public static LineageEvent.Dataset getDatasetB() {
6972
PRODUCER_URL,
7073
SCHEMA_URL,
7174
Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", ""))))
75+
.dataSource(
76+
new LineageEvent.DatasourceDatasetFacet(
77+
PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com"))
7278
.columnLineage(
7379
new LineageEvent.ColumnLineageFacet(
7480
PRODUCER_URL,
@@ -109,6 +115,9 @@ public static LineageEvent.Dataset getDatasetC() {
109115
"namespace", "dataset_b", "col_c")),
110116
"description2",
111117
"type2"))))
118+
.dataSource(
119+
new LineageEvent.DatasourceDatasetFacet(
120+
PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com"))
112121
.build());
113122
}
114123
}

0 commit comments

Comments
 (0)