Batch Ingest Data
Send multiple time-series data points to a bucket in a single request. This is more efficient than sending individual data points.
Endpoint
POST /api/data/batch
Authentication
| Key Type | Allowed |
|---|---|
| Admin (dakkio_a_) | ✅ Yes |
| Write (dakkio_w_) | ✅ Yes |
| Read (dakkio_r_) | ❌ No |
Recommended: Use a Write key (dakkio_w_) for data ingestion.
When to use batch ingestion
- Sending data from multiple sensors at once
- Uploading historical data
- Buffering readings and sending periodically
- Any scenario with more than 2-3 data points
Request
Headers
| Header | Type | Required | Description |
|---|---|---|---|
| X-API-Key | string | ✅ Yes | Your Write or Admin API key |
| Content-Type | string | ✅ Yes | Must be application/json |
Body Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| bucketId | string | ✅ Yes | The bucket ID (24-character hex string) |
| dataPoints | array | ✅ Yes | Array of data point objects (max 1000) |
Data Point Object
Each object in the dataPoints array:
| Parameter | Type | Required | Description |
|---|---|---|---|
| dataSourceId | string | ✅ Yes | The data source ID |
| timestamp | string | ❌ No | ISO 8601 timestamp (default: current time) |
| values | object | ✅ Yes | Key-value pairs of measurements |
| metadata | object | ❌ No | Additional context |
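Putting the fields above together, a small helper can assemble a valid data point object. This is a sketch; the `makeDataPoint` name is illustrative, not part of the API:

```javascript
// Illustrative helper (not part of the Dakkio API) that assembles a
// data point object matching the schema above.
function makeDataPoint(dataSourceId, values, { timestamp, metadata } = {}) {
  const point = { dataSourceId, values }; // required fields
  // timestamp is optional; the server defaults to the current time,
  // but setting it explicitly makes batches reproducible.
  point.timestamp = timestamp ?? new Date().toISOString();
  if (metadata) point.metadata = metadata; // optional context
  return point;
}
```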
Example Request
```bash
curl -X POST https://api.dakkio.io/api/data/batch \
  -H "X-API-Key: dakkio_w_abc123def456..." \
  -H "Content-Type: application/json" \
  -d '{
    "bucketId": "507f1f77bcf86cd799439011",
    "dataPoints": [
      {
        "dataSourceId": "507f1f77bcf86cd799439012",
        "timestamp": "2024-01-15T10:00:00Z",
        "values": { "temperature": 22.0, "humidity": 68 },
        "metadata": { "deviceId": "ESP32-001" }
      },
      {
        "dataSourceId": "507f1f77bcf86cd799439012",
        "timestamp": "2024-01-15T10:05:00Z",
        "values": { "temperature": 22.2, "humidity": 67 },
        "metadata": { "deviceId": "ESP32-001" }
      },
      {
        "dataSourceId": "507f1f77bcf86cd799439012",
        "timestamp": "2024-01-15T10:10:00Z",
        "values": { "temperature": 22.5, "humidity": 65 },
        "metadata": { "deviceId": "ESP32-001" }
      }
    ]
  }'
```
Response
Success Response (201 Created)
```json
{
  "message": "Batch data ingested successfully",
  "insertedCount": 3
}
```
Partial Success Response (207 Multi-Status)
When some data points succeed and others fail:
```json
{
  "message": "Batch partially ingested",
  "insertedCount": 2,
  "failedCount": 1,
  "errors": [
    {
      "index": 1,
      "error": "Invalid dataSourceId"
    }
  ]
}
```
Error Responses
400 Bad Request - Validation Error
```json
{
  "error": "Validation Error",
  "message": "Invalid request parameters",
  "details": [
    {
      "path": ["body", "dataPoints"],
      "message": "Array must contain at least 1 element(s)"
    }
  ]
}
```
400 Bad Request - Too Many Data Points
```json
{
  "error": "Validation Error",
  "message": "Batch size exceeds maximum of 1000 data points"
}
```
401 Unauthorized
```json
{
  "error": "Unauthorized",
  "message": "Invalid or missing API key"
}
```
403 Forbidden
```json
{
  "error": "Forbidden",
  "message": "Read keys cannot ingest data. Use a Write or Admin key."
}
```
Code Examples
JavaScript/Node.js
```javascript
const axios = require('axios');

async function batchIngest(dataPoints) {
  const response = await axios.post(
    'https://api.dakkio.io/api/data/batch',
    {
      bucketId: process.env.BUCKET_ID,
      dataPoints: dataPoints
    },
    {
      headers: {
        'X-API-Key': process.env.DAKKIO_WRITE_KEY,
        'Content-Type': 'application/json'
      }
    }
  );
  console.log(`Inserted ${response.data.insertedCount} data points`);
  return response.data;
}

// Example: Buffer readings and send every minute
let buffer = [];

function addReading(values) {
  buffer.push({
    dataSourceId: process.env.DATA_SOURCE_ID,
    timestamp: new Date().toISOString(),
    values: values
  });
  // Flush when buffer reaches 100 items
  if (buffer.length >= 100) {
    batchIngest(buffer).catch(console.error); // don't leave the promise unhandled
    buffer = [];
  }
}
```
Python
```python
import os

import requests

def batch_ingest(data_points):
    response = requests.post(
        'https://api.dakkio.io/api/data/batch',
        headers={
            'X-API-Key': os.environ['DAKKIO_WRITE_KEY'],
            'Content-Type': 'application/json'
        },
        json={
            'bucketId': os.environ['BUCKET_ID'],
            'dataPoints': data_points
        }
    )
    # 201 = all inserted, 207 = partial success (check failedCount)
    if response.status_code in (201, 207):
        result = response.json()
        print(f"Inserted {result['insertedCount']} data points")
        return result
    print('Error:', response.json())
    return None

# Example: Send multiple readings
data_points = [
    {
        'dataSourceId': os.environ['DATA_SOURCE_ID'],
        'timestamp': '2024-01-15T10:00:00Z',
        'values': {'temperature': 22.0, 'humidity': 68}
    },
    {
        'dataSourceId': os.environ['DATA_SOURCE_ID'],
        'timestamp': '2024-01-15T10:05:00Z',
        'values': {'temperature': 22.2, 'humidity': 67}
    }
]
batch_ingest(data_points)
```
Go
```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

type BatchRequest struct {
	BucketId   string      `json:"bucketId"`
	DataPoints []DataPoint `json:"dataPoints"`
}

type DataPoint struct {
	DataSourceId string                 `json:"dataSourceId"`
	Timestamp    string                 `json:"timestamp,omitempty"`
	Values       map[string]interface{} `json:"values"`
	Metadata     map[string]interface{} `json:"metadata,omitempty"`
}

func batchIngest(dataPoints []DataPoint) error {
	request := BatchRequest{
		BucketId:   os.Getenv("BUCKET_ID"),
		DataPoints: dataPoints,
	}
	jsonData, err := json.Marshal(request)
	if err != nil {
		return err
	}
	req, err := http.NewRequest("POST", "https://api.dakkio.io/api/data/batch", bytes.NewBuffer(jsonData))
	if err != nil {
		return err
	}
	req.Header.Set("X-API-Key", os.Getenv("DAKKIO_WRITE_KEY"))
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Treat anything other than 201 (full success) or 207 (partial) as an error.
	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusMultiStatus {
		return fmt.Errorf("batch ingest failed: %s", resp.Status)
	}
	return nil
}
```
Limits
| Limit | Value |
|---|---|
| Max data points per batch | 1000 |
| Max request size | 10 MB |
| Max timestamp age | 30 days in the past |
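A batch can be checked against these limits client-side before it is sent. This sketch covers the count and timestamp-age limits; the 10 MB size limit depends on serialization and is omitted:

```javascript
const MAX_BATCH = 1000;
const MAX_AGE_MS = 30 * 24 * 60 * 60 * 1000; // 30 days

// Returns a list of problems; an empty list means the batch passes
// the count and timestamp-age limits from the table above.
function validateBatch(dataPoints, now = Date.now()) {
  const problems = [];
  if (dataPoints.length === 0) problems.push('batch is empty');
  if (dataPoints.length > MAX_BATCH) problems.push(`batch exceeds ${MAX_BATCH} points`);
  dataPoints.forEach((p, i) => {
    if (p.timestamp && now - Date.parse(p.timestamp) > MAX_AGE_MS) {
      problems.push(`point ${i}: timestamp older than 30 days`);
    }
  });
  return problems;
}
```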
Best Practices
1. Optimal Batch Size
For best performance, send batches of 100-500 data points:
```javascript
const BATCH_SIZE = 100;

async function sendInBatches(allDataPoints) {
  for (let i = 0; i < allDataPoints.length; i += BATCH_SIZE) {
    const batch = allDataPoints.slice(i, i + BATCH_SIZE);
    await batchIngest(batch);
  }
}
```
2. Buffer and Flush Strategy
For IoT devices, buffer readings locally and flush periodically:
```javascript
class DataBuffer {
  constructor(maxSize = 100, flushInterval = 60000) {
    this.buffer = [];
    this.maxSize = maxSize;
    // Flush periodically
    setInterval(() => this.flush(), flushInterval);
  }

  add(dataPoint) {
    this.buffer.push(dataPoint);
    if (this.buffer.length >= this.maxSize) {
      this.flush();
    }
  }

  async flush() {
    if (this.buffer.length === 0) return;
    const toSend = this.buffer;
    this.buffer = [];
    try {
      await batchIngest(toSend);
    } catch (err) {
      // Re-queue on failure so readings are not lost
      this.buffer = toSend.concat(this.buffer);
    }
  }
}
```
3. Handle Partial Failures
Check for partial success (207 status) and retry failed items:
```javascript
const response = await batchIngest(dataPoints);

if (response.failedCount > 0) {
  console.log(`${response.failedCount} items failed`);
  // Retry failed items
  const failedIndices = response.errors.map(e => e.index);
  const failedItems = failedIndices.map(i => dataPoints[i]);
  // ... retry logic
}
```
4. Ensure Chronological Order
For best query performance, send data points in chronological order when possible.
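A simple way to guarantee this is to sort the buffer before sending. In this sketch, points without a timestamp are treated as "now", matching the server default:

```javascript
// Sort a copy of the batch by timestamp, oldest first. Points without
// a timestamp are treated as the current time (the server default).
function sortChronologically(dataPoints, now = Date.now()) {
  const ts = (p) => (p.timestamp ? Date.parse(p.timestamp) : now);
  return [...dataPoints].sort((a, b) => ts(a) - ts(b));
}
```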
Related Endpoints
- Ingest Single Data Point - Send individual data points
- Query Data - Retrieve time-series data
- Create Data Source - Define data source schema