Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
setup frontend for ssehttpquery
  • Loading branch information
iamfaran authored and raheeliftikhar5 committed Jul 22, 2025
commit 7b6858163a66848c7c837c580131ca1aa873da6f
271 changes: 29 additions & 242 deletions client/packages/lowcoder/src/comps/queries/httpQuery/sseHttpQuery.tsx
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
// SSEHTTPQUERY.tsx
import { Dropdown, ValueFromOption } from "components/Dropdown";
import { QueryConfigItemWrapper, QueryConfigLabel, QueryConfigWrapper } from "components/query";
import { valueComp, withDefault } from "comps/generators";
import { trans } from "i18n";
import { includes } from "lodash";
import { CompAction, MultiBaseComp } from "lowcoder-core";
import { keyValueListControl } from "../../controls/keyValueListControl";
import { ParamsJsonControl, ParamsStringControl, ParamsControlType } from "../../controls/paramsControl";
import { ParamsJsonControl, ParamsStringControl } from "../../controls/paramsControl";
import { withTypeAndChildrenAbstract } from "../../generators/withType";
import { QueryResult } from "../queryComp";
import { QUERY_EXECUTION_ERROR, QUERY_EXECUTION_OK } from "constants/queryConstants";
import { JSONValue } from "util/jsonTypes";
import { FunctionProperty } from "../queryCompUtils";
import { toSseQueryView } from "../queryCompUtils";
import {
HttpHeaderPropertyView,
HttpParametersPropertyView,
Expand Down Expand Up @@ -52,7 +50,9 @@ const CommandMap = {
const childrenMap = {
httpMethod: valueComp<HttpMethodValue>("GET"),
path: ParamsStringControl,
headers: withDefault(keyValueListControl(), [{ key: "", value: "" }]),
headers: withDefault(keyValueListControl(), [
{ key: "Accept", value: "text/event-stream" }
]),
params: withDefault(keyValueListControl(), [{ key: "", value: "" }]),
bodyFormData: withDefault(
keyValueListControl(true, [
Expand All @@ -61,6 +61,8 @@ const childrenMap = {
] as const),
[{ key: "", value: "", type: "text" }]
),
// Add SSE-specific configuration
streamingEnabled: valueComp<boolean>(true),
};

const SseHttpTmpQuery = withTypeAndChildrenAbstract(
Expand All @@ -72,9 +74,6 @@ const SseHttpTmpQuery = withTypeAndChildrenAbstract(
);

export class SseHttpQuery extends SseHttpTmpQuery {
private eventSource: EventSource | undefined;
private controller: AbortController | undefined;

isWrite(action: CompAction) {
return (
action.path.includes("httpMethod") && "value" in action && !includes(["GET"], action.value)
Expand All @@ -89,241 +88,13 @@ export class SseHttpQuery extends SseHttpTmpQuery {
...children.bodyFormData.getQueryParams(),
...children.path.getQueryParams(),
...children.body.getQueryParams(),
// Add streaming flag to params
{ key: "_streaming", value: () => "true" },
{ key: "_streamingEnabled", value: () => children.streamingEnabled.getView() }
];

return this.createStreamingQueryView(params);
}

private createStreamingQueryView(params: FunctionProperty[]) {
return async (props: {
queryId: string;
applicationId: string;
applicationPath: string[];
args?: Record<string, unknown>;
variables?: any;
timeout: InstanceType<ParamsControlType>;
callback?: (result: QueryResult) => void;
}): Promise<QueryResult> => {

try {
const timer = performance.now();

// Process parameters like toQueryView does
const processedParams = this.processParameters(params, props);

// Build request from processed parameters
const { url, headers, method, body } = this.buildRequestFromParams(processedParams, props.args);

// Execute streaming logic
if (method === "GET") {
return this.handleEventSource(url, headers, props, timer);
} else {
return this.handleStreamingFetch(url, headers, method, body, props, timer);
}

} catch (error) {
return this.createErrorResponse((error as Error).message);
}
};
}

private processParameters(params: FunctionProperty[], props: any) {
let mappedVariables: Array<{key: string, value: string}> = [];
Object.keys(props.variables || {})
.filter(k => k !== "$queryName")
.forEach(key => {
const value = Object.hasOwn(props.variables[key], 'value') ? props.variables[key].value : props.variables[key];
mappedVariables.push({
key: `${key}.value`,
value: value || ""
});
});

return [
...params.filter(param => {
return !mappedVariables.map(v => v.key).includes(param.key);
}).map(({ key, value }) => ({ key, value: value(props.args) })),
...Object.entries(props.timeout.getView()).map(([key, value]) => ({
key,
value: (value as any)(props.args),
})),
...mappedVariables,
];
}

private buildRequestFromParams(processedParams: Array<{key: string, value: any}>, args: Record<string, unknown> = {}) {
// Hardcoded values from the screenshot for testing
const url = "http://localhost:11434/api/generate";
const headers = {
"Content-Type": "application/json",
"Accept": "text/event-stream"
};
const method = "POST";
const body = JSON.stringify({
"model": "gemma3",
"prompt": "Tell me a short story about a robot",
"stream": true
});

console.log("Hardcoded request:", { url, headers, method, body });

return { url, headers, method, body };
}

private async handleEventSource(
url: string,
headers: Record<string, string>,
props: any,
timer: number
): Promise<QueryResult> {
return new Promise((resolve, reject) => {
// Clean up any existing connection
this.cleanup();

this.eventSource = new EventSource(url);

this.eventSource.onopen = () => {
resolve(this.createSuccessResponse("SSE connection established", timer));
};

this.eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
props.callback?.(this.createSuccessResponse(data));
} catch (error) {
// Handle non-JSON data
props.callback?.(this.createSuccessResponse(event.data));
}
};

this.eventSource.onerror = (error) => {
this.cleanup();
reject(this.createErrorResponse("SSE connection error"));
};
});
}

private async handleStreamingFetch(
url: string,
headers: Record<string, string>,
method: string,
body: string | FormData | undefined,
props: any,
timer: number
): Promise<QueryResult> {
// Clean up any existing connection
this.cleanup();

this.controller = new AbortController();

const response = await fetch(url, {
method,
headers: {
...headers,
'Accept': 'text/event-stream',
},
body,
signal: this.controller.signal,
});

if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

// Handle streaming response
const reader = response.body?.getReader();
const decoder = new TextDecoder();

if (!reader) {
throw new Error("No readable stream available");
}

// Process stream in background
this.processStream(reader, decoder, props.callback);

return this.createSuccessResponse("Stream connection established", timer);
}

private async processStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
decoder: TextDecoder,
callback?: (result: QueryResult) => void
) {
let buffer = '';

try {
while (true) {
const { done, value } = await reader.read();

if (done) break;

buffer += decoder.decode(value, { stream: true });

// Process complete JSON objects or SSE events
const lines = buffer.split('\n');
buffer = lines.pop() || '';

for (const line of lines) {
if (line.trim()) {
try {
// Handle SSE format: data: {...}
let jsonData = line.trim();
if (jsonData.startsWith('data: ')) {
jsonData = jsonData.substring(6);
}

// Skip SSE control messages
if (jsonData === '[DONE]' || jsonData.startsWith('event:') || jsonData.startsWith('id:')) {
continue;
}

const data = JSON.parse(jsonData);
callback?.(this.createSuccessResponse(data));
} catch (error) {
// Handle non-JSON lines or plain text
if (line.trim() !== '') {
callback?.(this.createSuccessResponse(line.trim()));
}
}
}
}
}
} catch (error: any) {
if (error.name !== 'AbortError') {
callback?.(this.createErrorResponse((error as Error).message));
}
} finally {
reader.releaseLock();
}
}

private createSuccessResponse(data: JSONValue, runTime?: number): QueryResult {
return {
data,
runTime: runTime || 0,
success: true,
code: QUERY_EXECUTION_OK,
};
}

private createErrorResponse(message: string): QueryResult {
return {
message,
data: "",
success: false,
code: QUERY_EXECUTION_ERROR,
};
}

public cleanup() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = undefined;
}
if (this.controller) {
this.controller.abort();
this.controller = undefined;
}
// Use SSE-specific query view
return toSseQueryView(params);
}

propertyView(props: {
Expand Down Expand Up @@ -410,6 +181,13 @@ const SseHttpQueryPropertyView = (props: {
let headers = children.headers
.toJsonValue()
.filter((header) => header.key !== ContentTypeKey);

// Always ensure Accept: text/event-stream for SSE
const hasAcceptHeader = headers.some(h => h.key === "Accept");
if (!hasAcceptHeader) {
headers.push({ key: "Accept", value: "text/event-stream" });
}

if (value !== "none") {
headers = [
{
Expand All @@ -430,6 +208,15 @@ const SseHttpQueryPropertyView = (props: {
<QueryConfigLabel />
<QueryConfigItemWrapper>{showBodyConfig(children)}</QueryConfigItemWrapper>
</QueryConfigWrapper>

<QueryConfigWrapper>
<QueryConfigLabel>Streaming Options</QueryConfigLabel>
<QueryConfigItemWrapper>
<div style={{ fontSize: "13px", color: "#8B8FA3" }}>
This query will establish a Server-Sent Events connection for real-time data streaming.
</div>
</QueryConfigItemWrapper>
</QueryConfigWrapper>
</>
);
};
Loading