This proof of concept demonstrates embedding Python in a Go application to process InfluxDB line protocol data. It creates a data pipeline that allows Go to generate data and Python to process it, providing a flexible way to extend InfluxDB capabilities with Python-based logic.
The application works as follows:
- Go generates InfluxDB line protocol data at regular intervals
- The data is stored in a configurable buffer
- When processing occurs, Go sends the data to an embedded Python interpreter
- Python processes the data and returns derived metrics
- Go receives the results and clears the buffer for the next cycle
This pattern lets you combine Go's performance for data generation and handling with Python's rich ecosystem for data processing and analysis.
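The sketch below approximates this cycle on the Go side. It is illustrative only: `generateLine` and `processWithPython` are hypothetical stand-ins for the real generation and embedded-interpreter calls, and the assumption that processing triggers when the buffer fills may not match the actual main.go.

```go
package main

import (
	"fmt"
	"time"
)

// generateLine is a hypothetical stand-in for the PoC's line protocol generator.
func generateLine() string {
	return fmt.Sprintf("cpu,host=server01 usage_user=0.64 %d", time.Now().UnixNano())
}

// processWithPython is a hypothetical stand-in for handing a batch to the
// embedded Python interpreter and collecting the derived metrics.
func processWithPython(batch []string) {
	fmt.Printf("processing %d buffered lines\n", len(batch))
}

func main() {
	const bufferSize = 1000 // configurable via -buffer-size in the PoC
	buffer := make([]string, 0, bufferSize)

	ticker := time.NewTicker(5 * time.Second) // -interval in the PoC
	defer ticker.Stop()

	for range ticker.C {
		// Generate line protocol data and store it in the buffer.
		buffer = append(buffer, generateLine())

		// Assumption: processing triggers when the buffer is full;
		// the real trigger in main.go may differ.
		if len(buffer) >= bufferSize {
			processWithPython(buffer)
			buffer = buffer[:0] // clear the buffer for the next cycle
		}
	}
}
```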
- Go 1.18 or later
- Python 3.6 or later
- The `github.com/kluctl/go-embed-python` library
```bash
# Basic usage
go run main.go

# With custom settings
go run main.go -interval 2s -buffer-size 500 -debug

# Run with all options
go run main.go -interval 1s -buffer-size 2000 -script-dir ./python_scripts -debug
```

Command-line options:

- `-interval`: How often to generate new data (default: 5s)
- `-buffer-size`: Maximum buffer size for storing data (default: 1000)
- `-script-dir`: Directory for Python scripts (default: "scripts")
- `-debug`: Enable debug output (default: false)
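These options map directly onto Go's standard `flag` package. A minimal sketch of how they might be declared; the actual main.go may differ:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Each flag mirrors one of the CLI options documented above.
	interval := flag.Duration("interval", 5*time.Second, "How often to generate new data")
	bufferSize := flag.Int("buffer-size", 1000, "Maximum buffer size for storing data")
	scriptDir := flag.String("script-dir", "scripts", "Directory for Python scripts")
	debug := flag.Bool("debug", false, "Enable debug output")
	flag.Parse()

	fmt.Printf("interval=%s buffer-size=%d script-dir=%s debug=%t\n",
		*interval, *bufferSize, *scriptDir, *debug)
}
```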
The Python processing is divided into two main components:
This module, `data_processor.py`, defines classes for working with line protocol data along with the processing function:
```python
import sys

class LineBuilder:
    def __init__(self, measurement):
        self.measurement = measurement
        self.tags = {}
        self.fields = {}

    def tag(self, key, value):
        self.tags[key] = value
        return self

    def int64_field(self, key, value):
        self.fields[key] = f"{value}i"
        return self

    def to_line(self):
        tag_str = ",".join([f"{k}={v}" for k, v in self.tags.items()])
        field_str = ",".join([f"{k}={v}" for k, v in self.fields.items()])
        if tag_str:
            return f"{self.measurement},{tag_str} {field_str}"
        else:
            return f"{self.measurement} {field_str}"

class InfluxDB3Local:
    def __init__(self):
        self.lines = []

    def info(self, message):
        print(f"[INFO] {message}", file=sys.stderr)

    def write(self, line):
        if isinstance(line, LineBuilder):
            line_str = line.to_line()
        else:
            line_str = str(line)
        self.lines.append(line_str)
        print(f"[WRITE] {line_str}", file=sys.stderr)
        return True

def process_writes(influxdb3_local, table_batches, args=None):
    # Create InfluxDB mock if not provided
    if not isinstance(influxdb3_local, InfluxDB3Local):
        influxdb3_local = InfluxDB3Local()

    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]

        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")

        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)

    return {
        "processed": len(table_batches),
        "written_lines": influxdb3_local.lines
    }
```

This script handles the I/O and calls the processing function:
```python
import json
import sys
import traceback

from data_processor import process_writes, InfluxDB3Local

# Create a persistent InfluxDB local instance
influxdb3_local = InfluxDB3Local()

try:
    # Read input data from stdin
    data = json.load(sys.stdin)
    table_batches = data.get("table_batches", [])

    # Process the data
    result = process_writes(influxdb3_local, table_batches)

    # Output the result as JSON
    print(json.dumps(result))
except Exception as e:
    # Print traceback to stderr for debugging
    traceback.print_exc(file=sys.stderr)

    # Return error information to Go
    error_result = {
        "error": str(e),
        "processed": 0,
        "written_lines": []
    }
    print(json.dumps(error_result))
```
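From the Go side, running this script amounts to starting a Python process, writing the JSON payload to its stdin, and reading the JSON result from its stdout. In the PoC the interpreter comes from go-embed-python rather than the system Python; the sketch below approximates the call with `os/exec` and a system `python3`, and the script name `process_data.py` is a hypothetical placeholder:

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"os/exec"
)

func main() {
	payload := []byte(`{"table_batches": [{"table_name": "cpu", "rows": []}]}`)

	// Stand-in for the embedded interpreter from go-embed-python.
	cmd := exec.Command("python3", "scripts/process_data.py") // hypothetical script name
	cmd.Stdin = bytes.NewReader(payload)
	cmd.Stderr = os.Stderr // Python writes its [INFO]/[WRITE] log lines to stderr

	// Output captures stdout, which carries the JSON result.
	out, err := cmd.Output()
	if err != nil {
		log.Fatalf("python process failed: %v", err)
	}
	fmt.Printf("result: %s\n", out)
}
```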
The data sent from Go to Python is a JSON object with the following structure:

```json
{
  "table_batches": [
    {
      "table_name": "cpu",
      "rows": [
        {
          "line": "cpu,host=server01,region=us-west usage_user=0.64,usage_system=0.21 1683721234000000000",
          "timestamp": "1683721234000000000"
        }
      ]
    },
    {
      "table_name": "memory",
      "rows": [
        {
          "line": "memory,host=server01,region=us-west used_percent=72.45,available=4321000000i 1683721234000000000",
          "timestamp": "1683721234000000000"
        }
      ]
    }
  ]
}
```
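On the Go side, this payload can be produced by marshaling plain structs. A minimal sketch; the type and field names are illustrative, not necessarily those used in main.go:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical types mirroring the JSON structure above.
type Row struct {
	Line      string `json:"line"`
	Timestamp string `json:"timestamp"`
}

type TableBatch struct {
	TableName string `json:"table_name"`
	Rows      []Row  `json:"rows"`
}

type Input struct {
	TableBatches []TableBatch `json:"table_batches"`
}

func main() {
	in := Input{TableBatches: []TableBatch{{
		TableName: "cpu",
		Rows: []Row{{
			Line:      "cpu,host=server01 usage_user=0.64 1683721234000000000",
			Timestamp: "1683721234000000000",
		}},
	}}}
	b, _ := json.MarshalIndent(in, "", "  ")
	fmt.Println(string(b))
}
```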
The data returned from Python to Go is a JSON object with the following structure:

```json
{
  "processed": 3,
  "written_lines": [
    "processed_data,source_table=cpu row_count=1i",
    "processed_data,source_table=memory row_count=1i",
    "processed_data,source_table=disk row_count=1i"
  ]
}
```
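Decoding this result in Go is symmetric. Again a sketch with illustrative names; the optional `error` field corresponds to the error path in the wrapper script:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Hypothetical type mirroring the result JSON above.
type Result struct {
	Processed    int      `json:"processed"`
	WrittenLines []string `json:"written_lines"`
	Error        string   `json:"error,omitempty"`
}

func main() {
	raw := []byte(`{"processed": 3, "written_lines": ["processed_data,source_table=cpu row_count=1i"]}`)

	var res Result
	if err := json.Unmarshal(raw, &res); err != nil {
		log.Fatalf("bad result JSON: %v", err)
	}
	if res.Error != "" {
		log.Fatalf("python reported: %s", res.Error)
	}
	fmt.Printf("processed %d batches, %d derived lines\n", res.Processed, len(res.WrittenLines))
}
```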
To extend this proof of concept for your own use case:

- Modify the `generateLineProtocolData()` function to generate your actual data
- Update the `formatDataForPython()` function to format your data as needed
- Customize the Python `process_writes()` function to implement your processing logic
- Update the data structures to match your specific requirements
- Data Volume: For high-volume data, you may need to optimize the buffer management and batch processing
- Processing Complexity: For complex processing, consider keeping the Python interpreter running instead of starting a new one for each batch (see the sketch after this list)
- Error Handling: Enhance error handling and recovery mechanisms for production use
- Security: Be mindful of security implications when passing data between Go and Python
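One way to keep the interpreter running, as suggested above, is a single long-lived Python process that exchanges newline-delimited JSON over its pipes. A rough sketch under that assumption; `process_loop.py` is a hypothetical wrapper with a read-loop that the script shown earlier does not have:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os/exec"
)

func main() {
	// Hypothetical long-lived wrapper that reads one JSON document per
	// line and writes one JSON result per line.
	cmd := exec.Command("python3", "scripts/process_loop.py")
	stdin, err := cmd.StdinPipe()
	if err != nil {
		log.Fatal(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	enc := json.NewEncoder(stdin) // Encode writes a newline after each document
	scanner := bufio.NewScanner(stdout)

	for i := 0; i < 3; i++ {
		// Send one batch per cycle without restarting the interpreter.
		if err := enc.Encode(map[string]any{"table_batches": []any{}}); err != nil {
			log.Fatal(err)
		}
		if scanner.Scan() {
			fmt.Println("result:", scanner.Text())
		}
	}

	stdin.Close()
	cmd.Wait()
}
```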
This proof of concept is provided as-is under the MIT License.