Skip to main content

Batch Ingest Data

Send multiple time-series data points to a bucket in a single request. This is more efficient than sending individual data points.

Endpoint

POST /api/data/batch

Authentication

| Key Type | Allowed |
| --- | --- |
| Admin (dakkio_a_) | ✅ Yes |
| Write (dakkio_w_) | ✅ Yes |
| Read (dakkio_r_) | ❌ No |

Recommended: Use a Write key (dakkio_w_) for data ingestion.

When to use batch ingestion
  • Sending data from multiple sensors at once
  • Uploading historical data
  • Buffering readings and sending periodically
  • Any scenario with more than 2-3 data points

Request

Headers

| Header | Type | Required | Description |
| --- | --- | --- | --- |
| X-API-Key | string | ✅ Yes | Your Write or Admin API key |
| Content-Type | string | ✅ Yes | Must be application/json |

Body Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| bucketId | string | ✅ Yes | The bucket ID (24-character hex string) |
| dataPoints | array | ✅ Yes | Array of data point objects (max 1000) |

Data Point Object

Each object in the dataPoints array:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| dataSourceId | string | ✅ Yes | The data source ID |
| timestamp | string | ❌ No | ISO 8601 timestamp (default: current time) |
| values | object | ✅ Yes | Key-value pairs of measurements |
| metadata | object | ❌ No | Additional context |

Example Request

# Send three readings for one data source to the batch endpoint in a single
# request, authenticating with a Write key (dakkio_w_ prefix).
curl -X POST https://api.dakkio.io/api/data/batch \
-H "X-API-Key: dakkio_w_abc123def456..." \
-H "Content-Type: application/json" \
-d '{
"bucketId": "507f1f77bcf86cd799439011",
"dataPoints": [
{
"dataSourceId": "507f1f77bcf86cd799439012",
"timestamp": "2024-01-15T10:00:00Z",
"values": { "temperature": 22.0, "humidity": 68 },
"metadata": { "deviceId": "ESP32-001" }
},
{
"dataSourceId": "507f1f77bcf86cd799439012",
"timestamp": "2024-01-15T10:05:00Z",
"values": { "temperature": 22.2, "humidity": 67 },
"metadata": { "deviceId": "ESP32-001" }
},
{
"dataSourceId": "507f1f77bcf86cd799439012",
"timestamp": "2024-01-15T10:10:00Z",
"values": { "temperature": 22.5, "humidity": 65 },
"metadata": { "deviceId": "ESP32-001" }
}
]
}'

Response

Success Response (201 Created)

{
"message": "Batch data ingested successfully",
"insertedCount": 3
}

Partial Success Response (207 Multi-Status)

When some data points succeed and others fail:

{
"message": "Batch partially ingested",
"insertedCount": 2,
"failedCount": 1,
"errors": [
{
"index": 1,
"error": "Invalid dataSourceId"
}
]
}

Error Responses

400 Bad Request - Validation Error

{
"error": "Validation Error",
"message": "Invalid request parameters",
"details": [
{
"path": ["body", "dataPoints"],
"message": "Array must contain at least 1 element(s)"
}
]
}

400 Bad Request - Too Many Data Points

{
"error": "Validation Error",
"message": "Batch size exceeds maximum of 1000 data points"
}

401 Unauthorized

{
"error": "Unauthorized",
"message": "Invalid or missing API key"
}

403 Forbidden

{
"error": "Forbidden",
"message": "Read keys cannot ingest data. Use a Write or Admin key."
}

Code Examples

JavaScript/Node.js

const axios = require('axios');

/**
 * Send a batch of data points to the batch-ingest endpoint.
 *
 * The bucket ID is read from the `BUCKET_ID` environment variable and the
 * API key from `DAKKIO_WRITE_KEY`.
 *
 * @param {Array<object>} dataPoints - Data point objects to ingest.
 * @returns {Promise<object>} The response body returned by the API.
 */
async function batchIngest(dataPoints) {
  const endpoint = 'https://api.dakkio.io/api/data/batch';
  const payload = { bucketId: process.env.BUCKET_ID, dataPoints };
  const requestConfig = {
    headers: {
      'X-API-Key': process.env.DAKKIO_WRITE_KEY,
      'Content-Type': 'application/json',
    },
  };

  const { data } = await axios.post(endpoint, payload, requestConfig);

  console.log(`Inserted ${data.insertedCount} data points`);
  return data;
}

// Example: Buffer readings and send them once 100 have accumulated
let buffer = [];

/**
 * Queue one reading, stamped with the current time, and flush the queue as a
 * single batch once it holds 100 readings.
 *
 * The flush is fire-and-forget: this function does not wait for the request.
 * A failed request is logged rather than left as an unhandled promise
 * rejection (which would terminate a Node.js process by default).
 *
 * @param {object} values - Key-value pairs of measurements for this reading.
 */
function addReading(values) {
  buffer.push({
    dataSourceId: process.env.DATA_SOURCE_ID,
    timestamp: new Date().toISOString(),
    values,
  });

  // Flush when buffer reaches 100 items
  if (buffer.length >= 100) {
    // Swap in a fresh array before sending so readings added while the
    // request is in flight go into the next batch.
    const batch = buffer;
    buffer = [];

    batchIngest(batch).catch((err) => {
      // The readings in this batch are not retried here.
      console.error(`Failed to ingest batch of ${batch.length} readings:`, err);
    });
  }
}

Python

import requests
import os
from datetime import datetime

def batch_ingest(data_points, timeout=30):
    """Send a batch of data points to the batch-ingest endpoint.

    The API key is read from the ``DAKKIO_WRITE_KEY`` environment variable and
    the bucket ID from ``BUCKET_ID``.

    Args:
        data_points: List of data point dicts to ingest.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The parsed response body on 201 (all inserted) or 207 (partially
        inserted; the body carries ``failedCount`` and ``errors``), or
        ``None`` for any other status.
    """
    response = requests.post(
        'https://api.dakkio.io/api/data/batch',
        headers={
            'X-API-Key': os.environ['DAKKIO_WRITE_KEY'],
            'Content-Type': 'application/json'
        },
        json={
            'bucketId': os.environ['BUCKET_ID'],
            'dataPoints': data_points
        },
        timeout=timeout
    )

    if response.status_code == 201:
        result = response.json()
        print(f"Inserted {result['insertedCount']} data points")
        return result

    if response.status_code == 207:
        # Partial success: some points were stored. Return the body so the
        # caller can inspect `errors` and retry the failed indices.
        result = response.json()
        print(
            f"Inserted {result['insertedCount']} data points, "
            f"{result.get('failedCount', 0)} failed"
        )
        return result

    # Error bodies are not guaranteed to be JSON (e.g. a proxy error page).
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    print('Error:', detail)
    return None

# Example: Send multiple readings
# Each reading is a (timestamp, values) pair; every reading is attributed to
# the data source named by the DATA_SOURCE_ID environment variable.
readings = [
    ('2024-01-15T10:00:00Z', {'temperature': 22.0, 'humidity': 68}),
    ('2024-01-15T10:05:00Z', {'temperature': 22.2, 'humidity': 67}),
]

data_points = [
    {
        'dataSourceId': os.environ['DATA_SOURCE_ID'],
        'timestamp': timestamp,
        'values': values,
    }
    for timestamp, values in readings
]

batch_ingest(data_points)

Go

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"time"
)

// BatchRequest is the JSON body sent to the batch-ingest endpoint: the target
// bucket plus the data points to store in it.
type BatchRequest struct {
// BucketId is serialized as "bucketId".
BucketId string `json:"bucketId"`
// DataPoints is serialized as "dataPoints".
DataPoints []DataPoint `json:"dataPoints"`
}

type DataPoint struct {
DataSourceId string `json:"dataSourceId"`
Timestamp string `json:"timestamp"`
Values map[string]interface{} `json:"values"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}

// batchIngest posts dataPoints to the batch-ingest endpoint as a single
// request. The bucket ID is read from the BUCKET_ID environment variable and
// the API key from DAKKIO_WRITE_KEY.
//
// It returns nil only when the server answers 201 Created. Any other status,
// including 207 Multi-Status (some points rejected) and all 4xx/5xx
// responses, is returned as an error, as are encoding, request-construction
// and transport failures.
func batchIngest(dataPoints []DataPoint) error {
	request := BatchRequest{
		BucketId:   os.Getenv("BUCKET_ID"),
		DataPoints: dataPoints,
	}

	jsonData, err := json.Marshal(request)
	if err != nil {
		return fmt.Errorf("encoding batch request: %w", err)
	}

	req, err := http.NewRequest("POST", "https://api.dakkio.io/api/data/batch", bytes.NewBuffer(jsonData))
	if err != nil {
		return fmt.Errorf("building batch request: %w", err)
	}
	req.Header.Set("X-API-Key", os.Getenv("DAKKIO_WRITE_KEY"))
	req.Header.Set("Content-Type", "application/json")

	// The zero-value http.Client has no timeout; bound the call so a stalled
	// connection cannot block the caller forever.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("batch ingest failed: unexpected status %s", resp.Status)
	}

	return nil
}

Limits

| Limit | Value |
| --- | --- |
| Max data points per batch | 1000 |
| Max request size | 10 MB |
| Max timestamp age | 30 days in the past |

Best Practices

1. Optimal Batch Size

For best performance, send batches of 100-500 data points:

const BATCH_SIZE = 100;

/**
 * Ingest a large array of data points as consecutive batches of at most
 * BATCH_SIZE items, waiting for each batch to finish before sending the next.
 *
 * @param {Array<object>} allDataPoints - Every data point to send.
 * @returns {Promise<void>} Resolves once the last batch has been sent.
 */
async function sendInBatches(allDataPoints) {
  let offset = 0;

  while (offset < allDataPoints.length) {
    const end = offset + BATCH_SIZE;
    await batchIngest(allDataPoints.slice(offset, end));
    offset = end;
  }
}

2. Buffer and Flush Strategy

For IoT devices, buffer readings locally and flush periodically:

/**
 * Collects data points in memory and sends them as a batch, either when
 * `maxSize` points have accumulated or every `flushInterval` milliseconds.
 *
 * Call `stop()` when the buffer is no longer needed; otherwise the periodic
 * timer keeps running (and keeps a Node.js process alive).
 */
class DataBuffer {
  /**
   * @param {number} [maxSize=100] - Buffer length that triggers an immediate flush.
   * @param {number} [flushInterval=60000] - Milliseconds between periodic flushes.
   */
  constructor(maxSize = 100, flushInterval = 60000) {
    this.buffer = [];
    this.maxSize = maxSize;

    // Flush periodically. Keep the handle so stop() can cancel the timer.
    this.timer = setInterval(() => this.flushInBackground(), flushInterval);
  }

  /**
   * Queue a data point, flushing in the background once the buffer is full.
   *
   * @param {object} dataPoint - Data point to queue.
   */
  add(dataPoint) {
    this.buffer.push(dataPoint);
    if (this.buffer.length >= this.maxSize) {
      this.flushInBackground();
    }
  }

  /**
   * Send everything currently buffered as one batch.
   *
   * @returns {Promise<void>} Rejects if the batch request fails.
   */
  async flush() {
    if (this.buffer.length === 0) return;

    // Swap in a fresh array first so points added during the request are
    // kept for the next flush instead of being lost or sent twice.
    const toSend = this.buffer;
    this.buffer = [];

    await batchIngest(toSend);
  }

  /**
   * Flush without awaiting, for callers that cannot handle a rejection (the
   * timer callback and add()). Failures are logged instead of surfacing as
   * unhandled promise rejections.
   */
  flushInBackground() {
    this.flush().catch((err) => {
      console.error('DataBuffer flush failed:', err);
    });
  }

  /**
   * Cancel the periodic flush timer. Buffered points are left in place; call
   * flush() afterwards to send them.
   */
  stop() {
    clearInterval(this.timer);
    this.timer = null;
  }
}

3. Handle Partial Failures

Check for partial success (207 status) and retry failed items:

const response = await batchIngest(dataPoints);

// A partially ingested batch reports how many points failed and, for each
// failure, the index of the offending entry in the submitted array.
if (response.failedCount > 0) {
  const { failedCount, errors } = response;
  console.log(`${failedCount} items failed`);

  // Retry failed items
  const failedIndices = errors.map(({ index }) => index);
  const failedItems = failedIndices.map((position) => dataPoints[position]);
  // ... retry logic
}

4. Ensure Chronological Order

For best query performance, send data points in chronological order when possible.