Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/libArgParse/GrammarElement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@

namespace ArgParse
{
// a Graph
class GrammarElement;

class GrammarElement
{
public:
Expand Down
3 changes: 2 additions & 1 deletion src/libCli/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

cmake_minimum_required (VERSION 2.8)

find_package(Threads REQUIRED)
set(TARGET_NAME "cli")
set(TARGET_SRC
./MessageParsing.cpp
Expand All @@ -27,6 +27,7 @@ add_library(${TARGET_NAME} ${TARGET_SRC})
target_link_libraries ( ${TARGET_NAME}
reflection
ArgParse
${CMAKE_THREAD_LIBS_INIT}
)

if(BUILD_CONFIG_USE_BOOST_REGEX)
Expand Down
139 changes: 87 additions & 52 deletions src/libCli/Call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#include <libCli/OutputFormatting.hpp>
#include <libCli/ConnectionManager.hpp>
#include <libCli/MessageParsing.hpp>
#include <libCli/GrammarConstruction.hpp>
#include <string>
#include <chrono>
#include <thread>
#include <ctime>
#include <iomanip>

Expand Down Expand Up @@ -151,8 +154,85 @@ std::string getTimeString()
return cstr ;
}

int sendProtoMessage(ArgParse::ParsedElement * messageParseTree, grpc::testing::CliCall& call, const grpc::protobuf::Descriptor* inputType, bool printParsedMessage){
// now we have to construct a protobuf from the parsed argument, which corresponds to the inputType
google::protobuf::DynamicMessageFactory dynamicFactory;
// read data from the parse tree into the protobuf message:
std::unique_ptr<grpc::protobuf::Message> message = cli::parseMessage(*messageParseTree, dynamicFactory, inputType);

if(printParsedMessage)
{
// use built-in human readable output format
cli::OutputFormatter imessageFormatter;
std::cout << "Request message:" << std::endl << imessageFormatter.messageToString(*message, inputType, "| ", "| " ) << std::endl;
}

if(not message)
{
std::cerr << "Error: Error parsing method arguments -> aborting the call :-(" << std::endl;
return -1;
}

// now we serialize the message:
grpc::string serializedRequest;
bool success = message->SerializeToString(&serializedRequest);
if(not success)
{
std::cerr << "Error: Failed to serialize method arguments" << std::endl;
return -1;
}

call.Write(serializedRequest);
return 0;

}
void sendMessages(ParsedElement & parseTree, grpc::testing::CliCall& call, const google::protobuf::MethodDescriptor * method, bool& result){
std::vector<ArgParse::ParsedElement*> requestMessages;
// search all passed messages: (true flag prevents searching sub-messages)
parseTree.findAllSubTrees("Message", requestMessages, true);

const grpc::protobuf::Descriptor* inputType = method->input_type();

if(not method->client_streaming() and requestMessages.size() == 0)
{
// User did not give any message arguments for non-streaming RPC
// In this case we just add the parseTree, which causes a default message to be cunstructed:
requestMessages.push_back(&parseTree);
}

bool secondRun = false;
bool printParsedMessage = parseTree.findFirstChild("PrintParsedMessage") != "";
for(ArgParse::ParsedElement * messageParseTree : requestMessages)
{
result = sendProtoMessage(messageParseTree, call, inputType, printParsedMessage);
}
bool keepAlive = parseTree.findFirstChild("KeepStreamAlive") != "";
if(keepAlive){
Grammar newGrammarPool;
MessageGrammarFactory messageFactory(newGrammarPool);
auto newGrammar = messageFactory.getMessageGrammar("Message", method->input_type());
while(true)
{
std::string newMessage;
std::cin >> newMessage;
std::cout << "get Message " << newMessage << std::endl;
ParsedElement newTree;
ParseRc rc = newGrammar->parse(newMessage.c_str(), newTree);
if(rc.isGood()){
result = sendProtoMessage(&newTree, call, inputType, printParsedMessage);
}
}

}

// End the request stream. (This is a limitation of gWhisper streaming support, as we sequentially stream all request messages, then end the stream and then handle the reply stream.) No async streaming is possible via this CLI at the moment.
call.WritesDone();

}

int call(ParsedElement & parseTree)
{
bool result = 0;
std::string serverAddress = parseTree.findFirstChild("ServerAddress");
std::string serverPort = parseTree.findFirstChild("ServerPort");
std::string serviceName = parseTree.findFirstChild("Service");
Expand Down Expand Up @@ -188,22 +268,6 @@ int call(ParsedElement & parseTree)
return -1;
}

const grpc::protobuf::Descriptor* inputType = method->input_type();

// now we have to construct a protobuf from the parsed argument, which corresponds to the inputType
google::protobuf::DynamicMessageFactory dynamicFactory;

std::vector<ArgParse::ParsedElement*> requestMessages;
// search all passed messages: (true flag prevents searching sub-messages)
parseTree.findAllSubTrees("Message", requestMessages, true);

if(not method->client_streaming() and requestMessages.size() == 0)
{
// User did not give any message arguments for non-streaming RPC
// In this case we just add the parseTree, which causes a default message to be cunstructed:
requestMessages.push_back(&parseTree);
}

// Prepare the RPC call:
std::multimap<grpc::string, grpc::string> clientMetadata;
grpc::string serializedResponse;
Expand All @@ -214,42 +278,13 @@ int call(ParsedElement & parseTree)
grpc::testing::CliCall call(channel, methodStr, clientMetadata);

// Write all request messages (multiple in case of request stream)
for(ArgParse::ParsedElement * messageParseTree : requestMessages)
{
// read data from the parse tree into the protobuf message:
std::unique_ptr<grpc::protobuf::Message> message = cli::parseMessage(*messageParseTree, dynamicFactory, inputType);

if(parseTree.findFirstChild("PrintParsedMessage") != "")
{
// use built-in human readable output format
cli::OutputFormatter imessageFormatter;
std::cout << "Request message:" << std::endl << imessageFormatter.messageToString(*message, method->input_type(), "| ", "| " ) << std::endl;
}
std::thread requestThread(sendMessages, std::ref(parseTree), std::ref(call), std::ref(method), std::ref(result));

if(not message)
{
std::cerr << "Error: Error parsing method arguments -> aborting the call :-(" << std::endl;
return -1;
}

// now we serialize the message:
grpc::string serializedRequest;
bool success = message->SerializeToString(&serializedRequest);
if(not success)
{
std::cerr << "Error: Failed to serialize method arguments" << std::endl;
return -1;
}

call.Write(serializedRequest);
}

// End the request stream. (This is a limitation of gWhisper streaming support, as we sequentially stream all request messages, then end the stream and then handle the reply stream.) No async streaming is possible via this CLI at the moment.
call.WritesDone();

// In a loop we read reply data from the reply stream:
// NOTE: in gRPC every RPC can be considered "streaming". Non-streaming RPCs
// merely return one reply message.
google::protobuf::DynamicMessageFactory dynamicFactory;
bool init = true;
for (init = true; call.Read(&serializedResponse, init ? &serverMetadataA : nullptr); init= false)
{
Expand Down Expand Up @@ -306,20 +341,20 @@ int call(ParsedElement & parseTree)
}

// reply stream finished -> finish the RPC:
requestThread.join();
grpc::Status status = call.Finish(&serverMetadataB);

if(not status.ok())
{
std::cerr << "RPC failed ;( Status code: " << std::to_string(status.error_code()) << " " << cli::getGrpcStatusCodeAsString(status.error_code()) << ", error message: " << status.error_message() << std::endl;
return -1;
result = -1;
} else {
std::cerr << "RPC succeeded :D" << std::endl;
}

std::cerr << "RPC succeeded :D" << std::endl;


return 0;
return result;
}

}

/// getModifier()
Expand Down
Loading