Skip to content

Commit dd4aadf

Browse files
decode runless events
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 87c4f75 commit dd4aadf

19 files changed

+653
-117
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.34.0...HEAD)
44

5+
### Added
6+
7+
* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
8+
*Adds the ability to distinguish on a bakend static metadata events introduced based on the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).*
9+
10+
511
## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18
612

713
### Fixed

api/src/main/java/marquez/api/OpenLineageResource.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import marquez.api.models.SortDirection;
3939
import marquez.db.OpenLineageDao;
4040
import marquez.service.ServiceFactory;
41+
import marquez.service.models.BaseEvent;
4142
import marquez.service.models.LineageEvent;
4243
import marquez.service.models.NodeId;
4344

@@ -61,20 +62,26 @@ public OpenLineageResource(
6162
@Consumes(APPLICATION_JSON)
6263
@Produces(APPLICATION_JSON)
6364
@Path("/lineage")
64-
public void create(
65-
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
65+
public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse)
6666
throws JsonProcessingException, SQLException {
67-
openLineageService
68-
.createAsync(event)
69-
.whenComplete(
70-
(result, err) -> {
71-
if (err != null) {
72-
log.error("Unexpected error while processing request", err);
73-
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
74-
} else {
75-
asyncResponse.resume(Response.status(201).build());
76-
}
77-
});
67+
if (event instanceof LineageEvent) {
68+
openLineageService
69+
.createAsync((LineageEvent) event)
70+
.whenComplete(
71+
(result, err) -> {
72+
if (err != null) {
73+
log.error("Unexpected error while processing request", err);
74+
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
75+
} else {
76+
asyncResponse.resume(Response.status(201).build());
77+
}
78+
});
79+
} else {
80+
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());
81+
82+
// return serialized event
83+
asyncResponse.resume(Response.status(200).entity(event).build());
84+
}
7885
}
7986

8087
private int determineStatusCode(Throwable e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
9+
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
10+
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
11+
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
12+
13+
@JsonTypeIdResolver(EventTypeResolver.class)
14+
@JsonTypeInfo(
15+
use = Id.CUSTOM,
16+
include = As.EXISTING_PROPERTY,
17+
property = "schemaURL",
18+
defaultImpl = LineageEvent.class,
19+
visible = true)
20+
public class BaseEvent extends BaseJsonModel {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import java.net.URI;
9+
import java.time.ZonedDateTime;
10+
import javax.validation.Valid;
11+
import javax.validation.constraints.NotNull;
12+
import lombok.AllArgsConstructor;
13+
import lombok.Builder;
14+
import lombok.Getter;
15+
import lombok.NoArgsConstructor;
16+
import lombok.Setter;
17+
import lombok.ToString;
18+
19+
@Builder
20+
@AllArgsConstructor
21+
@NoArgsConstructor
22+
@Setter
23+
@Getter
24+
@Valid
25+
@ToString
26+
public class DatasetEvent extends BaseEvent {
27+
@NotNull private ZonedDateTime eventTime;
28+
@Valid private LineageEvent.Dataset dataset;
29+
@Valid @NotNull private String producer;
30+
@Valid @NotNull private URI schemaURL;
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import static marquez.service.models.EventTypeResolver.EventSchemaURL.LINEAGE_EVENT;
9+
10+
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
11+
import com.fasterxml.jackson.databind.DatabindContext;
12+
import com.fasterxml.jackson.databind.JavaType;
13+
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
14+
import java.io.IOException;
15+
import java.util.Arrays;
16+
import lombok.AllArgsConstructor;
17+
import lombok.Getter;
18+
import lombok.extern.slf4j.Slf4j;
19+
20+
@Slf4j
21+
public class EventTypeResolver extends TypeIdResolverBase {
22+
23+
@AllArgsConstructor
24+
public enum EventSchemaURL {
25+
LINEAGE_EVENT(
26+
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent",
27+
LineageEvent.class),
28+
DATASET_EVENT(
29+
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/DatasetEvent",
30+
DatasetEvent.class),
31+
JOB_EVENT(
32+
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent", JobEvent.class);
33+
34+
@Getter private String schemaURL;
35+
36+
public String getName() {
37+
int lastSlash = schemaURL.lastIndexOf('/');
38+
return schemaURL.substring(lastSlash, schemaURL.length());
39+
}
40+
41+
@Getter private Class<?> subType;
42+
}
43+
44+
private JavaType superType;
45+
46+
@Override
47+
public void init(JavaType baseType) {
48+
superType = baseType;
49+
}
50+
51+
@Override
52+
public String idFromValue(Object value) {
53+
return null;
54+
}
55+
56+
@Override
57+
public String idFromValueAndType(Object value, Class<?> suggestedType) {
58+
return null;
59+
}
60+
61+
@Override
62+
public JavaType typeFromId(DatabindContext context, String id) throws IOException {
63+
if (id == null) {
64+
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
65+
}
66+
67+
int lastSlash = id.lastIndexOf('/');
68+
69+
if (lastSlash < 0) {
70+
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
71+
}
72+
73+
String type = id.substring(lastSlash, id.length());
74+
75+
Class<?> subType =
76+
Arrays.stream(EventSchemaURL.values())
77+
.filter(s -> s.getName().equals(type))
78+
.findAny()
79+
.map(EventSchemaURL::getSubType)
80+
.orElse(LINEAGE_EVENT.subType);
81+
82+
return context.constructSpecializedType(superType, subType);
83+
}
84+
85+
@Override
86+
public Id getMechanism() {
87+
return null;
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service.models;
7+
8+
import java.net.URI;
9+
import java.time.ZonedDateTime;
10+
import java.util.List;
11+
import javax.validation.Valid;
12+
import javax.validation.constraints.NotNull;
13+
import lombok.AllArgsConstructor;
14+
import lombok.Builder;
15+
import lombok.Getter;
16+
import lombok.NoArgsConstructor;
17+
import lombok.Setter;
18+
import lombok.ToString;
19+
20+
@Builder
21+
@AllArgsConstructor
22+
@NoArgsConstructor
23+
@Setter
24+
@Getter
25+
@Valid
26+
@ToString
27+
public class JobEvent extends BaseEvent {
28+
@NotNull private ZonedDateTime eventTime;
29+
@Valid @NotNull private LineageEvent.Job job;
30+
@Valid private List<LineageEvent.Dataset> inputs;
31+
@Valid private List<LineageEvent.Dataset> outputs;
32+
@Valid @NotNull private String producer;
33+
@Valid @NotNull private URI schemaURL;
34+
}

api/src/main/java/marquez/service/models/LineageEvent.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
@Getter
3939
@Valid
4040
@ToString
41-
public class LineageEvent extends BaseJsonModel {
41+
public class LineageEvent extends BaseEvent {
4242

4343
private String eventType;
4444

@@ -48,6 +48,7 @@ public class LineageEvent extends BaseJsonModel {
4848
@Valid private List<Dataset> inputs;
4949
@Valid private List<Dataset> outputs;
5050
@Valid @NotNull private String producer;
51+
@Valid private URI schemaURL;
5152

5253
@AllArgsConstructor
5354
@NoArgsConstructor

0 commit comments

Comments
 (0)