gRPC
  • 17 Nov 2023

Introduction

This document explains how to integrate with Data Flow over gRPC.

This guide assumes prior knowledge of gRPC development. Please refer to the gRPC documentation for detailed technical information about gRPC itself.

Preparation

Before starting, you need to prepare your stub classes.
Generate them from the Data Flow proto file as explained here.
You can download the Data Flow proto file here.

grpcDataFlow.proto

Example

For production use, the gRPC asynchronous API is recommended. The synchronous API is easier to follow step by step, so this tutorial uses it.

The Data Flow gRPC API follows a flow similar to the WebSocket API: you start a stream operation, send your data in small chunks, and listen for responses as they are produced.

First, create a channel object with the appropriate CA root certificates, then create a new stub on that channel.

grpc::SslCredentialsOptions credentialOptions;
credentialOptions.pem_root_certs = 
  sestek::io::ReadFileContentOrThrow<std::string>(((std::string)config["cacerts"]).c_str());
auto channel_creds = grpc::SslCredentials(credentialOptions);
auto _channel = grpc::CreateChannel("dataflow.eu.knovvu.com:443", channel_creds);

// Create the stub on the channel created above.
auto _stub = dataflow::NewStub(_channel);

You start a streaming operation by calling the RPC method Stream from your application.

service dataflow
{
  rpc Stream(stream StreamInput) returns (stream StreamOutput) {}  
}

For C++ this means the following call:

grpc::ClientContext context;
auto stream = _stub->Stream(&context);

To start a session on this stream, first prepare a start message in JSON format:

nlohmann::json startJson;
nlohmann::json audioJson;
audioJson["sample-rate"] = 8000;
audioJson["channel-count"] = 1; // or 2 (stereo is also supported)
audioJson["encoding"] = "pcm"; //or ulaw 
startJson["audio"] = std::move(audioJson);

startJson["message-name"] = "start";
startJson["tenant"] = "3a0d7ae9-0ae8-5c93-cb8d-897fbabb2877"; 
startJson["token"] = "........."; //A token retrieved from knovvu core identity service with your credentials
startJson["project-name"] = "stream-sr-ca";

Your tenant information will be provided to you. At the time of writing, you can also find it on the projects page, above the "New Project" button.

Put this JSON-formatted start message string into a StreamInput message and send it to Data Flow.

StreamInput request;
auto & message = *request.mutable_message();
message.set_content(startJson.dump());
message.set_content_type(CONTENT_TYPE::REQUEST);

grpc::WriteOptions wOpts;
bool started = stream->Write(request, wOpts); // false means the stream has been closed

The Data Flow server responds with a StreamOutput message.

StreamOutput response;
if (stream->Read(&response)) // Read returns false once the stream has ended
{
	// DebugString gives a human-readable dump of the message.
	std::cout << "response : " << response.DebugString() << std::endl;
}

The response is in JSON format. It contains a field named operation-result, whose value is success if everything is fine, or an error if something went wrong.

Assuming the Data Flow server has returned success, you can start streaming your audio. Because we are using the synchronous API, first start a thread that continuously reads any events the Data Flow service sends to your application.

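std::thread th([&stream]()
{
	StreamOutput response;
	// Read blocks until a message arrives and returns false once the stream is closed,
	// which ends the loop and lets the thread be joined.
	while (stream->Read(&response))
	{
		response.PrintDebugString();
	}
});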

....

th.join(); //note : do not forget to join or detach the thread in C++ when you are done.

Now you can start sending audio fragments, for example one fragment every 20 milliseconds. If you have stereo recordings, send the audio of both channels as interleaved samples, i.e. in the same layout that wav files and similar media use to store or stream audio.

std::string data;

...//fill data with your audio

StreamInput request;
auto & message = *request.mutable_message();
message.set_binary_content(data);
message.set_content_type(CONTENT_TYPE::AUDIO);
grpc::WriteOptions wOpts;
stream->Write(request, wOpts);
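
The chunking and interleaving described above can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not part of the Data Flow API: the helper names `chunkBytes`, `interleave` and `splitIntoChunks` are hypothetical, each fragment is assumed to carry 20 ms of audio, and "pcm" is assumed to mean 16-bit samples (2 bytes per sample), which this guide does not confirm. u-law uses 1 byte per sample.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Bytes needed for `ms` milliseconds of audio.
// bytesPerSample is an assumption: 2 for 16-bit PCM, 1 for 8-bit u-law.
std::size_t chunkBytes(int sampleRate, int channelCount, int bytesPerSample, int ms)
{
	return static_cast<std::size_t>(sampleRate) * channelCount * bytesPerSample * ms / 1000;
}

// Interleave two mono channels as L0 R0 L1 R1 ..., the order used in wav files.
// Samples beyond the end of the shorter channel are dropped.
std::vector<std::int16_t> interleave(const std::vector<std::int16_t>& left,
                                     const std::vector<std::int16_t>& right)
{
	const std::size_t frames = left.size() < right.size() ? left.size() : right.size();
	std::vector<std::int16_t> out;
	out.reserve(frames * 2);
	for (std::size_t i = 0; i < frames; ++i)
	{
		out.push_back(left[i]);
		out.push_back(right[i]);
	}
	return out;
}

// Split a raw audio buffer into chunks of at most `size` bytes.
// The last chunk may be shorter than the others.
std::vector<std::string> splitIntoChunks(const std::string& audio, std::size_t size)
{
	std::vector<std::string> chunks;
	if (size == 0)
	{
		return chunks;
	}
	for (std::size_t pos = 0; pos < audio.size(); pos += size)
	{
		chunks.push_back(audio.substr(pos, size));
	}
	return chunks;
}
```

With the start message shown earlier (8000 Hz, one channel) and the 16-bit assumption, `chunkBytes(8000, 1, 2, 20)` gives 320 bytes per fragment. Each chunk returned by `splitIntoChunks` would then be passed to `set_binary_content` as in the snippet above.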

So that's the gist of it.

Remarks

Content Type
The user application is responsible for setting the appropriate content type on each gRPC message it sends.
The Data Flow server sets the content type on every message it sends to the user. Always check the content type of a received message before acting on it.

CONTENT_TYPE can take the following values.

enum CONTENT_TYPE{
  AUDIO = 0;
  EVENT = 1;
  REQUEST = 2;
  RESPONSE = 3;
}
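
Checking the content type before acting on a message can be sketched as a simple dispatch. This is a minimal sketch: `ContentType` is a local stand-in that mirrors the values of the enum above so that the example compiles on its own (a real client would use the generated `CONTENT_TYPE`), and `actionFor` is a hypothetical helper. Treating AUDIO and REQUEST as unexpected on the receiving side is an assumption of this sketch, not something this guide states.

```cpp
#include <string>

// Local stand-in mirroring the proto enum values; use the generated CONTENT_TYPE in a real client.
enum class ContentType { Audio = 0, Event = 1, Request = 2, Response = 3 };

// Hypothetical helper: choose an action for a received message from its content type.
std::string actionFor(ContentType type)
{
	switch (type)
	{
	case ContentType::Response:
		return "match with pending request";
	case ContentType::Event:
		return "handle event";
	case ContentType::Audio:
		return "unexpected audio";   // assumption: the client does not expect audio back
	case ContentType::Request:
		return "unexpected request"; // assumption: only the client sends requests
	}
	// Reached only for values outside the enum, e.g. after a proto upgrade.
	return "unknown content type";
}
```

Keeping the fallback return means a value introduced by a newer proto file is reported rather than silently handled as something else.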

Message Types

In Data Flow, every valid request has a response. For example, the start message has a corresponding start-response message.
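
One way to keep track of which requests are still waiting for their response is sketched below. It is a minimal sketch, not part of the Data Flow API: `PendingRequests` and `expectedResponseName` are hypothetical names, and the rule that every response is named "<request>-response" is an assumption. This guide confirms that naming only for start and start-response, so check the message definitions before relying on it.

```cpp
#include <set>
#include <string>

// Assumption: every response is named "<request>-response".
// Only "start" / "start-response" is confirmed by this guide.
std::string expectedResponseName(const std::string& requestName)
{
	return requestName + "-response";
}

// Tracks requests that have been sent but not yet answered.
class PendingRequests
{
public:
	// Record that a request with this message-name was written to the stream.
	void sent(const std::string& requestName)
	{
		_waiting.insert(expectedResponseName(requestName));
	}

	// Returns true and clears the entry if the response matches a pending request.
	bool received(const std::string& responseName)
	{
		return _waiting.erase(responseName) > 0;
	}

	bool empty() const { return _waiting.empty(); }

private:
	std::set<std::string> _waiting;
};
```

A client could call `sent("start")` after writing the start message and `received(...)` from the reader thread. Sharing the object between threads would need a mutex, which is left out here for brevity.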

Data Flow can generate events at any moment. Its flexible project system is built from nodes: each node can produce events at any point during a session, and users can also introduce custom nodes. Please see the message definitions for details.

Notes

  • The example code uses a third-party library, nlohmann/json, for parsing and producing JSON-formatted strings. It is a nice library, but its use is not mandatory. You may use any JSON library you prefer.
