From 490effae55e43ebb629aa01bf2ac1f1851c693d1 Mon Sep 17 00:00:00 2001 From: wenwen Date: Fri, 16 Aug 2019 00:51:14 +0200 Subject: [PATCH 1/2] basic functionality implemented Signed-off-by: wenwen --- src/libArgParse/GrammarElement.hpp | 3 - src/libCli/CMakeLists.txt | 3 +- src/libCli/Call.cpp | 140 +++++++++++++--------- src/libCli/GrammarConstruction.cpp | 182 +++++++++++++++-------------- src/libCli/GrammarConstruction.hpp | 13 +++ 5 files changed, 195 insertions(+), 146 deletions(-) diff --git a/src/libArgParse/GrammarElement.hpp b/src/libArgParse/GrammarElement.hpp index 933f335..2dbd510 100644 --- a/src/libArgParse/GrammarElement.hpp +++ b/src/libArgParse/GrammarElement.hpp @@ -21,9 +21,6 @@ namespace ArgParse { -// a Graph -class GrammarElement; - class GrammarElement { public: diff --git a/src/libCli/CMakeLists.txt b/src/libCli/CMakeLists.txt index 611b3ca..e10b32b 100644 --- a/src/libCli/CMakeLists.txt +++ b/src/libCli/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. cmake_minimum_required (VERSION 2.8) - +find_package(Threads REQUIRED) set(TARGET_NAME "cli") set(TARGET_SRC ./MessageParsing.cpp @@ -27,6 +27,7 @@ add_library(${TARGET_NAME} ${TARGET_SRC}) target_link_libraries ( ${TARGET_NAME} reflection ArgParse + ${CMAKE_THREAD_LIBS_INIT} ) if(BUILD_CONFIG_USE_BOOST_REGEX) diff --git a/src/libCli/Call.cpp b/src/libCli/Call.cpp index 3c8ea6e..2c45a40 100644 --- a/src/libCli/Call.cpp +++ b/src/libCli/Call.cpp @@ -18,7 +18,10 @@ #include #include #include +#include +#include #include +#include #include #include @@ -151,8 +154,85 @@ std::string getTimeString() return cstr ; } +int sendProtoMessage(ArgParse::ParsedElement * messageParseTree, grpc::testing::CliCall& call, const grpc::protobuf::Descriptor* inputType, bool printParsedMessage){ + // now we have to construct a protobuf from the parsed argument, which corresponds to the inputType + google::protobuf::DynamicMessageFactory dynamicFactory; + // read data from the parse tree into the protobuf message: + std::unique_ptr message = cli::parseMessage(*messageParseTree, dynamicFactory, inputType); + + if(printParsedMessage) + { + // use built-in human readable output format + cli::OutputFormatter imessageFormatter; + std::cout << "Request message:" << std::endl << imessageFormatter.messageToString(*message, inputType, "| ", "| " ) << std::endl; + } + + if(not message) + { + std::cerr << "Error: Error parsing method arguments -> aborting the call :-(" << std::endl; + return -1; + } + + // now we serialize the message: + grpc::string serializedRequest; + bool success = message->SerializeToString(&serializedRequest); + if(not success) + { + std::cerr << "Error: Failed to serialize method arguments" << std::endl; + return -1; + } + + call.Write(serializedRequest); + return 0; + +} +void sendMessages(ParsedElement & parseTree, grpc::testing::CliCall& call, const google::protobuf::MethodDescriptor * method, bool& result){ + std::vector requestMessages; + // search all passed messages: (true flag prevents searching sub-messages) + parseTree.findAllSubTrees("Message", requestMessages, true); + + const grpc::protobuf::Descriptor* inputType = method->input_type(); + + if(not method->client_streaming() and requestMessages.size() == 0) + { + // User did not give any message arguments for non-streaming RPC + // In this case we just add the parseTree, which causes a default message to be cunstructed: + requestMessages.push_back(&parseTree); + } + + bool secondRun = false; + bool printParsedMessage = parseTree.findFirstChild("PrintParsedMessage") != ""; + for(ArgParse::ParsedElement * messageParseTree : requestMessages) + { + result = sendProtoMessage(messageParseTree, call, inputType, printParsedMessage); + } + bool keepAlive = parseTree.findFirstChild("KeepStreamAlive") != ""; + if(keepAlive){ + Grammar newGrammarPool; + MessageGrammarFactory messageFactory(newGrammarPool); + auto newGrammar = messageFactory.getMessageGrammar("Message", method->input_type()); + while(true) + { + std::string newMessage; + std::cin >> newMessage; + std::cout << "get Message " << newMessage << std::endl; + ParsedElement newTree; + ParseRc rc = newGrammar->parse(newMessage.c_str(), newTree); + if(rc.isGood()){ + result = sendProtoMessage(&newTree, call, inputType, printParsedMessage); + } + } + + } + + // End the request stream. (This is a limitation of gWhisper streaming support, as we sequentially stream all request messages, then end the stream and then handle the reply stream.) No async streaming is possible via this CLI at the moment. + call.WritesDone(); + +} + int call(ParsedElement & parseTree) { + bool result = 0; std::string serverAddress = parseTree.findFirstChild("ServerAddress"); std::string serverPort = parseTree.findFirstChild("ServerPort"); std::string serviceName = parseTree.findFirstChild("Service"); @@ -188,22 +268,6 @@ int call(ParsedElement & parseTree) return -1; } - const grpc::protobuf::Descriptor* inputType = method->input_type(); - - // now we have to construct a protobuf from the parsed argument, which corresponds to the inputType - google::protobuf::DynamicMessageFactory dynamicFactory; - - std::vector requestMessages; - // search all passed messages: (true flag prevents searching sub-messages) - parseTree.findAllSubTrees("Message", requestMessages, true); - - if(not method->client_streaming() and requestMessages.size() == 0) - { - // User did not give any message arguments for non-streaming RPC - // In this case we just add the parseTree, which causes a default message to be cunstructed: - requestMessages.push_back(&parseTree); - } - // Prepare the RPC call: std::multimap clientMetadata; grpc::string serializedResponse; @@ -214,42 +278,13 @@ int call(ParsedElement & parseTree) grpc::testing::CliCall call(channel, methodStr, clientMetadata); // Write all request messages (multiple in case of request stream) - for(ArgParse::ParsedElement * messageParseTree : requestMessages) - { - // read data from the parse tree into the protobuf message: - std::unique_ptr message = cli::parseMessage(*messageParseTree, dynamicFactory, inputType); - - if(parseTree.findFirstChild("PrintParsedMessage") != "") - { - // use built-in human readable output format - cli::OutputFormatter imessageFormatter; - std::cout << "Request message:" << std::endl << imessageFormatter.messageToString(*message, method->input_type(), "| ", "| " ) << std::endl; - } + std::thread requestThread(sendMessages, std::ref(parseTree), std::ref(call), std::ref(method), std::ref(result)); - if(not message) - { - std::cerr << "Error: Error parsing method arguments -> aborting the call :-(" << std::endl; - return -1; - } - - // now we serialize the message: - grpc::string serializedRequest; - bool success = message->SerializeToString(&serializedRequest); - if(not success) - { - std::cerr << "Error: Failed to serialize method arguments" << std::endl; - return -1; - } - - call.Write(serializedRequest); - } - - // End the request stream. (This is a limitation of gWhisper streaming support, as we sequentially stream all request messages, then end the stream and then handle the reply stream.) No async streaming is possible via this CLI at the moment. - call.WritesDone(); // In a loop we read reply data from the reply stream: // NOTE: in gRPC every RPC can be considered "streaming". Non-streaming RPCs // merely return one reply message. + google::protobuf::DynamicMessageFactory dynamicFactory; bool init = true; for (init = true; call.Read(&serializedResponse, init ? &serverMetadataA : nullptr); init= false) { @@ -303,23 +338,24 @@ int call(ParsedElement & parseTree) std::cout << msgString; // Omit endline here. This is an unwanted char when binary data is directed into a file. std::cerr << std::endl; // ... but put and endline into stderr to keep the console output nice again. } + std::cout << "test her" << std::endl; } // reply stream finished -> finish the RPC: + requestThread.join(); grpc::Status status = call.Finish(&serverMetadataB); if(not status.ok()) { std::cerr << "RPC failed ;( Status code: " << std::to_string(status.error_code()) << " " << cli::getGrpcStatusCodeAsString(status.error_code()) << ", error message: " << status.error_message() << std::endl; - return -1; + result = -1; + } else { + std::cerr << "RPC succeeded :D" << std::endl; } - std::cerr << "RPC succeeded :D" << std::endl; - - return 0; + return result; } - } /// getModifier() diff --git a/src/libCli/GrammarConstruction.cpp b/src/libCli/GrammarConstruction.cpp index b849f21..99ec9ad 100644 --- a/src/libCli/GrammarConstruction.cpp +++ b/src/libCli/GrammarConstruction.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include @@ -23,94 +22,7 @@ using namespace ArgParse; namespace cli { -class GrammarInjectorMethodArgs : public GrammarInjector -{ - public: - GrammarInjectorMethodArgs(Grammar & f_grammar, const std::string & f_elementName = "") : - GrammarInjector("MethodArgs", f_elementName), - m_grammar(f_grammar) - { - } - - virtual ~GrammarInjectorMethodArgs() - { - } - - virtual GrammarElement * getGrammar(ParsedElement * f_parseTree, std::string & f_ErrorMessage) override - { - // FIXME: we are already completing this without a service parsed. - // this works in most cases, as it will just fail. however this is not really a nice thing. - std::string serverAddress = f_parseTree->findFirstChild("ServerAddress"); - std::string serverPort = f_parseTree->findFirstChild("ServerPort"); - std::string serviceName = f_parseTree->findFirstChild("Service"); - std::string methodName = f_parseTree->findFirstChild("Method"); - if(serverPort == "") - { - serverPort = "50051"; - } - serverAddress = serverAddress + ":" + serverPort; - - //std::cout << f_parseTree->getDebugString() << std::endl; - //std::cout << "Injecting grammar for " << serverAddress << ":" << serverPort << " " << serviceName << " " << methodName << std::endl; - std::shared_ptr channel = ConnectionManager::getInstance().getChannel(serverAddress); - - if(not waitForChannelConnected(channel, getConnectTimeoutMs(f_parseTree))) - { - f_ErrorMessage = "Error: Could not connect the Server."; - return nullptr; - } - - const grpc::protobuf::ServiceDescriptor* service = ConnectionManager::getInstance().getDescPool(serverAddress)->FindServiceByName(serviceName); - - if(service == nullptr) - { - f_ErrorMessage = "Error: Service not found."; - return nullptr; - } - - auto method = service->FindMethodByName(methodName); - if(method == nullptr) - { - f_ErrorMessage = "Error: Method not found."; - return nullptr; - } - - if(method->client_streaming()) - { - ArgParse::GrammarFactory grammarFactory(m_grammar); - - return grammarFactory.createList( - "RequestStream", - getMessageGrammar("Message", method->input_type(), m_grammar.createElement(":")), - m_grammar.createElement(), - false, - nullptr, - nullptr - ); - } - else - { - return getMessageGrammar("Message", method->input_type()); - } - //auto concat = m_grammar.createElement(); - - //auto separation = m_grammar.createElement(); - ////auto separation = m_grammar.createElement(); - ////separation->addChild(m_grammar.createElement()); - ////separation->addChild(m_grammar.createElement(",")); - //concat->addChild(separation); - - //concat->addChild(fields); - - //auto result = m_grammar.createElement("Fields"); - //result->addChild(concat); - - //return result; - }; - - private: - - void addFieldValueGrammar(GrammarElement * f_fieldGrammar, const grpc::protobuf::FieldDescriptor * f_field) + void MessageGrammarFactory::addFieldValueGrammar(GrammarElement * f_fieldGrammar, const grpc::protobuf::FieldDescriptor * f_field) { switch(f_field->cpp_type()) { @@ -233,7 +145,7 @@ class GrammarInjectorMethodArgs : public GrammarInjector // FIXME: we do want to generate a list via factory here not an alternation. // This makes life much easier and avoids duplicate code as all messages have // same parse structure. - GrammarElement * getMessageGrammar(const std::string & f_rootElementName, const grpc::protobuf::Descriptor* f_messageDescriptor, GrammarElement * f_wrappingElement = nullptr) + GrammarElement * MessageGrammarFactory::getMessageGrammar(const std::string & f_rootElementName, const grpc::protobuf::Descriptor* f_messageDescriptor, GrammarElement * f_wrappingElement) { ArgParse::GrammarFactory grammarFactory(m_grammar); auto fieldsAlt = m_grammar.createElement(); @@ -295,6 +207,94 @@ class GrammarInjectorMethodArgs : public GrammarInjector +class GrammarInjectorMethodArgs : public GrammarInjector +{ + public: + GrammarInjectorMethodArgs(Grammar & f_grammar, const std::string & f_elementName = "") : + GrammarInjector("MethodArgs", f_elementName), + m_grammar(f_grammar) + { + } + + virtual ~GrammarInjectorMethodArgs() + { + } + + virtual GrammarElement * getGrammar(ParsedElement * f_parseTree, std::string & f_ErrorMessage) override + { + // FIXME: we are already completing this without a service parsed. + // this works in most cases, as it will just fail. however this is not really a nice thing. + std::string serverAddress = f_parseTree->findFirstChild("ServerAddress"); + std::string serverPort = f_parseTree->findFirstChild("ServerPort"); + std::string serviceName = f_parseTree->findFirstChild("Service"); + std::string methodName = f_parseTree->findFirstChild("Method"); + if(serverPort == "") + { + serverPort = "50051"; + } + serverAddress = serverAddress + ":" + serverPort; + + //std::cout << f_parseTree->getDebugString() << std::endl; + //std::cout << "Injecting grammar for " << serverAddress << ":" << serverPort << " " << serviceName << " " << methodName << std::endl; + std::shared_ptr channel = ConnectionManager::getInstance().getChannel(serverAddress); + + if(not waitForChannelConnected(channel, getConnectTimeoutMs(f_parseTree))) + { + f_ErrorMessage = "Error: Could not connect the Server."; + return nullptr; + } + + const grpc::protobuf::ServiceDescriptor* service = ConnectionManager::getInstance().getDescPool(serverAddress)->FindServiceByName(serviceName); + + if(service == nullptr) + { + f_ErrorMessage = "Error: Service not found."; + return nullptr; + } + + auto method = service->FindMethodByName(methodName); + if(method == nullptr) + { + f_ErrorMessage = "Error: Method not found."; + return nullptr; + } + + MessageGrammarFactory messageFactory(m_grammar); + if(method->client_streaming()) + { + ArgParse::GrammarFactory grammarFactory(m_grammar); + + return grammarFactory.createList( + "RequestStream", + messageFactory.getMessageGrammar("Message", method->input_type(), m_grammar.createElement(":")), + m_grammar.createElement(), + false, + nullptr, + nullptr + ); + } + else + { + return messageFactory.getMessageGrammar("Message", method->input_type()); + } + //auto concat = m_grammar.createElement(); + + //auto separation = m_grammar.createElement(); + ////auto separation = m_grammar.createElement(); + ////separation->addChild(m_grammar.createElement()); + ////separation->addChild(m_grammar.createElement(",")); + //concat->addChild(separation); + + //concat->addChild(fields); + + //auto result = m_grammar.createElement("Fields"); + //result->addChild(concat); + + //return result; + }; + + private: + Grammar & m_grammar; }; @@ -499,6 +499,8 @@ GrammarElement * constructGrammar(Grammar & f_grammarPool) timeoutOption->addChild(f_grammarPool.createElement("--connectTimeoutMilliseconds=")); timeoutOption->addChild(f_grammarPool.createElement("[0-9]+", "connectTimeout")); optionsalt->addChild(timeoutOption); + optionsalt->addChild(f_grammarPool.createElement("-k", "KeepStreamAlive")); + optionsalt->addChild(f_grammarPool.createElement("--keepStreamAlive", "KeepStreamAlive")); optionsalt->addChild(customOutputFormat); // FIXME FIXME FIXME: we cannot distinguish between --complete and --completeDebug.. this is a problem for arguments too, as we cannot guarantee, that we do not have an argument starting with the name of an other argument. // -> could solve by makeing FixedString greedy diff --git a/src/libCli/GrammarConstruction.hpp b/src/libCli/GrammarConstruction.hpp index 664eb56..1a1531e 100644 --- a/src/libCli/GrammarConstruction.hpp +++ b/src/libCli/GrammarConstruction.hpp @@ -15,6 +15,7 @@ #pragma once #include +#include namespace cli { @@ -23,4 +24,16 @@ namespace cli /// @returns the root element of the generated grammar. The pointer should not /// be used after the given f_grammarPool is de-allocated. ArgParse::GrammarElement * constructGrammar(ArgParse::Grammar & f_grammarPool); + class MessageGrammarFactory{ + public: + MessageGrammarFactory(ArgParse::Grammar& grammar): + m_grammar(grammar){ + } + ArgParse::GrammarElement * getMessageGrammar(const std::string & f_rootElementName, const grpc::protobuf::Descriptor* f_messageDescriptor, ArgParse::GrammarElement * f_wrappingElement = nullptr); + + private: + void addFieldValueGrammar(ArgParse::GrammarElement * f_fieldGrammar, const grpc::protobuf::FieldDescriptor * f_field); + ArgParse::Grammar & m_grammar; + }; + } From d3e4522469660f40d47a7acc026ca32bef59232b Mon Sep 17 00:00:00 2001 From: wenwen Date: Fri, 16 Aug 2019 01:01:07 +0200 Subject: [PATCH 2/2] fix a test but still problem Signed-off-by: wenwen --- src/libCli/Call.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libCli/Call.cpp b/src/libCli/Call.cpp index 2c45a40..c57ae0d 100644 --- a/src/libCli/Call.cpp +++ b/src/libCli/Call.cpp @@ -338,7 +338,6 @@ int call(ParsedElement & parseTree) std::cout << msgString; // Omit endline here. This is an unwanted char when binary data is directed into a file. std::cerr << std::endl; // ... but put and endline into stderr to keep the console output nice again. } - std::cout << "test her" << std::endl; } // reply stream finished -> finish the RPC: