From dc866813f8854f9eafafc8ce1ff93a5e02726ab8 Mon Sep 17 00:00:00 2001 From: nishant Date: Sun, 28 Mar 2021 16:05:16 +0530 Subject: [PATCH 1/5] Add client id to prices request in fixtures. --- .../tests/fixtures/prices-project/protos/subdir/messages.proto | 3 ++- core/tests/init.test.js | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/tests/fixtures/prices-project/protos/subdir/messages.proto b/core/tests/fixtures/prices-project/protos/subdir/messages.proto index 967826d..5a6f1ae 100644 --- a/core/tests/fixtures/prices-project/protos/subdir/messages.proto +++ b/core/tests/fixtures/prices-project/protos/subdir/messages.proto @@ -5,9 +5,10 @@ package prices.streaming; message PriceRequest { string uic = 1; string assetType = 2; + string requestId = 3; } // The response message message PriceResponse { string quote = 1; -} \ No newline at end of file +} diff --git a/core/tests/init.test.js b/core/tests/init.test.js index 959af00..ef7301e 100644 --- a/core/tests/init.test.js +++ b/core/tests/init.test.js @@ -182,7 +182,8 @@ describe('init project', () => { const expectedContent = { 'request@': { assetType: '@any', - uic: '@any' + uic: '@any', + requestId: '@any', }, 'response@': { 'stream@': [{quote: 'string'}], From 09788769b97cbaaf71a7dc4743320565a8c44928 Mon Sep 17 00:00:00 2001 From: nishant Date: Sun, 28 Mar 2021 16:25:57 +0530 Subject: [PATCH 2/5] Add data file in fixtures for streaming context. --- .../fixtures/prices-project/config/mappings.yaml | 6 +++++- .../config/streaming/stop-stream-by-context.yml | 15 +++++++++++++++ .../config/streaming/stream-with-context.yml | 12 ++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml diff --git a/core/tests/fixtures/prices-project/config/mappings.yaml b/core/tests/fixtures/prices-project/config/mappings.yaml index 11dfd91..5965cd2 100644 --- a/core/tests/fixtures/prices-project/config/mappings.yaml +++ b/core/tests/fixtures/prices-project/config/mappings.yaml @@ -15,7 +15,11 @@ prices.streaming.Pricing.TwoWaySubscribe : [ "prices/stock-streams/211-stock.yaml", "prices/stock-streams/212-stock.yaml", "prices/stock-streams/299-stock-template.yaml", - "prices/stock-streams/300-stock-template-repeat.yaml" + "prices/stock-streams/300-stock-template-repeat.yaml", + ], + [ + "streaming/stream-with-context.yml", + "streaming/stop-stream-by-context.yml", ], "prices/two-way-stream.js.yaml", ] diff --git a/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml b/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml new file mode 100644 index 0000000..b4581c5 --- /dev/null +++ b/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml @@ -0,0 +1,15 @@ +request@ : { + uic: "599", + assetType: "any@", + requestId: "any@", +} + +response@ : { + "stream@" : [{ + "quote": "{{request.body.requestId}} won't be streamed anymore", + }], + "stopStreaming@": { + "context@": "my-endpoint-{{request.body.requestId}}", + }, + "doNotRepeat@": true, +} diff --git a/core/tests/fixtures/prices-project/config/streaming/stream-with-context.yml b/core/tests/fixtures/prices-project/config/streaming/stream-with-context.yml index e69de29..ad8468a 100644 --- a/core/tests/fixtures/prices-project/config/streaming/stream-with-context.yml +++ b/core/tests/fixtures/prices-project/config/streaming/stream-with-context.yml @@ -0,0 +1,12 @@ +request@ : { + uic: "501", + assetType: "any@", + requestId: "any@", +} + +response@ : { + "stream@" : [{"quote": "quote"}], + "doNotRepeat@": false, + "context@": "my-endpoint-{{request.body.requestId}}", + "streamInterval@": 10, +} From 3bd5bd0af980c32a165bd7bb8e86f03c0862b297 Mon Sep 17 00:00:00 2001 From: nishant Date: Sun, 28 Mar 2021 16:58:21 +0530 Subject: [PATCH 3/5] Add more console logs. --- core/src/controllers/duplex-streaming-controller.js | 4 +++- core/src/utils/files.js | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/controllers/duplex-streaming-controller.js b/core/src/controllers/duplex-streaming-controller.js index cc819ee..09187e0 100644 --- a/core/src/controllers/duplex-streaming-controller.js +++ b/core/src/controllers/duplex-streaming-controller.js @@ -28,7 +28,9 @@ module.exports = { for(let i =0; i < responses.length || keepStreaming; i++){ await new Promise((resolve => setTimeout(resolve, streamingDelay))); - callContext.write(responses[i%responses.length]); + const nextMessage = responses[i%responses.length]; + console.log(`Sending response for ${endpoint.getId()} from ${responseFile} `, nextMessage); + callContext.write(nextMessage); } }; diff --git a/core/src/utils/files.js b/core/src/utils/files.js index 9c17c10..19e7515 100644 --- a/core/src/utils/files.js +++ b/core/src/utils/files.js @@ -33,7 +33,10 @@ const readYamlFile = (filePath) => { }; const readYamlFileInDir = (dir, filePath) => { - return readTextFile(path.join(dir, filePath)).then(data => yaml.parse(data)); + const yamlPath = path.join(dir, filePath); + return readTextFile(yamlPath).then(data => yaml.parse(data)).catch(e => { + throw `Failed to read file at ${yamlPath}: ${e.message}` + }); }; const writeYaml = (dir, file, content) => { @@ -98,4 +101,4 @@ module.exports = { createTempDir, writeYaml, copyFile -}; \ No newline at end of file +}; From d57cc37cadae1f86a6ad607068971952e42b5036 Mon Sep 17 00:00:00 2001 From: nishant Date: Sun, 28 Mar 2021 16:58:39 +0530 Subject: [PATCH 4/5] Should close streaming connections. --- core/tests/app/streamingContext.test.js | 51 +++++++++++++++++++ .../streaming/stop-stream-by-context.yml | 4 +- core/tests/helpers/index.js | 38 ++++++++++++++ 3 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 core/tests/app/streamingContext.test.js diff --git a/core/tests/app/streamingContext.test.js b/core/tests/app/streamingContext.test.js new file mode 100644 index 0000000..3680671 --- /dev/null +++ b/core/tests/app/streamingContext.test.js @@ -0,0 +1,51 @@ +const appTestHelper = require('./appTestHelper'); +const fixtures = require('../fixtures'); + +describe('app.js', () => { + let app; + + beforeAll(async () => { + app = await appTestHelper.launchApp({ + port: 50056, + protosPath: fixtures.pricesProject.protosPath, + configPath: fixtures.pricesProject.configPath, + }); + }); + + afterAll(async () => { + await app.closeApp(); + }); + + describe('Streaming context', () => { + const timeout = 60000; + + test('should stop messages by streaming context', async () => { + const requestId = "#request:9231"; + const expectedPriceMessage = {quote: "quote"}; + const expectedStopMessage = {quote: `${requestId} won't be streamed anymore`}; + + // Create a streaming message + const clientStream = await app.client.openPriceStream(); + await clientStream.sendMessage({uic: 501, assetType: 'Stock', requestId}); + + // // wait for 1 sec and close streaming + await new Promise(resolve => setTimeout(resolve, 1000)); + await clientStream.sendMessage({uic: 599, assetType: 'Stock', requestId}); + + // Wait for another 1 sec to observer if any more messages were streamed + await new Promise(resolve => setTimeout(resolve, 1000)); + clientStream.stop(); + const streamedMessages = await clientStream.getNext(); + + // No messages must stream once the subscription is turned off + // No message must follow stop message + + const lastMessage = streamedMessages.pop(); + const secondLastMessage = streamedMessages.pop(); + + expect(secondLastMessage).toEqual(expectedPriceMessage); + expect(lastMessage).toEqual(expectedStopMessage); + }); + + }); +}); diff --git a/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml b/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml index b4581c5..ced7de4 100644 --- a/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml +++ b/core/tests/fixtures/prices-project/config/streaming/stop-stream-by-context.yml @@ -8,8 +8,8 @@ response@ : { "stream@" : [{ "quote": "{{request.body.requestId}} won't be streamed anymore", }], - "stopStreaming@": { - "context@": "my-endpoint-{{request.body.requestId}}", + "actions@": { + "stopStreaming@": {"context@": "my-endpoint-{{request.body.requestId}}"} }, "doNotRepeat@": true, } diff --git a/core/tests/helpers/index.js b/core/tests/helpers/index.js index 9c78a11..c3e76ac 100644 --- a/core/tests/helpers/index.js +++ b/core/tests/helpers/index.js @@ -83,6 +83,44 @@ module.exports = { setTimeout(() => call.end(), timeout); }), + openPriceStream : (timeout = 1000) => new Promise(async (resolve, reject) => { + const protoDefinition = grpc.loadPackageDefinition(pricesProto).prices.streaming; + const client = new protoDefinition.Pricing(url, grpc.credentials.createInsecure()); + + const data = []; + const call = client.TwoWaySubscribe(); + + + call.on('data', function(response) { + data.push(response); + }); + + call.on('end', function() { + resolve(data); + }); + + call.on('error', function(error) { + reject({data, error}); + }); + // setTimeout(() => call.end(), timeout); + resolve({ + stop: async () => { + await call.end(); + }, + sendMessage : (request) => { + call.write(request); + }, + getNext : () => { + return new Promise(resolve => { + // return updates received so far and clear saved data + const messages = data.splice(0); + resolve(messages); + }) + } + } + ) + }), + getClientStreamResponses : (requests, timeout = 1000) => new Promise(async (resolve, reject) => { const protoDefinition = grpc.loadPackageDefinition(pricesProto).prices.streaming; const client = new protoDefinition.Pricing(url, grpc.credentials.createInsecure()); From 33bc6d212519c9a1ffc16d1f32fe8ccb40eea287 Mon Sep 17 00:00:00 2001 From: nishant Date: Sun, 28 Mar 2021 21:25:51 +0530 Subject: [PATCH 5/5] Add support to stop streaing messages. --- core/src/app.js | 21 +++++++++++++- .../duplex-streaming-controller.js | 28 +++++++++++++++++-- core/src/controllers/index.js | 8 +++--- core/src/mappingTemplate.js | 2 ++ 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/core/src/app.js b/core/src/app.js index 9feed53..0c01e50 100644 --- a/core/src/app.js +++ b/core/src/app.js @@ -19,8 +19,27 @@ module.exports = { const dataFiles = TemplateReader.create({configPath}); const mappingResolver = await endpointDataFileResolver.createResolvers({endpoints, mappings, templates: dataFiles}); + const globalState = { + streamingContexts: {} + }; + + const globalActions = { + "stopStreaming@": async (arg) => { + const contextId = arg["context@"]; + const streamingContext = globalState.streamingContexts[contextId] + if(!contextId){ + throw new Error("stopStreaming@: Please provide context@ to stop.") + } + if(!streamingContext){ + throw new Error(`stopStreaming@: No streaming context found for context@=${contextId}`) + } + delete globalState.streamingContexts[contextId] + await streamingContext.stop(); + } + } + const controllers = await Promise.all(endpoints.map((endpoint) => { - return Controllers.create(endpoint, mappingResolver, dataFiles) + return Controllers.create(endpoint, mappingResolver, dataFiles, globalState, globalActions) })); const client = recording ? await Client.create({host : remoteHost, port : remotePort, endpoints}) : null; diff --git a/core/src/controllers/duplex-streaming-controller.js b/core/src/controllers/duplex-streaming-controller.js index 09187e0..29781a6 100644 --- a/core/src/controllers/duplex-streaming-controller.js +++ b/core/src/controllers/duplex-streaming-controller.js @@ -1,7 +1,7 @@ const DEFAULT_STREAMING_DELAY = 1000; module.exports = { - create: async ({endpoint, mappingResolver, dataFiles}) => { + create: async ({endpoint, mappingResolver, dataFiles, globalState, globalActions}) => { return { endpointId: endpoint.getId(), canHandle: (endpointId) => { @@ -18,6 +18,29 @@ module.exports = { const template = await dataFiles.get(responseFile); const response = template.getResponse().compile({request : {body: request}}); + // ** Start of of streaming context handling **************************************** + const stop = async () => { + keepStreaming = false; + }; + + // Set streaming context + if(response.hasOwnProperty("context")){ + globalState.streamingContexts[response.context] = {stop}; + } + + // Check for actions + if(response.actions){ + for(const action in response.actions){ + const globalAction = globalActions[action]; + if(!globalAction){ + throw new Error(`Undefined actions : ${action}`) + } + await globalAction(response.actions[action]); + console.log("Stopped streaming for messages") + } + } + // ** End of streaming context handling **************************************** + if(!response){ return callContext.end(); } @@ -27,11 +50,12 @@ module.exports = { let keepStreaming = !response.doNotRepeat ; for(let i =0; i < responses.length || keepStreaming; i++){ - await new Promise((resolve => setTimeout(resolve, streamingDelay))); const nextMessage = responses[i%responses.length]; console.log(`Sending response for ${endpoint.getId()} from ${responseFile} `, nextMessage); callContext.write(nextMessage); + await new Promise((resolve => setTimeout(resolve, streamingDelay))); } + console.log("End of streaming for ", request) }; callContext.on('data', (response) => { diff --git a/core/src/controllers/index.js b/core/src/controllers/index.js index 866b49c..7747f76 100644 --- a/core/src/controllers/index.js +++ b/core/src/controllers/index.js @@ -4,7 +4,7 @@ const ClientStreamController = require('./duplex-streaming-controller'); const DuplexStreamController = require('./duplex-streaming-controller'); module.exports = { - create: (endpoint, mappingResolver, dataFiles) => { + create: (endpoint, mappingResolver, dataFiles, globalState, globalActions) => { const clientStream = endpoint.isStreamingRequest(); const serverStream = endpoint.isStreamingResponse(); @@ -17,13 +17,13 @@ module.exports = { } if(isServerStream){ - return ServerStreamController.create({endpoint, mappingResolver, dataFiles}); + return ServerStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions}); } if(isClientStream){ - return ClientStreamController.create({endpoint, mappingResolver, dataFiles}); + return ClientStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions}); } - return DuplexStreamController.create({endpoint, mappingResolver, dataFiles}); + return DuplexStreamController.create({endpoint, mappingResolver, dataFiles, globalState, globalActions}); } } diff --git a/core/src/mappingTemplate.js b/core/src/mappingTemplate.js index 447e507..ae0d46e 100644 --- a/core/src/mappingTemplate.js +++ b/core/src/mappingTemplate.js @@ -14,6 +14,8 @@ module.exports = { if(response['stream@']){ return { stream: compiled(response['stream@'], variables), + context: compiled(response['context@'], variables), + actions: compiled(response['actions@'], variables), doNotRepeat: !!response['doNotRepeat@'], streamInterval: response['streamInterval@'], }