Data pipelines automatically transform raw sensor readings into summaries and insights on a schedule that you choose. Viam stores the output of these pipelines in a cache so that you can access complex aggregation results more efficiently. When late-arriving data syncs to Viam, pipelines automatically re-run to keep summaries accurate.
For example, you could use a data pipeline to pre-calculate the average temperature across multiple sensors for each hour of the day. If you query that information frequently, this can save significant computational resources.
Before creating a data pipeline, you must enable data capture from at least one component and begin syncing data with Viam.
Only users with organization owner permissions can create a data pipeline.
To define a data pipeline, specify a name, organization, schedule, data source type, and query:
Use datapipelines create to create your pipeline:
viam datapipelines create \
--org-id=<org-id> \
--name=sensor-counts \
--schedule="0 * * * *" \
--data-source-type="standard" \
--mql='[{"$match": {"component_name": "sensor"}}, {"$group": {"location": "$location_id", "avg": {"$avg": "$data.readings.value"}}}]'
To pass your query as a file instead of specifying it as inline MQL, pass the --mql-path flag instead of --mql.

To create a pipeline that reads data from the hot data store, specify --data-source-type hotstorage.
To define a new pipeline, call DataClient.CreateDataPipeline:
data_client = DataClient.from_api_key(
    api_key="<api-key>",
    api_key_id="<api-key-id>"
)

# create_data_pipeline returns the ID of the newly created pipeline.
pipeline_id = data_client.create_data_pipeline(
    organization_id="<org-id>",
    name="hourly-temp-average",
    mql_binary=[
        bson.encode({"$match": {"component_name": "temperature-sensor"}}),
        bson.encode({
            "$group": {
                "location": "$location_id",
                "avg_temp": {"$avg": "$data.readings.temperature"},
                "count": {"$sum": 1}
            }
        })
    ],
    schedule="0 * * * *"  # Run hourly
)
To create a pipeline that reads data from the hot data store, set your query's data_source to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE.
To define a new pipeline, call DataClient.CreateDataPipeline:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

// bson.Marshal returns the encoded bytes and an error, so build each stage separately.
matchStage, err := bson.Marshal(bson.M{"$match": bson.M{"component_name": "temperature-sensor"}})
if err != nil {
    panic(err)
}
groupStage, err := bson.Marshal(bson.M{
    "$group": bson.M{
        "location": "$location_id",
        "avg_temp": bson.M{"$avg": "$data.readings.temperature"},
        "count":    bson.M{"$sum": 1},
    },
})
if err != nil {
    panic(err)
}
pipeline := [][]byte{matchStage, groupStage}

resp, err := dataClient.CreateDataPipeline(context.Background(), &datapb.CreateDataPipelineRequest{
    OrganizationId: "<org-id>",
    Name:           "hourly-temp-average",
    MqlBinary:      pipeline,
    Schedule:       "0 * * * *", // Run hourly
})
if err != nil {
    panic(err)
}
fmt.Println("Created pipeline:", resp.Id)
To create a pipeline that reads data from the hot data store, set your query's data_source field to datapb.TabularDataSourceType_TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE.
To define a new pipeline, call dataClient.CreateDataPipeline:
const apiKey = "<api-key>";
const apiKeyID = "<api-key-id>";
const client = await createViamClient({
credential: {
type: "api-key",
payload: { key: apiKey, keyId: apiKeyID },
},
});
const dataClient = client.dataClient;
const pipeline = [
BSON.serialize({ $match: { component_name: "temperature-sensor" } }),
BSON.serialize({
$group: {
location: "$location_id",
avg_temp: { $avg: "$data.readings.temperature" },
count: { $sum: 1 },
},
}),
];
const response = await dataClient.createDataPipeline({
organizationId: "<org-id>",
name: "hourly-temp-average",
mqlBinary: pipeline,
schedule: "0 * * * *", // Run hourly
});
To create a pipeline that reads data from the hot data store, set your query's dataSource field to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE.
Avoid specifying an _id value in your pipeline's final group stage unless you can guarantee its uniqueness across all pipeline runs.
Non-unique IDs will trigger duplicate key errors, preventing the pipeline from saving subsequent results.
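For example, a final group stage with a constant _id (a hypothetical snippet, not one of the pipelines above) emits the same _id on every run, so only the first execution can save its result:

# Hypothetical anti-pattern: this final $group emits the constant _id "all-sensors",
# so every run after the first writes a document with the same _id and fails with a
# duplicate key error.
bad_final_stage = {
    "$group": {
        "_id": "all-sensors",
        "avg": {"$avg": "$data.readings.value"},
    }
}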
To create a schedule for your pipeline, specify a cron expression in UTC. The schedule determines both how often the pipeline executes and the range of time each execution queries. The following table lists some common schedules with their execution frequencies and query time ranges:

| Schedule | Frequency | Query Time Range |
| --- | --- | --- |
| 0 * * * * | Hourly | Previous hour |
| 0 0 * * * | Daily | Previous day |
| */15 * * * * | Every 15 minutes | Previous 15 minutes |
To query the results of your data pipeline, call DataClient.TabularDataByMQL. Configure the data_source argument with the following fields:
- type: TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK
- pipeline_id: your pipeline's ID
data_client = DataClient.from_api_key(
    api_key="<api-key>",
    api_key_id="<api-key-id>"
)

results = data_client.tabular_data_by_mql(
    organization_id="<org-id>",
    mql_binary=[
        bson.encode({}),
    ],
    data_source=TabularDataSource(
        type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
        pipeline_id="<pipeline-id>"
    )
)

for document in results:
    print(document)
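Because the MQL stages you pass run against the pipeline's stored output, you can refine the query further. The following sketch sorts the results of the hourly-temp-average pipeline above by its computed avg_temp field; the field names are assumptions based on that pipeline's $group stage, so adjust them to match your own pipeline's output:

# Hypothetical refinement: sort the pipeline's stored output by the avg_temp field
# computed by the hourly-temp-average pipeline above and keep the top five documents.
# The avg_temp field name is an assumption; substitute the fields your pipeline produces.
results = data_client.tabular_data_by_mql(
    organization_id="<org-id>",
    mql_binary=[
        bson.encode({"$sort": {"avg_temp": -1}}),
        bson.encode({"$limit": 5}),
    ],
    data_source=TabularDataSource(
        type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
        pipeline_id="<pipeline-id>"
    )
)
for document in results:
    print(document)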
To query the results of your data pipeline, call DataClient.TabularDataByMQL. Configure the DataSource argument with the following fields:
- Type: datapb.TabularDataSourceType_TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK
- PipelineId: your pipeline's ID
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

// bson.Marshal returns the encoded bytes and an error, so build the stage separately.
emptyStage, err := bson.Marshal(bson.M{})
if err != nil {
    panic(err)
}
query := [][]byte{emptyStage}

resp, err := dataClient.TabularDataByMQL(context.Background(), &datapb.TabularDataByMQLRequest{
    OrganizationId: "<org-id>",
    MqlBinary:      query,
    DataSource: &datapb.TabularDataSource{
        Type:       datapb.TabularDataSourceType_TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
        PipelineId: proto.String("<pipeline-id>"),
    },
})
if err != nil {
    panic(err)
}
for _, doc := range resp.Data {
    fmt.Println(doc)
}
To query the results of your data pipeline, call dataClient.TabularDataByMQL. Configure the dataSource argument with the following fields:
- type: TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK
- pipelineId: your pipeline's ID
const apiKey = "<api-key>";
const apiKeyID = "<api-key-id>";
const client = await createViamClient({
credential: {
type: "api-key",
payload: { key: apiKey, keyId: apiKeyID },
},
});
const dataClient = client.dataClient;
const query = [BSON.serialize({})];
const response = await dataClient.tabularDataByMQL({
organizationId: "<org-id>",
mqlBinary: query,
dataSource: {
type: TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
pipelineId: "<pipeline-id>",
},
});
response.data.forEach((doc) => {
console.log(BSON.deserialize(doc));
});
Use datapipelines list to fetch a list of pipeline configurations in an organization:
viam datapipelines list --org-id=<org-id>
Use DataClient.ListDataPipelines to fetch a list of pipeline configurations in an organization:
data_client = DataClient.from_api_key(
    api_key="<api-key>",
    api_key_id="<api-key-id>"
)

pipelines = data_client.list_data_pipelines(organization_id="<org-id>")

for pipeline in pipelines:
    print(f"{pipeline.name} (id: {pipeline.id})")
    print(f"Schedule: {pipeline.schedule}")
    print(f"Enabled: {pipeline.enabled}")
Use DataClient.ListDataPipelines to fetch a list of pipeline configurations in an organization:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

resp, err := dataClient.ListDataPipelines(context.Background(), &datapb.ListDataPipelinesRequest{
    OrganizationId: "<org-id>",
})
if err != nil {
    panic(err)
}
for _, pipeline := range resp.DataPipelines {
    fmt.Printf("%s (id: %s)\n", pipeline.Name, pipeline.Id)
    fmt.Printf("Schedule: %s\n", pipeline.Schedule)
    fmt.Printf("Enabled: %v\n", pipeline.Enabled)
}
Use dataClient.ListDataPipelines to fetch a list of pipeline configurations in an organization:
const apiKey = "<api-key>";
const apiKeyID = "<api-key-id>";
const client = await createViamClient({
credential: {
type: "api-key",
payload: { key: apiKey, keyId: apiKeyID },
},
});
const dataClient = client.dataClient;
const response = await dataClient.listDataPipelines({
organizationId: "<org-id>",
});
response.dataPipelines.forEach((pipeline) => {
console.log(`${pipeline.name} (id: ${pipeline.id})`);
console.log(`Schedule: ${pipeline.schedule}`);
console.log(`Enabled: ${pipeline.enabled}`);
});
Use caution when updating the query or schedule of a data pipeline. Changing either value can lead to inconsistent pipeline output history.
Use datapipelines update to alter an existing data pipeline:
viam datapipelines update \
--org-id=<org-id> \
--id=<pipeline-id> \
--schedule="0 * * * *" \
--name="updated-name" \
--mql='[{"$match": {"component_name": "sensor"}}, {"$group": {"_id": "$location_id", "avg": {"$avg": "$data.readings.value"}}}]'
To pass your query from a file instead of from inline MQL, pass the --mql-path flag instead of --mql.
Use DataClient.UpdateDataPipeline to alter an existing data pipeline:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

// bson.Marshal returns the encoded bytes and an error, so build each stage separately.
matchStage, err := bson.Marshal(bson.M{"$match": bson.M{"component_name": "sensor"}})
if err != nil {
    panic(err)
}
groupStage, err := bson.Marshal(bson.M{
    "$group": bson.M{
        "_id":   "$part_id",
        "total": bson.M{"$sum": 1},
    },
})
if err != nil {
    panic(err)
}
pipeline := [][]byte{matchStage, groupStage}

_, err = dataClient.UpdateDataPipeline(context.Background(), &datapb.UpdateDataPipelineRequest{
    Id:        "<pipeline-id>",
    Name:      "updated-name",
    MqlBinary: pipeline,
    Schedule:  "0 * * * *",
})
if err != nil {
    panic(err)
}
Use datapipelines enable to enable a disabled data pipeline:
viam datapipelines enable --id=<pipeline-id>
Use DataClient.EnableDataPipeline to enable a disabled data pipeline:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

_, err = dataClient.EnableDataPipeline(context.Background(), &datapb.EnableDataPipelineRequest{
    Id: "<pipeline-id>",
})
if err != nil {
    panic(err)
}
Disabling a data pipeline lets you pause execution without deleting the pipeline configuration from your organization. The pipeline immediately stops aggregating data. You can re-enable the pipeline at any time to resume execution. Viam does not backfill time windows missed while the pipeline was disabled.
Use datapipelines disable to disable a data pipeline:
viam datapipelines disable --id=<pipeline-id>
Use DataClient.DisableDataPipeline to disable a data pipeline:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

_, err = dataClient.DisableDataPipeline(context.Background(), &datapb.DisableDataPipelineRequest{
    Id: "<pipeline-id>",
})
if err != nil {
    panic(err)
}
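If you work from the Python SDK instead, a minimal sketch of pausing and later resuming a pipeline looks like the following. It assumes the Python DataClient mirrors the Go client with disable_data_pipeline and enable_data_pipeline methods; check your SDK version for the exact names.

# Hypothetical sketch, assuming the Python SDK exposes disable_data_pipeline and
# enable_data_pipeline mirroring the Go client: pause a pipeline, then resume it.
data_client = DataClient.from_api_key(
    api_key="<api-key>",
    api_key_id="<api-key-id>"
)
data_client.disable_data_pipeline(id="<pipeline-id>")  # execution pauses immediately
# ... later ...
data_client.enable_data_pipeline(id="<pipeline-id>")   # resumes; missed windows are not backfilled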
Use datapipelines delete to delete a data pipeline, its execution history, and all output generated by that pipeline:
viam datapipelines delete --id=<pipeline-id>
Use DataClient.DeleteDataPipeline to delete a data pipeline:
data_client = DataClient.from_api_key(
api_key="<api-key>",
api_key_id="<api-key-id>"
)
data_client.delete_data_pipeline(id="<pipeline-id>")
Use DataClient.DeleteDataPipeline to delete a data pipeline:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

_, err = dataClient.DeleteDataPipeline(context.Background(), &datapb.DeleteDataPipelineRequest{
    Id: "<pipeline-id>",
})
if err != nil {
    panic(err)
}
Use dataClient.DeleteDataPipeline to delete a data pipeline:
const apiKey = "<api-key>";
const apiKeyID = "<api-key-id>";
const client = await createViamClient({
credential: {
type: "api-key",
payload: { key: apiKey, keyId: apiKeyID },
},
});
const dataClient = client.dataClient;
await dataClient.deleteDataPipeline({ id: "<pipeline-id>" });
Data pipeline executions may have any of the following statuses:
- SCHEDULED: pending execution
- STARTED: currently processing
- COMPLETED: successfully finished
- FAILED: execution error

Use DataClient.ListDataPipelineRuns to view information about past executions of a pipeline:
data_client = DataClient.from_api_key(
    api_key="<api-key>",
    api_key_id="<api-key-id>"
)

runs = data_client.list_data_pipeline_runs(
    id="<pipeline-id>",
    page_size=10
)

for run in runs:
    print(f"Run {run.id}: {run.status}")
    print(f"Data window: {run.data_start_time} to {run.data_end_time}")
    print(f"Started: {run.started}, Ended: {run.ended}")
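As a sketch of acting on these statuses, the following filters the most recent page of runs down to failed executions. It assumes run.status renders as one of the values listed above (some SDK versions expose it as an enum), so adjust the comparison to match your SDK version.

# Hypothetical sketch: report only failed executions from the most recent page of runs.
# Assumes str(run.status) ends with one of the status names listed above; adjust the
# comparison if your SDK version represents status differently.
runs = data_client.list_data_pipeline_runs(id="<pipeline-id>", page_size=50)
failed_runs = [run for run in runs if str(run.status).endswith("FAILED")]
for run in failed_runs:
    print(f"Run {run.id} failed for window {run.data_start_time} to {run.data_end_time}")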
Use DataClient.ListDataPipelineRuns to view information about past executions of a pipeline:
client, err := utils.NewViamClient(context.Background(), utils.WithAPIKey("<api-key>", "<api-key-id>"))
if err != nil {
    panic(err)
}
defer client.Close()
dataClient := client.DataClient()

resp, err := dataClient.ListDataPipelineRuns(context.Background(), &datapb.ListDataPipelineRunsRequest{
    Id:       "<pipeline-id>",
    PageSize: 10,
})
if err != nil {
    panic(err)
}
for _, run := range resp.Executions {
    fmt.Printf("Run %s: %s\n", run.Id, run.Status)
    fmt.Printf("Data window: %s to %s\n", run.DataStartTime, run.DataEndTime)
    fmt.Printf("Started: %s, Ended: %s\n", run.Started, run.Ended)
}
Use dataClient.ListDataPipelineRuns to view information about past executions of a pipeline:
const apiKey = "<api-key>";
const apiKeyID = "<api-key-id>";
const client = await createViamClient({
credential: {
type: "api-key",
payload: { key: apiKey, keyId: apiKeyID },
},
});
const dataClient = client.dataClient;
const response = await dataClient.listDataPipelineRuns({
id: "<pipeline-id>",
pageSize: 10,
});
response.executions.forEach((run) => {
console.log(`Run ${run.id}: ${run.status}`);
console.log(`Data window: ${run.dataStartTime} to ${run.dataEndTime}`);
console.log(`Started: ${run.started}, Ended: ${run.ended}`);
});