GigAPI Plugin POC

InfluxDB Python Pipeline Proof of Concept

This proof of concept demonstrates embedding Python in a Go application to process InfluxDB line protocol data. It creates a data pipeline that allows Go to generate data and Python to process it, providing a flexible way to extend InfluxDB capabilities with Python-based logic.

Overview

The application works as follows:

  1. Go generates InfluxDB line protocol data at regular intervals
  2. The data is stored in a configurable buffer
  3. When processing occurs, Go sends the data to an embedded Python interpreter
  4. Python processes the data and returns derived metrics
  5. Go receives the results and clears the buffer for the next cycle

This pattern allows you to leverage both Go's performance for data generation and handling, and Python's rich ecosystem for data processing and analysis.
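
As a rough sketch, the loop could be wired together in Go along these lines. generateLineProtocolData, formatDataForPython, and processWithPython are placeholder names for helpers sketched further down in this README; the actual main.go may be organized differently.

// Illustrative sketch only; not the actual main.go.
package main

import (
    "fmt"
    "time"
)

func main() {
    interval := 5 * time.Second
    bufferSize := 1000
    scriptDir := "scripts"
    buffer := make([]string, 0, bufferSize)

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for range ticker.C {
        // 1-2. Generate line protocol data and keep it in a bounded buffer.
        buffer = append(buffer, generateLineProtocolData()...)
        if len(buffer) > bufferSize {
            buffer = buffer[len(buffer)-bufferSize:]
        }

        // 3-4. Group the buffered lines into table batches and hand them to
        //      the embedded Python interpreter, which returns derived metrics.
        payload := formatDataForPython(buffer)
        result, err := processWithPython(scriptDir, payload)
        if err != nil {
            fmt.Println("processing failed:", err)
            continue
        }
        fmt.Printf("python processed %v table batches\n", result["processed"])

        // 5. Clear the buffer for the next cycle.
        buffer = buffer[:0]
    }
}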

Requirements

  • Go 1.18 or later
  • Python 3.6 or later
  • The github.com/kluctl/go-embed-python library

Running the Application

# Basic usage
go run main.go

# With custom settings
go run main.go -interval 2s -buffer-size 500 -debug

# Run with all options
go run main.go -interval 1s -buffer-size 2000 -script-dir ./python_scripts -debug

Command-line Flags

  • -interval: How often to generate new data (default: 5s)
  • -buffer-size: Maximum buffer size for storing data (default: 1000)
  • -script-dir: Directory for Python scripts (default: "scripts")
  • -debug: Enable debug output (default: false)
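
For reference, these options map onto Go's standard flag package roughly as follows (variable names are illustrative and may differ from the actual main.go):

package main

import (
    "flag"
    "time"
)

// Flag definitions matching the options listed above.
var (
    interval   = flag.Duration("interval", 5*time.Second, "How often to generate new data")
    bufferSize = flag.Int("buffer-size", 1000, "Maximum buffer size for storing data")
    scriptDir  = flag.String("script-dir", "scripts", "Directory for Python scripts")
    debug      = flag.Bool("debug", false, "Enable debug output")
)

func main() {
    flag.Parse()
    // ... run the pipeline using *interval, *bufferSize, *scriptDir, *debug ...
}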

Python Processing Script Format

The Python processing is divided into two main components:

1. The Data Processor Module (data_processor.py)

This module defines classes for working with line protocol data and the processing function:

import sys

class LineBuilder:
    def __init__(self, measurement):
        self.measurement = measurement
        self.tags = {}
        self.fields = {}
        
    def tag(self, key, value):
        self.tags[key] = value
        return self
        
    def int64_field(self, key, value):
        self.fields[key] = f"{value}i"
        return self
        
    def to_line(self):
        tag_str = ",".join([f"{k}={v}" for k, v in self.tags.items()])
        field_str = ",".join([f"{k}={v}" for k, v in self.fields.items()])
        
        if tag_str:
            return f"{self.measurement},{tag_str} {field_str}"
        else:
            return f"{self.measurement} {field_str}"

class InfluxDB3Local:
    def __init__(self):
        self.lines = []
    
    def info(self, message):
        print(f"[INFO] {message}", file=sys.stderr)
    
    def write(self, line):
        if isinstance(line, LineBuilder):
            line_str = line.to_line()
        else:
            line_str = str(line)
        self.lines.append(line_str)
        print(f"[WRITE] {line_str}", file=sys.stderr)
        return True

def process_writes(influxdb3_local, table_batches, args=None):
    # Create InfluxDB mock if not provided
    if not isinstance(influxdb3_local, InfluxDB3Local):
        influxdb3_local = InfluxDB3Local()
    
    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]
        
        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")
        
        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)
    
    return {
        "processed": len(table_batches),
        "written_lines": influxdb3_local.lines
    }

2. The Server Script (processor_server.py)

This script handles the I/O and calls the processing function:

import json
import sys
import traceback
from data_processor import process_writes, InfluxDB3Local

# Create a persistent InfluxDB local instance
influxdb3_local = InfluxDB3Local()

try:
    # Read input data from stdin
    data = json.load(sys.stdin)
    table_batches = data.get("table_batches", [])
    
    # Process the data
    result = process_writes(influxdb3_local, table_batches)
    
    # Output the result as JSON
    print(json.dumps(result))
    
except Exception as e:
    # Print traceback to stderr for debugging
    traceback.print_exc(file=sys.stderr)
    
    # Return error information to Go
    error_result = {
        "error": str(e),
        "processed": 0,
        "written_lines": []
    }
    print(json.dumps(error_result))
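
On the Go side, the exchange amounts to writing the JSON payload to the script's stdin and reading a single JSON document back from its stdout. The POC obtains its interpreter from github.com/kluctl/go-embed-python; to keep the sketch below self-contained it shells out to a system python3 instead, but the stdin/stdout contract is the same.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "os"
    "os/exec"
    "path/filepath"
)

// processWithPython marshals the payload, runs the server script, and
// decodes the single JSON result it prints to stdout.
func processWithPython(scriptDir string, payload any) (map[string]any, error) {
    input, err := json.Marshal(payload)
    if err != nil {
        return nil, err
    }

    cmd := exec.Command("python3", filepath.Join(scriptDir, "processor_server.py"))
    cmd.Stdin = bytes.NewReader(input)
    cmd.Stderr = os.Stderr // [INFO]/[WRITE] logs and tracebacks go here

    out, err := cmd.Output()
    if err != nil {
        return nil, fmt.Errorf("python processing failed: %w", err)
    }

    var result map[string]any
    if err := json.Unmarshal(out, &result); err != nil {
        return nil, fmt.Errorf("invalid JSON from python: %w", err)
    }
    return result, nil
}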

Data Format

Input Format (Go to Python)

The data sent from Go to Python is a JSON object with the following structure:

{
  "table_batches": [
    {
      "table_name": "cpu",
      "rows": [
        {
          "line": "cpu,host=server01,region=us-west usage_user=0.64,usage_system=0.21 1683721234000000000",
          "timestamp": "1683721234000000000"
        }
      ]
    },
    {
      "table_name": "memory",
      "rows": [
        {
          "line": "memory,host=server01,region=us-west used_percent=72.45,available=4321000000i 1683721234000000000",
          "timestamp": "1683721234000000000"
        }
      ]
    }
  ]
}
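
In Go, this payload can be modeled with plain structs and JSON tags. The type names below are illustrative; only the JSON field names are dictated by the format above.

// Go types mirroring the input payload.
type Row struct {
    Line      string `json:"line"`
    Timestamp string `json:"timestamp"`
}

type TableBatch struct {
    TableName string `json:"table_name"`
    Rows      []Row  `json:"rows"`
}

type PythonInput struct {
    TableBatches []TableBatch `json:"table_batches"`
}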

Output Format (Python to Go)

The data returned from Python to Go is a JSON object with the following structure:

{
  "processed": 3,
  "written_lines": [
    "processed_data,source_table=cpu row_count=1i",
    "processed_data,source_table=memory row_count=1i",
    "processed_data,source_table=disk row_count=1i"
  ]
}
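
The corresponding Go type for the result might look like this (the optional error field is the one emitted by processor_server.py when processing fails):

// Go type mirroring the result payload returned by Python.
type PythonResult struct {
    Processed    int      `json:"processed"`
    WrittenLines []string `json:"written_lines"`
    Error        string   `json:"error,omitempty"`
}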

Extending the Proof of Concept

To extend this proof of concept for your own use case:

  1. Modify the generateLineProtocolData() function to generate your actual data
  2. Update the formatDataForPython() function to format your data as needed
  3. Customize the Python process_writes() function to implement your processing logic
  4. Update the data structures to match your specific requirements
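
For example, a hypothetical replacement for generateLineProtocolData() (step 1) could emit your own measurements instead of the demo data; the body below is purely illustrative:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// generateLineProtocolData returns one batch of line protocol strings.
// Replace the body with whatever your application actually measures.
func generateLineProtocolData() []string {
    now := time.Now().UnixNano()
    return []string{
        fmt.Sprintf("cpu,host=server01,region=us-west usage_user=%.2f,usage_system=%.2f %d",
            rand.Float64(), rand.Float64(), now),
        fmt.Sprintf("memory,host=server01,region=us-west used_percent=%.2f %d",
            rand.Float64()*100, now),
    }
}

func main() {
    for _, line := range generateLineProtocolData() {
        fmt.Println(line)
    }
}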

Architecture Considerations

  • Data Volume: For high-volume data, you may need to optimize the buffer management and batch processing
  • Processing Complexity: For complex processing, consider keeping the Python interpreter running instead of creating a new one for each batch
  • Error Handling: Enhance error handling and recovery mechanisms for production use
  • Security: Be mindful of security implications when passing data between Go and Python
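
On the second point, one way to keep the interpreter running is to start the Python process once and exchange newline-delimited JSON over its pipes. The sketch below illustrates the Go side under that assumption; processor_loop.py stands in for a hypothetical variant of processor_server.py that loops over sys.stdin instead of exiting after a single batch.

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"
    "os/exec"
)

type pythonWorker struct {
    cmd    *exec.Cmd
    stdin  *json.Encoder
    stdout *bufio.Scanner
}

// startWorker launches the long-lived Python process once.
func startWorker(script string) (*pythonWorker, error) {
    cmd := exec.Command("python3", script)
    cmd.Stderr = os.Stderr

    in, err := cmd.StdinPipe()
    if err != nil {
        return nil, err
    }
    out, err := cmd.StdoutPipe()
    if err != nil {
        return nil, err
    }
    if err := cmd.Start(); err != nil {
        return nil, err
    }
    return &pythonWorker{
        cmd:    cmd,
        stdin:  json.NewEncoder(in),
        stdout: bufio.NewScanner(out),
    }, nil
}

// process sends one payload and waits for one JSON result line.
func (w *pythonWorker) process(payload any) (map[string]any, error) {
    if err := w.stdin.Encode(payload); err != nil { // Encode appends the newline
        return nil, err
    }
    if !w.stdout.Scan() {
        return nil, fmt.Errorf("python worker exited: %v", w.stdout.Err())
    }
    var result map[string]any
    if err := json.Unmarshal(w.stdout.Bytes(), &result); err != nil {
        return nil, err
    }
    return result, nil
}

func main() {
    w, err := startWorker("scripts/processor_loop.py") // hypothetical looping script
    if err != nil {
        panic(err)
    }
    result, err := w.process(map[string]any{"table_batches": []any{}})
    if err != nil {
        panic(err)
    }
    fmt.Println(result)
}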

License

This proof of concept is provided as-is under the MIT License.
