diff --git a/.processcube/nodered/.flows.json.backup b/.processcube/nodered/.flows.json.backup index d95ae4a..c3bae40 100644 --- a/.processcube/nodered/.flows.json.backup +++ b/.processcube/nodered/.flows.json.backup @@ -826,6 +826,32 @@ "w": 932, "h": 142 }, + { + "id": "e910eea0845632e3", + "type": "group", + "z": "a23d2e782beb66f4", + "style": { + "stroke": "#999999", + "stroke-opacity": "1", + "fill": "none", + "fill-opacity": "1", + "label": true, + "label-position": "nw", + "color": "#a4a4a4" + }, + "nodes": [ + "aa0790a54e613a4f", + "a55984c3ea0a1752", + "c690a8a876c4ac00", + "f05bde567a6fe14c", + "2c4d864dde746bee", + "f3f34fd07e42d94e" + ], + "x": 34, + "y": 1619, + "w": 892, + "h": 162 + }, { "id": "42e6796dddd9d4db", "type": "processcube-engine-config", @@ -1777,9 +1803,10 @@ "wires": [] }, { - "id": "f80c13daa49ee8dd", + "id": "aa0790a54e613a4f", "type": "externaltask-input", "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", "name": "", "workername": "", "engine": "42e6796dddd9d4db", @@ -1788,43 +1815,96 @@ "workerConfig": "{}", "workerConfigType": "json", "traces": [], - "x": 100, - "y": 1680, + "x": 140, + "y": 1660, "wires": [ [ - "10aaf6e4aa5eacbe" + "a55984c3ea0a1752" ] ] }, { - "id": "10aaf6e4aa5eacbe", + "id": "c690a8a876c4ac00", "type": "function", "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", "name": "function 4", - "func": "\nreturn msg;", + "func": "return msg;", "outputs": 1, "timeout": 0, "noerr": 0, "initialize": "", "finalize": "", "libs": [], - "x": 360, - "y": 1680, + "x": 620, + "y": 1660, "wires": [ [ - "f943a85ad6ddd7df" + "f05bde567a6fe14c" ] ] }, { - "id": "f943a85ad6ddd7df", + "id": "f05bde567a6fe14c", "type": "externaltask-output", "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", "name": "", - "x": 580, - "y": 1680, + "x": 810, + "y": 1660, "wires": [] }, + { + "id": "a55984c3ea0a1752", + "type": "function", + "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", + "name": "function 5", + "func": "\nreturn msg;", + "outputs": 1, + "timeout": 0, + "noerr": 0, + "initialize": "", + "finalize": "", + "libs": [], + "x": 380, + "y": 1660, + "wires": [ + [ + "c690a8a876c4ac00" + ] + ] + }, + { + "id": "2c4d864dde746bee", + "type": "catch", + "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", + "name": "", + "scope": "group", + "uncaught": false, + "x": 390, + "y": 1740, + "wires": [ + [ + "f3f34fd07e42d94e" + ] + ] + }, + { + "id": "f3f34fd07e42d94e", + "type": "externaltask-error", + "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", + "name": "", + "error": "", + "message": "", + "x": 640, + "y": 1740, + "wires": [ + [] + ] + }, { "id": "82533b02fb9a9f04", "type": "comment", diff --git a/.processcube/nodered/flows.json b/.processcube/nodered/flows.json index b00330e..021bbdd 100644 --- a/.processcube/nodered/flows.json +++ b/.processcube/nodered/flows.json @@ -826,6 +826,32 @@ "w": 932, "h": 142 }, + { + "id": "e910eea0845632e3", + "type": "group", + "z": "a23d2e782beb66f4", + "style": { + "stroke": "#999999", + "stroke-opacity": "1", + "fill": "none", + "fill-opacity": "1", + "label": true, + "label-position": "nw", + "color": "#a4a4a4" + }, + "nodes": [ + "aa0790a54e613a4f", + "a55984c3ea0a1752", + "c690a8a876c4ac00", + "f05bde567a6fe14c", + "2c4d864dde746bee", + "f3f34fd07e42d94e" + ], + "x": 34, + "y": 1619, + "w": 892, + "h": 162 + }, { "id": "42e6796dddd9d4db", "type": "processcube-engine-config", @@ -1777,9 +1803,10 @@ "wires": [] }, { - "id": "f80c13daa49ee8dd", + "id": "aa0790a54e613a4f", "type": "externaltask-input", "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", "name": "", "workername": "", "engine": "42e6796dddd9d4db", @@ -1788,43 +1815,96 @@ "workerConfig": "{}", "workerConfigType": "json", "traces": [], - "x": 100, - "y": 1680, + "x": 140, + "y": 1660, "wires": [ [ - "10aaf6e4aa5eacbe" + "a55984c3ea0a1752" ] ] }, { - "id": "10aaf6e4aa5eacbe", + "id": "c690a8a876c4ac00", "type": "function", "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", "name": "function 4", - "func": "\nreturn msg;", + "func": "//return msg;", "outputs": 1, "timeout": 0, "noerr": 0, "initialize": "", "finalize": "", "libs": [], - "x": 340, - "y": 1680, + "x": 620, + "y": 1660, "wires": [ [ - "f943a85ad6ddd7df" + "f05bde567a6fe14c" ] ] }, { - "id": "f943a85ad6ddd7df", + "id": "f05bde567a6fe14c", "type": "externaltask-output", "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", "name": "", - "x": 580, - "y": 1680, + "x": 810, + "y": 1660, "wires": [] }, + { + "id": "a55984c3ea0a1752", + "type": "function", + "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", + "name": "function 5", + "func": "\nreturn msg;", + "outputs": 1, + "timeout": 0, + "noerr": 0, + "initialize": "", + "finalize": "", + "libs": [], + "x": 380, + "y": 1660, + "wires": [ + [ + "c690a8a876c4ac00" + ] + ] + }, + { + "id": "2c4d864dde746bee", + "type": "catch", + "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", + "name": "", + "scope": "group", + "uncaught": false, + "x": 390, + "y": 1740, + "wires": [ + [ + "f3f34fd07e42d94e" + ] + ] + }, + { + "id": "f3f34fd07e42d94e", + "type": "externaltask-error", + "z": "a23d2e782beb66f4", + "g": "e910eea0845632e3", + "name": "", + "error": "", + "message": "", + "x": 640, + "y": 1740, + "wires": [ + [] + ] + }, { "id": "82533b02fb9a9f04", "type": "comment", diff --git a/externaltask-input.js b/externaltask-input.js index 075cc0d..6cbf939 100644 --- a/externaltask-input.js +++ b/externaltask-input.js @@ -1,9 +1,110 @@ const EventEmitter = require('node:events'); +class ExternalTaskNodeStates { + constructor(flowNodeInstanceId) { + this.flowNodeInstanceId = flowNodeInstanceId; + this.nodeStades = {}; // Track send calls per nodeId + } + + markSended(nodeId) { + if (!this.nodeStades[nodeId]) { + this.nodeStades[nodeId] = { gotSend: false, gotCompleted: false }; + } + + this.nodeStades[nodeId].gotSend = true; + } + + markCompleted(nodeId) { + if (!this.nodeStades[nodeId]) { + this.nodeStades[nodeId] = { gotSend: false, gotCompleted: false }; + } + + this.nodeStades[nodeId].gotCompleted = true; + } + + checkIfCompletedWithoutSend(nodeId) { + const nodeState = this.nodeStades[nodeId]; + const result = (nodeState && nodeState.gotCompleted && !nodeState.gotSend); + + return result; + } +} + module.exports = function (RED) { const os = require('os'); + // Global dictionary for tracking external tasks by flowNodeInstanceId + const globalExternalTaskStates = {}; + + const raiseExternalTaskError = (flowNodeInstanceId, etwInputNodeId, nodeId) => { + const fullNode = RED.nodes.getNode(nodeId); + + const wires = fullNode?.wires; + const hasConnectedOutputs = wires && wires.some(wireArray => wireArray && wireArray.length > 0); + + if (hasConnectedOutputs) { + const inputNode = RED.nodes.getNode(etwInputNodeId); + + if (inputNode && inputNode.eventEmitter) { + const errorMessage = `Node ${nodeId} (${fullNode.name || fullNode.type}) completed without sending output`; + const error = new Error(errorMessage); + error.errorCode = 'NODE_NO_OUTPUT'; + error.errorDetails = RED.util.encodeObject({ + flowNodeInstanceId: flowNodeInstanceId, + nodeId: nodeId, + nodeName: fullNode.name, + nodeType: fullNode.type + }); + + inputNode.eventEmitter.emit(`handle-${flowNodeInstanceId}`, error, true); + } + } + }; + + // Example synchronous onSend hook + RED.hooks.add("onSend", (sendEvents) => { + for (const sendEvent of sendEvents) { + + // Call send method on ExternalTaskState if this message has a flowNodeInstanceId + if (sendEvent.msg?.flowNodeInstanceId) { + let externalTaskNodeStates = globalExternalTaskStates[sendEvent.msg.flowNodeInstanceId]; + + if (!externalTaskNodeStates) { + externalTaskNodeStates = new ExternalTaskNodeStates(sendEvent.msg.flowNodeInstanceId); + globalExternalTaskStates[sendEvent.msg.flowNodeInstanceId] = externalTaskNodeStates; + } + + externalTaskNodeStates.markSended(sendEvent.source.node.id) + + if (externalTaskNodeStates.checkIfCompletedWithoutSend(sendEvent.source.node.id)) { + raiseExternalTaskError(sendEvent.msg.flowNodeInstanceId, sendEvent.msg.etw_input_node_id, sendEvent.source.node.id); + } + } + } + }); + + const onCompleted = (completeEvent) => { + + // Check if this is an external task message + if (completeEvent.msg?.flowNodeInstanceId) { + let externalTaskNodeStates = globalExternalTaskStates[completeEvent.msg.flowNodeInstanceId]; + + if (!externalTaskNodeStates) { + externalTaskNodeStates = new ExternalTaskNodeStates(completeEvent.msg.flowNodeInstanceId); + globalExternalTaskStates[completeEvent.msg.flowNodeInstanceId] = externalTaskNodeStates; + } + + externalTaskNodeStates.markCompleted(completeEvent.node.id); + + if (externalTaskNodeStates.checkIfCompletedWithoutSend(completeEvent.node.id)) { + raiseExternalTaskError(completeEvent.msg.flowNodeInstanceId, completeEvent.msg.etw_input_node_id, completeEvent.node.id); + } + } + } + + RED.hooks.add("onComplete", onCompleted); + function ExternalTaskInput(config) { RED.nodes.createNode(this, config); var node = this; @@ -357,15 +458,30 @@ module.exports = function (RED) { return; } const etwCallback = async (payload, externalTask) => { + + globalExternalTaskStates[externalTask.flowNodeInstanceId] = new ExternalTaskNodeStates(externalTask.flowNodeInstanceId); + const saveHandleCallback = (data, callback, msg) => { try { callback(data); node.log(`send to engine *external task flowNodeInstanceId* '${externalTask.flowNodeInstanceId}', topic '${node.topic}' and *processInstanceId* ${externalTask.processInstanceId}`); + + // Remove ExternalTaskState from global dictionary + if (globalExternalTaskStates[externalTask.flowNodeInstanceId]) { + delete globalExternalTaskStates[externalTask.flowNodeInstanceId]; + } + node.setFinishHandlingTaskStatus(externalTask); } catch (error) { + // Remove ExternalTaskState from global dictionary on error as well + if (globalExternalTaskStates[externalTask.flowNodeInstanceId]) { + delete globalExternalTaskStates[externalTask.flowNodeInstanceId]; + } + node.setErrorFinishHandlingTaskStatus(externalTask, error); msg.error = error; node.error(`failed send to engine *external task flowNodeInstanceId* '${externalTask.flowNodeInstanceId}', topic '${node.topic}' and *processInstanceId* ${externalTask.processInstanceId}: ${error?.message}`, msg); + callback(error); } }; @@ -405,7 +521,7 @@ module.exports = function (RED) { msg.etw_duration = new Date(msg.etw_finished_at) - new Date(msg.etw_started_at); } } catch (error) { - node.error(`failed to calculate duration: ${error?.message}`, msg); + node.error(`failed to calculate duration: ${error?.message}`, msg); } node.log( diff --git a/processes/External-Task-Hang.bpmn b/processes/External-Task-Hang.bpmn new file mode 100644 index 0000000..a83dcb2 --- /dev/null +++ b/processes/External-Task-Hang.bpmn @@ -0,0 +1,58 @@ + + + + + + + + + StartEvent_1 + Event_0i7y8nc + Activity_1yuj5dj + + + + Flow_0sy9qtj + + + + Flow_0con52z + + + + Flow_0sy9qtj + Flow_0con52z + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +