Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions src/event/cloudevent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { v4 as uuidv4 } from "uuid";
import { Emitter } from "..";

import { CloudEventV1, CloudEventV1Attributes, CloudEventV1OptionalAttributes } from "./interfaces";
import { CloudEventV1 } from "./interfaces";
import { validateCloudEvent } from "./spec";
import { ValidationError, isBinary, asBase64, isValidType } from "./validation";

Expand All @@ -23,7 +23,7 @@ export const enum Version {
* interoperability across services, platforms and systems.
* @see https://github.com/cloudevents/spec/blob/v1.0/spec.md
*/
export class CloudEvent implements CloudEventV1 {
export class CloudEvent<T = undefined> implements CloudEventV1<T> {
id: string;
type: string;
source: string;
Expand All @@ -32,7 +32,7 @@ export class CloudEvent implements CloudEventV1 {
dataschema?: string;
subject?: string;
time?: string;
#_data?: Record<string, unknown | string | number | boolean> | string | number | boolean | null | unknown;
#_data?: T;
data_base64?: string;

// Extensions should not exist as it's own object, but instead
Expand All @@ -51,7 +51,7 @@ export class CloudEvent implements CloudEventV1 {
* @param {object} event the event properties
* @param {boolean?} strict whether to perform event validation when creating the object - default: true
*/
constructor(event: CloudEventV1 | CloudEventV1Attributes, strict = true) {
constructor(event: Partial<CloudEventV1<T>>, strict = true) {
// copy the incoming event so that we can delete properties as we go
// everything left after we have deleted know properties becomes an extension
const properties = { ...event };
Expand All @@ -62,10 +62,10 @@ export class CloudEvent implements CloudEventV1 {
this.time = properties.time || new Date().toISOString();
delete properties.time;

this.type = properties.type;
this.type = properties.type as string;
delete (properties as any).type;

this.source = properties.source;
this.source = properties.source as string;
delete (properties as any).source;

this.specversion = (properties.specversion as Version) || Version.V1;
Expand Down Expand Up @@ -126,13 +126,13 @@ See: https://github.com/cloudevents/spec/blob/v1.0/spec.md#type-system`);
Object.freeze(this);
}

get data(): unknown {
get data(): T | undefined {
return this.#_data;
}

set data(value: unknown) {
set data(value: T | undefined) {
if (isBinary(value)) {
this.data_base64 = asBase64(value as Uint32Array);
this.data_base64 = asBase64(value);
}
this.#_data = value;
}
Expand Down Expand Up @@ -184,16 +184,29 @@ See: https://github.com/cloudevents/spec/blob/v1.0/spec.md#type-system`);

/**
* Clone a CloudEvent with new/update attributes
* @param {object} options attributes to augment the CloudEvent with
* @param {object} options attributes to augment the CloudEvent with an `data` property
* @param {boolean} strict whether or not to use strict validation when cloning (default: true)
* @throws if the CloudEvent does not conform to the schema
* @return {CloudEvent} returns a new CloudEvent<T>
*/
public cloneWith(options: Partial<Exclude<CloudEventV1<never>, "data">>, strict?: boolean): CloudEvent<T>;
/**
* Clone a CloudEvent with new/update attributes
* @param {object} options attributes to augment the CloudEvent with a `data` property
* @param {boolean} strict whether or not to use strict validation when cloning (default: true)
* @throws if the CloudEvent does not conform to the schema
* @return {CloudEvent} returns a new CloudEvent<D>
*/
public cloneWith<D>(options: Partial<CloudEvent<D>>, strict?: boolean): CloudEvent<D>;
/**
* Clone a CloudEvent with new/update attributes
* @param {object} options attributes to augment the CloudEvent
* @param {boolean} strict whether or not to use strict validation when cloning (default: true)
* @throws if the CloudEvent does not conform to the schema
* @return {CloudEvent} returns a new CloudEvent
*/
public cloneWith(
options: CloudEventV1 | CloudEventV1Attributes | CloudEventV1OptionalAttributes,
strict = true,
): CloudEvent {
return new CloudEvent(Object.assign({}, this.toJSON(), options) as CloudEvent, strict);
public cloneWith<D>(options: Partial<CloudEventV1<D>>, strict = true): CloudEvent<D | T> {
return new CloudEvent(Object.assign({}, this.toJSON(), options), strict);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/event/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* The object interface for CloudEvents 1.0.
* @see https://github.com/cloudevents/spec/blob/v1.0/spec.md
*/
export interface CloudEventV1 extends CloudEventV1Attributes {
export interface CloudEventV1<T> extends CloudEventV1Attributes<T> {
// REQUIRED Attributes
/**
* [REQUIRED] Identifies the event. Producers MUST ensure that `source` + `id`
Expand All @@ -30,7 +30,7 @@ export interface CloudEventV1 extends CloudEventV1Attributes {
specversion: string;
}

export interface CloudEventV1Attributes extends CloudEventV1OptionalAttributes {
export interface CloudEventV1Attributes<T> extends CloudEventV1OptionalAttributes<T> {
/**
* [REQUIRED] Identifies the context in which an event happened. Often this
* will include information such as the type of the event source, the
Expand Down Expand Up @@ -65,7 +65,7 @@ export interface CloudEventV1Attributes extends CloudEventV1OptionalAttributes {
type: string;
}

export interface CloudEventV1OptionalAttributes {
export interface CloudEventV1OptionalAttributes<T> {
/**
* The following fields are optional.
*/
Expand Down Expand Up @@ -126,7 +126,7 @@ export interface CloudEventV1OptionalAttributes {
* specified by the datacontenttype attribute (e.g. application/json), and adheres
* to the dataschema format when those respective attributes are present.
*/
data?: Record<string, unknown | string | number | boolean> | string | number | boolean | null | unknown;
data?: T;

/**
* [OPTIONAL] The event payload encoded as base64 data. This is used when the
Expand Down
2 changes: 1 addition & 1 deletion src/event/spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ajv.addFormat("js-date-time", function (dateTimeString) {

const isValidAgainstSchemaV1 = ajv.compile(schemaV1);

export function validateCloudEvent(event: CloudEventV1): boolean {
export function validateCloudEvent<T>(event: CloudEventV1<T>): boolean {
if (event.specversion === Version.V1) {
if (!isValidAgainstSchemaV1(event)) {
throw new ValidationError("invalid payload", isValidAgainstSchemaV1.errors);
Expand Down
6 changes: 3 additions & 3 deletions src/event/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ export const isDefined = (v: unknown): boolean => v !== null && typeof v !== "un

export const isBoolean = (v: unknown): boolean => typeof v === "boolean";
export const isInteger = (v: unknown): boolean => Number.isInteger(v as number);
export const isDate = (v: unknown): boolean => v instanceof Date;
export const isBinary = (v: unknown): boolean => v instanceof Uint32Array;
export const isDate = (v: unknown): v is Date => v instanceof Date;
export const isBinary = (v: unknown): v is Uint32Array => v instanceof Uint32Array;

export const isStringOrThrow = (v: unknown, t: Error): boolean =>
isString(v)
Expand Down Expand Up @@ -75,7 +75,7 @@ export const isBuffer = (value: unknown): boolean => value instanceof Buffer;

export const asBuffer = (value: string | Buffer | Uint32Array): Buffer =>
isBinary(value)
? Buffer.from(value as string)
? Buffer.from((value as unknown) as string)
: isBuffer(value)
? (value as Buffer)
: (() => {
Expand Down
2 changes: 1 addition & 1 deletion src/message/http/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const requiredHeaders = [
* @param {CloudEvent} event a CloudEvent
* @returns {Object} the headers that will be sent for the event
*/
export function headersFor(event: CloudEvent): Headers {
export function headersFor<T>(event: CloudEvent<T>): Headers {
const headers: Headers = {};
let headerMap: Readonly<{ [key: string]: MappedParser }>;
if (event.specversion === Version.V1) {
Expand Down
22 changes: 13 additions & 9 deletions src/message/http/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import { JSONParser, MappedParser, Parser, parserByContentType } from "../../par
* @param {CloudEvent} event The event to serialize
* @returns {Message} a Message object with headers and body
*/
export function binary(event: CloudEvent): Message {
export function binary<T>(event: CloudEvent<T>): Message {
const contentType: Headers = { [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CONTENT_TYPE };
const headers: Headers = { ...contentType, ...headersFor(event) };
let body = event.data;
if (typeof event.data === "object" && !(event.data instanceof Uint32Array)) {
// we'll stringify objects, but not binary data
body = JSON.stringify(event.data);
body = (JSON.stringify(event.data) as unknown) as T;
}
return {
headers,
Expand All @@ -47,7 +47,7 @@ export function binary(event: CloudEvent): Message {
* @param {CloudEvent} event the CloudEvent to be serialized
* @returns {Message} a Message object with headers and body
*/
export function structured(event: CloudEvent): Message {
export function structured<T>(event: CloudEvent<T>): Message {
if (event.data_base64) {
// The event's data is binary - delete it
event = event.cloneWith({ data: undefined });
Expand Down Expand Up @@ -84,7 +84,7 @@ export function isEvent(message: Message): boolean {
* @param {Message} message the incoming message
* @return {CloudEvent} A new {CloudEvent} instance
*/
export function deserialize(message: Message): CloudEvent {
export function deserialize<T>(message: Message): CloudEvent<T> {
const cleanHeaders: Headers = sanitize(message.headers);
const mode: Mode = getMode(cleanHeaders);
const version = getVersion(mode, cleanHeaders, message.body);
Expand Down Expand Up @@ -133,7 +133,11 @@ function getVersion(mode: Mode, headers: Headers, body: string | Record<string,
}
} else {
// structured mode - the version is in the body
return typeof body === "string" ? JSON.parse(body).specversion : (body as CloudEvent).specversion;
if (typeof body === "string") {
return JSON.parse(body).specversion;
} else {
return (body as Record<string, string>).specversion;
}
}
return Version.V1;
}
Expand All @@ -147,7 +151,7 @@ function getVersion(mode: Mode, headers: Headers, body: string | Record<string,
* @returns {CloudEvent} an instance of CloudEvent representing the incoming request
* @throws {ValidationError} of the event does not conform to the spec
*/
function parseBinary(message: Message, version: Version): CloudEvent {
function parseBinary<T>(message: Message, version: Version): CloudEvent<T> {
const headers = { ...message.headers };
let body = message.body;

Expand Down Expand Up @@ -187,7 +191,7 @@ function parseBinary(message: Message, version: Version): CloudEvent {
delete eventObj.datacontentencoding;
}

return new CloudEvent({ ...eventObj, data: body } as CloudEventV1, false);
return new CloudEvent<T>({ ...eventObj, data: body } as CloudEventV1<T>, false);
}

/**
Expand All @@ -198,7 +202,7 @@ function parseBinary(message: Message, version: Version): CloudEvent {
* @returns {CloudEvent} a new CloudEvent instance for the provided headers and payload
* @throws {ValidationError} if the payload and header combination do not conform to the spec
*/
function parseStructured(message: Message, version: Version): CloudEvent {
function parseStructured<T>(message: Message, version: Version): CloudEvent<T> {
const payload = message.body;
const headers = message.headers;

Expand Down Expand Up @@ -240,5 +244,5 @@ function parseStructured(message: Message, version: Version): CloudEvent {
delete eventObj.data_base64;
delete eventObj.datacontentencoding;
}
return new CloudEvent(eventObj as CloudEventV1, false);
return new CloudEvent<T>(eventObj as CloudEventV1<T>, false);
}
4 changes: 2 additions & 2 deletions src/message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export enum Mode {
* @interface
*/
export interface Serializer {
(event: CloudEvent): Message;
<T>(event: CloudEvent<T>): Message;
}

/**
Expand All @@ -70,7 +70,7 @@ export interface Serializer {
* @interface
*/
export interface Deserializer {
(message: Message): CloudEvent;
<T>(message: Message): CloudEvent<T>;
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/transport/emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface Options {
* @interface
*/
export interface EmitterFunction {
(event: CloudEvent, options?: Options): Promise<unknown>;
<T>(event: CloudEvent<T>, options?: Options): Promise<unknown>;
}

/**
Expand Down Expand Up @@ -56,7 +56,7 @@ export function emitterFor(fn: TransportFunction, options = emitterDefaults): Em
throw new TypeError("A TransportFunction is required");
}
const { binding, mode }: any = { ...emitterDefaults, ...options };
return function emit(event: CloudEvent, opts?: Options): Promise<unknown> {
return function emit<T>(event: CloudEvent<T>, opts?: Options): Promise<unknown> {
opts = opts || {};

switch (mode) {
Expand Down Expand Up @@ -109,7 +109,7 @@ export class Emitter {
* @param {boolean} ensureDelivery fail the promise if one listener fails
* @return {void}
*/
static async emitEvent(event: CloudEvent, ensureDelivery = true): Promise<void> {
static async emitEvent<T>(event: CloudEvent<T>, ensureDelivery = true): Promise<void> {
if (!ensureDelivery) {
// Ensure delivery is disabled so we don't wait for Promise
Emitter.getInstance().emit("cloudevent", event);
Expand Down
9 changes: 4 additions & 5 deletions test/integration/cloud_event_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import fs from "fs";

import { expect } from "chai";
import { CloudEvent, ValidationError, Version } from "../../src";
import { CloudEventV1 } from "../../src/event/interfaces";
import { asBase64 } from "../../src/event/validation";

const type = "org.cncf.cloudevents.example";
const source = "http://unit.test";
const id = "b46cf653-d48a-4b90-8dfa-355c01061361";

const fixture: CloudEventV1 = {
const fixture = {
id,
specversion: Version.V1,
source,
Expand All @@ -34,17 +33,17 @@ describe("A CloudEvent", () => {
});

it("Can be constructed with loose validation", () => {
const ce = new CloudEvent({} as CloudEventV1, false);
const ce = new CloudEvent({}, false);
expect(ce).to.be.instanceOf(CloudEvent);
});

it("Loosely validated events can be cloned", () => {
const ce = new CloudEvent({} as CloudEventV1, false);
const ce = new CloudEvent({}, false);
expect(ce.cloneWith({}, false)).to.be.instanceOf(CloudEvent);
});

it("Loosely validated events throw when validated", () => {
const ce = new CloudEvent({} as CloudEventV1, false);
const ce = new CloudEvent({}, false);
expect(ce.validate).to.throw(ValidationError, "invalid payload");
});

Expand Down
8 changes: 4 additions & 4 deletions test/integration/message_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe("HTTP transport", () => {
},
};
expect(HTTP.isEvent(message)).to.be.true;
const event: CloudEvent = HTTP.toEvent(message);
const event = HTTP.toEvent(message);
expect(event.LUNCH).to.equal("tacos");
expect(function () {
event.validate();
Expand All @@ -96,7 +96,7 @@ describe("HTTP transport", () => {
},
};
expect(HTTP.isEvent(message)).to.be.true;
const event: CloudEvent = HTTP.toEvent(message);
const event = HTTP.toEvent(message);
expect(event.specversion).to.equal("11.8");
expect(event.validate()).to.be.false;
});
Expand Down Expand Up @@ -167,7 +167,7 @@ describe("HTTP transport", () => {
});

describe("Specification version V1", () => {
const fixture: CloudEvent = new CloudEvent({
const fixture = new CloudEvent({
specversion: Version.V1,
id,
type,
Expand Down Expand Up @@ -270,7 +270,7 @@ describe("HTTP transport", () => {
});

describe("Specification version V03", () => {
const fixture: CloudEvent = new CloudEvent({
const fixture = new CloudEvent({
specversion: Version.V03,
id,
type,
Expand Down
Loading