MQTT API Resources (Beta)


This is a list of GroveStreams' topics that devices may subscribe or publish to.

Topics and Descriptions


Feeds
PUBLISH {orgUid}/feed
  Uploads a feed for one or more components.
PUBLISH {orgUid}/feed/cid/{compId}
  Uploads a feed for one component.
PUBLISH {orgUid}/feed/cid/{compId}/sid/{streamId}
  Uploads a feed for one component stream.
PUBLISH {orgUid}/feed/cid/{compId}/sid/{streamId}/data/{data}
  Uploads one sample for one component stream. Uses server time for the sample time.
PUBLISH {orgUid}/feed/cid/{compId}/sid/{streamId}/data/{data}/time/{time}
  Uploads one sample for one component stream while using {time} as the sample time.


Management
PUBLISH {orgUid}/manage/cert/register/cid/{compId}
  Uploads a new certificate chain for a device.
PUBLISH {orgUid}/manage/cert/unregister/cid/{compId}
  Removes a device certificate from the organization.
SUBSCRIBE {orgUid}/manage/cert/roll/cid/{compId}
  Notifies a device to roll its certificates.
SUBSCRIBE {orgUid}/manage/cert/update_truststore/cid/{compId}
  Notifies a device to update its truststore with the latest GroveStreams certificate.
PUBLISH manage/cert/myorg/cid/{compId}
  Replies with the org uid that the current certificate is registered within.


HTTP API Calls
PUBLISH {orgUid}/api/http/{apiUrl}
  Makes an HTTP API call and may publish a reply.


Overview

  1. All calls require an X.509 certificate to be registered within the GroveStreams organization and assigned a policy. The entire certificate chain needs to be registered. More on certificates can be found here.
  2. All calls are rate limited.
  3. IDs and UIDs
  4. Date and Times

Compression

Payload compression is supported for these calls. Set a user property called "enc" on the message, with the compression encoding as its value. See the Raspberry Pi tutorial for an example of using the Feed topics, and other topics, with compression.
Encodings supported:
  • BR: BROTLI
  • BZIP2
  • DEFLATE
  • DEFLATE64
  • GZ: GZIP
  • LZ4-BLOCK
  • LZ4-FRAMED
  • LZMA
  • PACK200
  • SNAPPY-FRAMED
  • SNAPPY-RAW
  • XZ
  • Z
  • ZSTD: ZSTANDARD
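
As a rough illustration of the "enc" user property, the sketch below compresses a JSON feed payload with GZIP and pairs it with the property to send. It assumes that the property value is the short code from the list above ("GZ") and that the body is UTF-8 JSON; the function name and the sample values are hypothetical, and the publish itself is left to whichever MQTT 5 client library is in use.

```python
import gzip
import json


def compress_payload(payload, enc="GZ"):
    """Return (body_bytes, user_properties) for a compressed publish.

    Assumption: "GZ" is the value expected in the "enc" user property.
    Only GZIP is implemented in this sketch.
    """
    if enc != "GZ":
        raise ValueError("this sketch only implements GZ (GZIP)")
    raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    body = gzip.compress(raw)
    # The user property tells the receiver how the body was encoded.
    return body, [("enc", enc)]


# Hypothetical sample; pass body and props to your MQTT 5 client's publish call.
body, props = compress_payload(
    {"data": [{"cid": "comp2", "sid": "temp", "d": 38.7}]}
)
```

For very small payloads the GZIP header can make the compressed body larger than the original, so compression is mainly worthwhile for batched uploads.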

ReplyTo

Some calls support ReplyTo, also known as the Request-Response pattern. ReplyTo is an MQTT concept that emulates request/response. To receive a reply:
  1. Set the correlation data.
  2. Set the response topic.
  3. Optionally set the user property "replyqos".
  4. Optionally set the user property "replyenc" to set the compression encoding if the reply is to be compressed.
See the Using Request-Response pattern of the Raspberry Pi tutorial for more information and examples.
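
The four steps above can be sketched as a small helper that collects the MQTT 5 properties for a request. This is only an illustration: the class and function names and the response topic are hypothetical, and mapping these values onto a real client's property object depends on the library being used. User property values are sent as strings, so the QoS is converted.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class ReplyToProperties:
    """Plain container for the MQTT 5 properties a ReplyTo request needs."""
    response_topic: str
    correlation_data: bytes
    user_properties: list = field(default_factory=list)


def make_reply_to(response_topic, reply_qos=None, reply_enc=None):
    # Steps 1 and 2: correlation data and response topic.
    props = ReplyToProperties(
        response_topic=response_topic,
        correlation_data=uuid.uuid4().bytes,  # random 16-byte token
    )
    # Steps 3 and 4: optional user properties.
    if reply_qos is not None:
        props.user_properties.append(("replyqos", str(reply_qos)))
    if reply_enc is not None:
        props.user_properties.append(("replyenc", reply_enc))
    return props


# Hypothetical response topic; the device must be subscribed to it.
request_props = make_reply_to("org1/reply/comp2", reply_qos=1, reply_enc="GZ")
```

Keeping the correlation data on the device side lets it match an incoming reply to the request that triggered it.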


PUBLISH {orgUid}/feed:

PUBLISH {orgUid}/feed/cid/{compId}

PUBLISH {orgUid}/feed/cid/{compId}/sid/{streamId}

PUBLISH {orgUid}/feed/cid/{compId}/sid/{streamId}/data/{data}

PUBLISH {orgUid}/feed/cid/{compId}/sid/{streamId}/data/{data}/time/{time}



GroveStreams subscribes to these topics and saves the data to streams within an organization. GroveStreams' data processing for these calls is asynchronous. The connection will be dropped if an error occurs during the saving process.

Topic and Payload Design Pattern

All of these topics accept the same message payload. Including the compId and streamId as part of the topic helps to minimize the size of the message body and allows for subscriptions specific to a component, stream, and data.

The payload data values will be used if compId, streamId, data, or time is included in both the topic and in the body.

This design minimizes publish call sizes to avoid I/O costs.
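
To make the precedence rule concrete, here is a minimal sketch of how topic values and payload values could be combined for one data tuple, with the payload winning when both are present. It is not GroveStreams' server code; the function names are hypothetical, and values taken from the topic are left as strings.

```python
# Maps topic level names to the keys used in a data tuple.
TOPIC_KEYS = {"cid": "cid", "sid": "sid", "data": "d", "time": "t"}


def parse_feed_topic(topic):
    """Extract cid, sid, d and t from a feed topic such as org/feed/cid/x/sid/y."""
    parts = topic.split("/")
    if len(parts) < 2 or parts[1] != "feed":
        raise ValueError("not a feed topic: " + topic)
    rest = parts[2:]
    if len(rest) % 2 != 0:
        raise ValueError("topic levels must come in name/value pairs")
    values = {}
    for name, value in zip(rest[0::2], rest[1::2]):
        if name not in TOPIC_KEYS:
            raise ValueError("unknown topic level: " + name)
        values[TOPIC_KEYS[name]] = value
    return values


def effective_sample(topic, data_tuple):
    """Merge topic values with one payload tuple; payload values take priority."""
    merged = parse_feed_topic(topic)
    merged.update(data_tuple)
    return merged


sample = effective_sample("org1/feed/cid/comp2/sid/temp", {"d": 38.7, "t": 1380225121172})
```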

Expression Derivation

After the data has been saved, and while the components are still locked, GroveStreams attempts to derive all precedent streams that depend on the streams being uploaded.

All impacted streams are derived while walking up the derivation dependency tree. Derivation stops when a maximum of 20 streams is reached or the total save time reaches 10 seconds. Derivation stops working up a precedent branch if a component is locked by another process, if derivation fails due to exceptions, or if derivation did not derive any new intervals. Derivation only occurs for streams that are appending samples.

Normal derivation still occurs within the Job framework on Job servers and will attempt to derive all streams that need deriving every few minutes.
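
The stop rules described above can be pictured with the following sketch. It is an illustration only and not GroveStreams' implementation: the traversal order, the dependents mapping, and the try_derive callback (which stands in for "locked, failed, or derived no new intervals" by returning False) are all assumptions.

```python
import time
from collections import deque

MAX_STREAMS = 20    # stream limit stated in the text
MAX_SECONDS = 10.0  # total save time limit stated in the text


def derive_after_save(uploaded, dependents, try_derive, save_started,
                      clock=time.monotonic):
    """Walk up the dependency tree from the uploaded streams.

    dependents maps a stream ID to the streams derived from it.
    try_derive(stream) returns False when that branch should be abandoned.
    """
    derived = []
    seen = set(uploaded)
    queue = deque(uploaded)
    while queue:
        current = queue.popleft()
        for stream in dependents.get(current, []):
            if stream in seen:
                continue
            seen.add(stream)
            # Global limits: stop everything once either one is reached.
            if len(derived) >= MAX_STREAMS or clock() - save_started >= MAX_SECONDS:
                return derived
            if not try_derive(stream):
                continue  # stop working up this branch only
            derived.append(stream)
            queue.append(stream)
    return derived
```

Anything left undone by these limits would be picked up later by the normal Job-based derivation.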

Message Payload JSON Options

{
  "op": {   
    "cname": "Temperature",
    "folder": "/Components/zone1",
    "ctid": "temp_template", 
    "dtid": "xx",
    "dsid": "xx",
    "autofillGapsWithNulls": true, 
    "freq": 10000, 
    "t": 1380225121172
  },
  "data": [
    {
      "cid": "comp2",
      "sid": "status",
      "d": "normal",
      "t": 1380225121172,
      "te": 1380225121172
    }
  ]
}					

op

Op stands for Options that can be applied during the call. This section is optional.

cname
optional
The name to assign to a new component that is auto-created during the first call. The name is only set once during the creation of the component.
folder
optional
The folder to place a newly created component into. Must start with /Components. The folder path will be created if one does not exist.
ctid
optional
The ID of a component template that will be used to auto-create a component for this feed if one does not already exist. The component ID will be used for the component name if cname is not set.
dtId
dsId
optional
These options are used to dynamically create streams for a component if they do not exist based on a component template stream. The stream definition will be based on a component template stream definition. These options are useful for times when the default new stream is not what is desired. For example, a caller may require new streams to be interval streams or streams with units already defined. These parameters can accomplish this.
dtId: The ID of the component template that contains the stream to be used as the template for the new stream.
dsId: The ID of the stream within the component template to be used as the template for the new stream.
freq
optional
A single number (not a delimited set of numbers). The freq is the interval, in milliseconds, at which the samples were taken. A GroveStreams server will use the freq and the current server time to determine the actual time stamp of each sample. Data tuples are listed earliest first.
t
optional
Sample time, in epoch millis, used for each data tuple. Only applied if the time is missing from the tuple. Saves space.
autofillGapsWithNulls
optional
Defaults to true. This option is only used if uploading into an interval stream and the samples being uploaded are missing interval sample times within the timespan being uploaded. When true, Nulls are inserted and overwrite the values in the store for the missing interval times in the upload. Setting this parameter to false will preserve the store values for the missing uploaded samples.
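
The freq option can be illustrated with a short calculation. The sketch assumes that the last tuple in the upload is stamped with the current server time and that each earlier tuple is one freq step older; the text does not spell out the exact rule, so treat this as one plausible reading. The function name is hypothetical.

```python
def assign_times(count, freq_ms, server_now_ms):
    """Return one timestamp per tuple, earliest first.

    Assumption: the newest sample is stamped with the server time and
    each earlier sample is freq_ms older than the one after it.
    """
    return [server_now_ms - (count - 1 - i) * freq_ms for i in range(count)]


# Three samples taken 10 seconds apart.
times = assign_times(3, 10000, 1380225121172)
# times == [1380225101172, 1380225111172, 1380225121172]
```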

data

A collection of sample tuples. This section is optional if the sample is provided as part of the topic.

cid
optional if included in the topic
The ID of the component that contains the streams for the new samples. The compId value will be assigned to the new component if the ctid option is included.
sid
optional if included in the topic
The ID of the stream for the samples.
d
optional if included in the topic
Data. Can be formatted to correlate with the stream data type defined as part of the component definition.
t
optional
The sample time in epoch millis.
te
optional
Elapsed time in millis. The sample time is calculated as the current server time minus te.
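
As an example of how op and data work together, the sketch below builds a feed payload and moves the sample time into op when every tuple shares the same t, which is the space saving described for the op "t" option. The function name is hypothetical and the logic is only one way to assemble the JSON.

```python
import json


def build_feed_payload(samples):
    """Build a feed payload dict from a list of data tuples.

    If every tuple carries the same "t", it is hoisted into op.t and
    removed from the tuples; otherwise the tuples are sent unchanged.
    """
    times = {s.get("t") for s in samples}
    if len(times) == 1 and None not in times:
        shared = times.pop()
        data = [{k: v for k, v in s.items() if k != "t"} for s in samples]
        return {"op": {"t": shared}, "data": data}
    return {"data": [dict(s) for s in samples]}


payload = build_feed_payload([
    {"cid": "comp1", "sid": "status", "d": "normal", "t": 1380225121172},
    {"cid": "comp2", "sid": "temp", "d": 38.7, "t": 1380225121172},
])
body = json.dumps(payload)  # message body to publish
```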

Compression


Payload compression supported - See compression above.

Examples


PUBLISH {orgUid}/feed

Payload:

{
  "data": [
    {
      "cid": "comp1",
      "sid": "status",
      "d": "normal",
      "t": 1380225121172 
    },
    {
      "cid": "comp2",
      "sid": "temp",
      "d": 38.7,
      "t": 1380225121172 
    },
    {
      "cid": "comp2",
      "sid": "windir",
      "d": "ne",
      "t": 1380225121172 
    }
  ]
}

PUBLISH {orgUid}/feed

Payload:

{
  "op": {   
    "t": 1380225121172
  },
  "data": [
    {
      "cid": "comp1",
      "sid": "status",
      "d": "normal"
    },
    {
      "cid": "comp2",
      "sid": "temp",
      "d": 38.7
    },
    {
      "cid": "comp2",
      "sid": "windir",
      "d": "ne"
    }
  ]
}

PUBLISH {orgUid}/feed/cid/comp2

Payload:

{
  "data": [
    {
      "sid": "temp",
      "d": 38.7,
      "t": 1380225121172 
    },
    {
      "sid": "windir",
      "d": "ne",
      "t": 1380225121172 
    }
  ]
}

PUBLISH {orgUid}/feed/cid/comp2/sid/temp

Payload:

{
  "data": [
    {
      "d": 38.7,
      "t": 1380225121172 
    }
  ]
}

PUBLISH {orgUid}/feed/cid/comp2/sid/windir/data/ne/time/1380225121172

Payload:

none
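
The topics used in these examples follow a fixed nesting (cid, then sid, then data, then time), so they can be assembled with a small helper. The sketch below is an assumption-laden illustration: the function name and the "org1" org UID are hypothetical, and values containing "/", "+", or "#" are rejected because they would change the topic structure or are not allowed in a published topic name.

```python
def feed_topic(org_uid, comp_id=None, stream_id=None, data=None, time_ms=None):
    """Build a feed topic, adding only the levels that were supplied."""
    parts = [org_uid, "feed"]
    levels = [("cid", comp_id), ("sid", stream_id), ("data", data), ("time", time_ms)]
    for index, (name, value) in enumerate(levels):
        if value is None:
            # A level can only be omitted if every later level is omitted too.
            if any(later is not None for _, later in levels[index + 1:]):
                raise ValueError(name + " is required before later levels")
            break
        text = str(value)
        if any(ch in text for ch in "/+#"):
            raise ValueError("value not usable in a topic: " + text)
        parts += [name, text]
    return "/".join(parts)


topic = feed_topic("org1", "comp2", "windir", "ne", 1380225121172)
# topic == "org1/feed/cid/comp2/sid/windir/data/ne/time/1380225121172"
```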



PUBLISH {orgUid}/manage/cert/register/cid/{compId}:



This call is used to upload and register new certificates for a device in an organization. The current certificates (curCerts) being created must be included in the payload and be a part of the chain used for the current SSL handshake. The new certificates will be registered within the organization in the same folder and with the same settings as the existing certificates. Component pinning will be validated if enabled. The new certificate subject DN must be different than the existing subject DN.

Supports ReplyTo. Returns any exceptions or the org uid upon success.

Message Payload JSON

{
  "curCerts": [X509Certificate string, X509Certificate string],
  "newCerts": [X509Certificate string, X509Certificate string]
}					
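
A registration payload could be assembled as in the sketch below. The text only says each entry is an "X509Certificate string", so the exact encoding is an assumption here (PEM text is a common choice); the function name and the placeholder certificate strings are hypothetical.

```python
import json


def register_payload(cur_certs, new_certs):
    """Return the JSON body for the register call.

    cur_certs: certificate strings from the chain used for the current handshake.
    new_certs: certificate strings for the chain being registered.
    Assumption: each certificate is passed as text (for example PEM).
    """
    if not cur_certs or not new_certs:
        raise ValueError("both curCerts and newCerts are required")
    return json.dumps({"curCerts": list(cur_certs), "newCerts": list(new_certs)})


# Placeholder strings stand in for real certificate text.
body = register_payload(["<current leaf>", "<current issuer>"],
                        ["<new leaf>", "<new issuer>"])
```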



PUBLISH {orgUid}/manage/cert/unregister/cid/{compId}:



This call is used to unregister and remove certificates from an organization. The certificates being removed (certs) must be included in the payload and be a part of the chain used for the current SSL handshake. Component pinning will be validated if enabled.

Supports ReplyTo. Returns any exceptions.

Message Payload JSON

{
  "certs": [{X509Certificate string}, {X509Certificate string}]
}						



SUBSCRIBE {orgUid}/manage/cert/roll/cid/{compId}:



This topic is published by Certificate Operations within an organization. Devices can subscribe to handle rolling of certificates.

Message Payload

none				


SUBSCRIBE {orgUid}/manage/cert/update_truststore/cid/{compId}:



This topic is published by Certificate Operations within an organization. Devices can subscribe to handle updating their trust store.

Message Payload

none				
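
A device that subscribes to both notification topics needs to tell them apart when a message arrives. The sketch below parses the incoming topic and calls a matching handler; the function names and handler signature are hypothetical, and how messages are delivered to this code depends on the MQTT client in use.

```python
CERT_ACTIONS = ("roll", "update_truststore")


def parse_cert_notification(topic):
    """Return orgUid, action and compId for a certificate notification topic.

    Returns None if the topic is not one of the two notification topics.
    """
    parts = topic.split("/")
    if (len(parts) == 6 and parts[1:3] == ["manage", "cert"]
            and parts[3] in CERT_ACTIONS and parts[4] == "cid"):
        return {"orgUid": parts[0], "action": parts[3], "compId": parts[5]}
    return None


def dispatch(topic, handlers):
    """Call handlers[action](compId) for a recognized topic; return True if handled."""
    info = parse_cert_notification(topic)
    if info is None or info["action"] not in handlers:
        return False
    handlers[info["action"]](info["compId"])
    return True
```

Because the payload is empty, the topic alone carries everything the handler needs.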


PUBLISH manage/cert/myorg/cid/{compId}:



Replies with the org uid that the current certificate is registered within.

Supports ReplyTo. Returns any exceptions.

Message Payload

none					



PUBLISH {orgUid}/api/http/{apiUrl}:



Makes HTTP API and Advanced HTTP API calls.

Set up ReplyTo to have HTTP entity responses published.

Security

The MQTT call will adhere to all GroveStreams MQTT call security checks including checking the topic with the certificate policy. The HTTP API call will run under the certificate's "Run as User for HTTP API Calls" selected user for determining capabilities and content access rights.

Compression

Publish payload compression supported - See compression above.
Response payload compression supported - See replyTo above.

Billing

The MQTT HTTP API call will be converted to a pure HTTP API call and forwarded to a GroveStreams web server. I/O billing costs will be calculated as the MQTT I/O plus the web server I/O costs.

Steps to Make an HTTP API Call to return feed samples:

  1. Include the URL of the API call as the last part of the API topic: {orgUid}/api/http/feed
  2. Set the response topic.
  3. Optionally set the user property "replyqos".
  4. Optionally set the user property "replyenc" to set the compression encoding if the reply is to be compressed.
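
The steps above can be gathered into one request description before handing it to an MQTT 5 client. This is a minimal sketch: the function name, the "org1" org UID, and the response topic are hypothetical, and only the settings listed in the steps are included.

```python
def http_api_request(org_uid, api_url, response_topic, reply_qos=None, reply_enc=None):
    """Describe an MQTT publish that triggers an HTTP API call."""
    # Step 1: the API URL becomes the last part of the topic.
    topic = f"{org_uid}/api/http/{api_url.lstrip('/')}"
    user_properties = []
    # Steps 3 and 4: optional reply settings, sent as string user properties.
    if reply_qos is not None:
        user_properties.append(("replyqos", str(reply_qos)))
    if reply_enc is not None:
        user_properties.append(("replyenc", reply_enc))
    # Step 2: the response topic is where the reply will be published.
    return {
        "topic": topic,
        "response_topic": response_topic,
        "user_properties": user_properties,
    }


request = http_api_request("org1", "feed", "org1/reply/comp2", reply_qos=1)
```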

Examples

See the Raspberry Pi tutorial for an example of retrieving stream samples (mySettings) including downloading a file (provision).