Cache recent data

The hot data store enables faster queries on recent sensor data.

Configure

To configure a rolling window of recent data, add the recent_data_store configuration to the relevant capture method in your component’s data capture settings:

{
  "components": [
    {
      "name": "sensor-1",
      "api": "rdk:component:sensor",
      "model": "rdk:builtin:fake",
      "attributes": {},
      "service_configs": [
        {
          "type": "data_manager",
          "attributes": {
            "capture_methods": [
              {
                "method": "Readings",
                "capture_frequency_hz": 0.5,
                "additional_params": {},
                "recent_data_store": {
                  "stored_hours": 24
                }
              }
            ]
          }
        }
      ]
    }
  ]
}

Set the value of the stored_hours field to the number of hours of recent data you would like to cache in the hot data store.

Query

By default, queries execute against blob storage. To query data from the hot data store instead, specify hot storage as the data source in your query.

In Python, use DataClient.tabular_data_by_mql with data_source set to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE to query your hot data store:

data_client = DataClient.from_api_key(
    api_key="<api-key>",
    api_key_id="<api-key-id>"
)

results = data_client.tabular_data_by_mql(
    organization_id="<org-id>",
    mql_binary=[
        bson.encode({"$match": {"location_id": "warehouse-1"}}),
        bson.encode({"$sort": {"_id": -1}}),
        bson.encode({"$limit": 100})
    ],
    data_source=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE
)

for document in results:
    print(document)

In Go, use DataClient.TabularDataByMQL with DataSource set to datapb.TabularDataSourceType_TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE to query your hot data store:

client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()

dataClient := client.DataClient()

query := [][]byte{
    bson.Marshal(bson.M{"$match": bson.M{"location_id": "warehouse-1"}}),
    bson.Marshal(bson.M{"$sort": bson.M{"_id": -1}}),
    bson.Marshal(bson.M{"$limit": 100}),
}

resp, err := dataClient.TabularDataByMQL(context.Background(), &datapb.TabularDataByMQLRequest{
    OrganizationId: "<org-id>",
    MqlBinary: query,
    DataSource: datapb.TabularDataSourceType_TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE,
})

for _, doc := range resp.Data {
    fmt.Println(doc)
}

In JavaScript or TypeScript, use dataClient.tabularDataByMQL with dataSource set to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE to query your hot data store:

const apiKey = "<api-key>";
const apiKeyID = "<api-key-id>";

const client = await createViamClient({
  credential: {
    type: "api-key",
    payload: { key: apiKey, keyId: apiKeyID },
  },
});

const dataClient = client.dataClient;

const query = [
  BSON.serialize({ $match: { location_id: "warehouse-1" } }),
  BSON.serialize({ $sort: { _id: -1 } }),
  BSON.serialize({ $limit: 100 }),
];

const response = await dataClient.tabularDataByMQL({
  organizationId: "<org-id>",
  mqlBinary: query,
  dataSource: TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE,
});

response.data.forEach((doc) => {
  console.log(BSON.deserialize(doc));
});

All queries that omit the data source parameter (data_source, DataSource, or dataSource, depending on the SDK) will continue to use blob storage.

Query limitations

Queries to the hot data store support only the following MongoDB aggregation pipeline stages:

  • $addFields
  • $bucket
  • $bucketAuto
  • $count
  • $densify
  • $fill
  • $geoNear
  • $group
  • $limit
  • $match
  • $project
  • $redact
  • $replaceRoot
  • $replaceWith
  • $sample
  • $set
  • $setWindowFields
  • $skip
  • $sort
  • $sortByCount
  • $unset
  • $unwind