Introduction
This document explains the gRPC integration details for Data Flow.
This guide assumes prior knowledge of gRPC development. Please refer to the gRPC documentation for detailed technical information about gRPC itself.
Preparation
Before starting, you are expected to prepare your stub classes.
Please generate your stub classes as explained here.
You can download Data Flow proto file here.
Example
You are advised to use the gRPC Async API in your application for production purposes. The gRPC synchronous API is easier to follow step by step, so this tutorial uses it.
The Data Flow gRPC API follows a flow similar to the Websocket API. You start a stream operation, send your data in small chunks, and listen to responses as they are produced.
First, create a channel object with the appropriate CA root certificates, then make a new stub with it.
grpc::SslCredentialsOptions credentialOptions;
// Load the CA root certificates (PEM) from the path given in the configuration.
credentialOptions.pem_root_certs =
sestek::io::ReadFileContentOrThrow<std::string>(((std::string)config["cacerts"]).c_str());
auto channel_creds = grpc::SslCredentials(credentialOptions);
auto _channel = grpc::CreateChannel("dataflow.eu.knovvu.com:443", channel_creds);
auto _stub = dataflow::NewStub(_channel);
You start a streaming operation by calling the rpc method Stream from your application.
service dataflow
{
rpc Stream(stream StreamInput) returns (stream StreamOutput) {}
}
For C++ this means the following call:
grpc::ClientContext context;
auto stream = _stub->Stream(&context);
To start a wagtail session on this stream, first prepare a start message in JSON format:
nlohmann::json startJson;
nlohmann::json audioJson;
audioJson["sample-rate"] = 8000;
audioJson["channel-count"] = 1; // or 2 (stereo is also supported)
audioJson["encoding"] = "pcm"; //or ulaw
startJson["audio"] = std::move(audioJson);
startJson["message-name"] = "start";
startJson["tenant"] = "3a0d7ae9-0ae8-5c93-cb8d-897fbabb2877";
startJson["token"] = "........."; //A token retrieved from knovvu core identity service with your credentials
startJson["project-name"] = "stream-sr-ca";
Your tenant information will be provided to you. Currently, you can also find it on the projects page, above the "New Project" button.
Put this JSON-formatted start message string into a StreamInput message and send it to Data Flow.
StreamInput request;
auto & message = *request.mutable_message();
message.set_content(startJson.dump());
message.set_content_type(CONTENT_TYPE::REQUEST);
grpc::WriteOptions wOpts;
bool started = stream->Write(request, wOpts);
The Data Flow server will respond with a StreamOutput message.
StreamOutput response;
if (stream->Read(&response)) // Read returns false if the stream has ended
std::cout << "response : " << response.DebugString() << std::endl; // human-readable, unlike SerializeAsString()
The response received here is in JSON format. It contains a field named operation-result, whose value is success if everything is fine, or an error when something is wrong.
Assuming the Data Flow server has returned success, you can start streaming your audio. Because this tutorial uses the synchronous API, start a thread before streaming the audio so that the application continuously reads any event sent to it by the Data Flow service.
std::thread th([&stream]()
{
StreamOutput response;
// Read returns false once the stream is closed, which ends the loop.
while (stream->Read(&response))
{
response.PrintDebugString();
}
});
....
th.join(); // Note: do not forget to join or detach the thread in C++ when you are done.
Now you can start sending the audio fragments. For example, you can send audio data at 20 millisecond intervals. Note that if you have stereo recordings, you should send the audio of both channels as interleaved samples, i.e. the same way audio is stored or streamed in wav files and similar media.
std::string data;
...//fill data with your audio
StreamInput request;
auto & message = *request.mutable_message();
message.set_binary_content(data);
message.set_content_type(CONTENT_TYPE::AUDIO);
grpc::WriteOptions wOpts;
stream->Write(request, wOpts);
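The fragment sizing and interleaving described above can be sketched with the standard library alone. This is a minimal sketch, not part of the Data Flow API: it assumes that the "pcm" encoding means 16-bit linear PCM (2 bytes per sample), which this guide does not state. The helper names interleave, fragmentBytes, splitIntoFragments and sendPaced are hypothetical, and the send callback stands in for the stream->Write call shown above.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Interleave two mono channels into one stereo buffer: L0 R0 L1 R1 ...
// Assumption: samples are 16-bit linear PCM.
std::vector<std::int16_t> interleave(const std::vector<std::int16_t>& left,
                                     const std::vector<std::int16_t>& right)
{
    if (left.size() != right.size())
        throw std::invalid_argument("channels must have the same length");
    std::vector<std::int16_t> out;
    out.reserve(left.size() * 2);
    for (std::size_t i = 0; i < left.size(); ++i)
    {
        out.push_back(left[i]);
        out.push_back(right[i]);
    }
    return out;
}

// Number of bytes that cover `millis` milliseconds of audio.
std::size_t fragmentBytes(int sampleRate, int channelCount, int bytesPerSample, int millis)
{
    assert(sampleRate > 0 && channelCount > 0 && bytesPerSample > 0 && millis > 0);
    return static_cast<std::size_t>(sampleRate) * millis / 1000 * channelCount * bytesPerSample;
}

// Split a raw byte buffer into fragments of at most `fragmentSize` bytes.
std::vector<std::string> splitIntoFragments(const std::string& audio, std::size_t fragmentSize)
{
    if (fragmentSize == 0)
        throw std::invalid_argument("fragmentSize must be positive");
    std::vector<std::string> fragments;
    for (std::size_t pos = 0; pos < audio.size(); pos += fragmentSize)
        fragments.push_back(audio.substr(pos, fragmentSize));
    return fragments;
}

// Hand each fragment to `send` on a fixed schedule, stopping early if `send`
// returns false. Returns the number of fragments sent successfully.
std::size_t sendPaced(const std::vector<std::string>& fragments,
                      std::chrono::milliseconds interval,
                      const std::function<bool(const std::string&)>& send)
{
    auto next = std::chrono::steady_clock::now();
    std::size_t sent = 0;
    for (const auto& fragment : fragments)
    {
        std::this_thread::sleep_until(next);
        if (!send(fragment))
            break;
        ++sent;
        next += interval;
    }
    return sent;
}
```

Under the 2-byte assumption, fragmentBytes(8000, 1, 2, 20) gives 320 bytes per 20 ms mono fragment and 640 bytes for stereo. In an application, the send callback would wrap each fragment in a StreamInput with content type AUDIO and return the result of stream->Write.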
So that's the gist of it.
Remarks
Content Type
The user application is responsible for setting the appropriate content type for each gRPC message it sends.
The Data Flow server sets the content type appropriately for every message it sends to the user. The user should always check the content type of a received message before acting on it.
CONTENT_TYPE can take the following values:
enum CONTENT_TYPE{
AUDIO = 0;
EVENT = 1;
REQUEST = 2;
RESPONSE = 3;
}
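Checking the content type before acting on a message can be sketched as a simple switch. This is an illustration under stated assumptions, not the generated API: ContentType is a local mirror of the CONTENT_TYPE values listed above so that the sketch compiles without the stub classes, and the Incoming struct and route function are hypothetical stand-ins for a received StreamOutput and your own handler. How AUDIO or REQUEST messages from the server should be treated is not covered by this guide, so the sketch only reports them.

```cpp
#include <string>

// Local mirror of the CONTENT_TYPE values from the proto definition.
// Real code would use the enum from the generated stub classes instead.
enum class ContentType { Audio = 0, Event = 1, Request = 2, Response = 3 };

// Hypothetical stand-in for the relevant fields of a received message.
struct Incoming
{
    ContentType type;
    std::string content;
};

// Decide how to handle a received message based on its content type.
// Returns a short description of the chosen handling.
std::string route(const Incoming& msg)
{
    switch (msg.type)
    {
    case ContentType::Response:
        // Answer to a request the application sent, e.g. start-response.
        return "response: " + msg.content;
    case ContentType::Event:
        // Events can arrive at any moment.
        return "event: " + msg.content;
    case ContentType::Audio:
        // Assumption: binary payload, so only its size is reported.
        return "audio: " + std::to_string(msg.content.size()) + " bytes";
    case ContentType::Request:
        // Assumption: not handled in this sketch.
        return "request: not handled";
    }
    return "unknown content type";
}
```

A reader loop would build an Incoming from each StreamOutput it reads and pass it to route, so that JSON parsing is only attempted on RESPONSE and EVENT messages.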
Message Types
In Data Flow, every valid request has a response, e.g. the start message has a corresponding start-response message.
Data Flow can generate events at any moment. It has a flexible project system built from nodes. Each node can produce events at any point, and users can also introduce custom nodes. Please see the message definitions for details.
Notes
- The example code uses a third-party library named nlohmann for parsing and producing JSON-formatted strings. It is a nice library, but its use is not mandatory. You may use any JSON library you prefer.