Skip to content

fix: add generics to Binding type #604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ export * from "./mqtt";
* @property {@link Deserializer} `toEvent` - converts a Message into a CloudEvent
* @property {@link Detector} `isEvent` - determines if a Message can be converted to a CloudEvent
*/
export interface Binding {
binary: Serializer;
structured: Serializer;
export interface Binding<B extends Message = Message, S extends Message = Message> {
binary: Serializer<B>;
structured: Serializer<S>;
toEvent: Deserializer;
isEvent: Detector;
}
Expand Down Expand Up @@ -65,8 +65,8 @@ export enum Mode {
* CloudEvent into a Message.
* @interface
*/
export interface Serializer {
<T>(event: CloudEventV1<T>): Message;
export interface Serializer<M extends Message> {
<T>(event: CloudEventV1<T>): M;
}

/**
Expand Down
22 changes: 11 additions & 11 deletions src/message/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export type {
* Bindings for Kafka transport
* @implements {@linkcode Binding}
*/
const Kafka: Binding = {
const Kafka: Binding<KafkaMessage<unknown>, KafkaMessage<string>> = {
binary: toBinaryKafkaMessage,
structured: toStructuredKafkaMessage,
toEvent: deserializeKafkaMessage,
Expand All @@ -35,9 +35,9 @@ type Key = string | Buffer;
* Extends the base Message type to include
* Kafka-specific fields
*/
interface KafkaMessage<T = string> extends Message {
interface KafkaMessage<T = string | Buffer | unknown> extends Message {
key: Key
value: T | string | Buffer | unknown
value: T
timestamp?: string
}

Expand All @@ -61,7 +61,7 @@ interface KafkaEvent<T> extends CloudEventV1<T> {
* @param {KafkaEvent<T>} event The event to serialize
* @returns {KafkaMessage<T>} a KafkaMessage instance
*/
function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T | undefined> {
// 3.2.1. Content Type
// For the binary mode, the header content-type property MUST be mapped directly
// to the CloudEvents datacontenttype attribute.
Expand All @@ -86,7 +86,7 @@ function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
* @param {CloudEvent<T>} event the CloudEvent to be serialized
* @returns {KafkaMessage<T>} a KafkaMessage instance
*/
function toStructuredKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
function toStructuredKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<string> {
if ((event instanceof CloudEvent) && event.data_base64) {
// The event's data is binary - delete it
event = event.cloneWith({ data: undefined });
Expand Down Expand Up @@ -130,9 +130,9 @@ function deserializeKafkaMessage<T>(message: Message): CloudEvent<T> | CloudEven
case Mode.BINARY:
return parseBinary(m);
case Mode.STRUCTURED:
return parseStructured(m);
return parseStructured(m as unknown as KafkaMessage<string>);
case Mode.BATCH:
return parseBatched(m);
return parseBatched(m as unknown as KafkaMessage<string>);
default:
throw new ValidationError("Unknown Message mode");
}
Expand Down Expand Up @@ -212,14 +212,14 @@ function parseBinary<T>(message: KafkaMessage<T>): CloudEvent<T> {
* @param {KafkaMessage<T>} message the message
* @returns {CloudEvent<T>} a KafkaEvent<T>
*/
function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
function parseStructured<T>(message: KafkaMessage<string>): CloudEvent<T> {
// Although the format of a structured encoded event could be something
// other than JSON, e.g. XML, we currently only support JSON
// encoded structured events.
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) {
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
}
const eventObj = JSON.parse(message.value as string);
const eventObj = JSON.parse(message.value);
eventObj.time = new Date(eventObj.time).toISOString();
return new CloudEvent({
...eventObj,
Expand All @@ -232,14 +232,14 @@ function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
* @param {KafkaMessage<T>} message the message
* @returns {CloudEvent<T>[]} an array of KafkaEvent<T>
*/
function parseBatched<T>(message: KafkaMessage<T>): CloudEvent<T>[] {
function parseBatched<T>(message: KafkaMessage<string>): CloudEvent<T>[] {
// Although the format of batch encoded events could be something
// other than JSON, e.g. XML, we currently only support JSON
// encoded structured events.
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_BATCH)) {
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
}
const events = JSON.parse(message.value as string) as Record<string, unknown>[];
const events = JSON.parse(message.value) as Record<string, unknown>[];
return events.map((e) => new CloudEvent({ ...e, partitionkey: message.key }, false));
}

Expand Down
4 changes: 2 additions & 2 deletions src/message/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type { MQTTMessage };
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
* are aliases of the {Message} attributes.
*/
interface MQTTMessage<T> extends Message<T> {
interface MQTTMessage<T = unknown> extends Message<T> {
/**
* Identifies this message as a PUBLISH packet. MQTTMessages created with
* the `binary` and `structured` Serializers will contain a "Content Type"
Expand All @@ -37,7 +37,7 @@ interface MQTTMessage<T> extends Message<T> {
* Binding for MQTT transport support
* @implements @linkcode Binding
*/
const MQTT: Binding = {
const MQTT: Binding<MQTTMessage, MQTTMessage> = {
binary,
structured,
toEvent: toEvent as Deserializer,
Expand Down