NetoAPI Python class to create traffic sources in Fusion

1

Decode & Validate NETOSECRET

Base64-decode the input string, parse JSON, and verify all required fields (appname, appkey, sharedsecret, shortname, url). Raises ValueError on failure.

import base64
import json
import logging
import random
import time
from functools import wraps
import os
import json

import jwt
import requests
from requests.exceptions import HTTPError, RequestException

class NetoAPIError(Exception):
    """Exception for NetoAPI operations."""
    pass

class NetoSecret:
    """Decode and validate a Base64-encoded NETOSECRET."""
    def __init__(self, netosecret: str):
        try:
            app_data = self.decode(netosecret)
        except Exception as e:
            raise ValueError("Invalid NETOSECRET") from e
        self.app_name = app_data.get("appname")
        self.app_key = app_data.get("appkey")
        self.shared_secret = app_data.get("sharedsecret")
        self.short_name = app_data.get("shortname")
        self.url = app_data.get("url")
        if any(v is None for v in (self.app_name, self.app_key, self.shared_secret, self.short_name, self.url)):
            raise ValueError("NETOSECRET missing required fields")

    def decode(self, api_key_base64: str) -> dict:
        """Base64-decode and parse JSON payload."""
        decoded = base64.b64decode(api_key_base64).decode()
        return json.loads(decoded)
2

Initialize NetoAPI Client

Instantiate NetoAPI with optional pre-set token. If auth_on_init, force an immediate authentication, raising NetoAPIError if it fails.

class NetoAPI:
    """Client for Netography Fusion traffic-source CRUD + auth."""
    def __init__(
        self,
        netosecret_string: str,
        logger: logging.Logger = None,
        auth_on_init: bool = True,
        token: str = None,
    ):
        self.logger = logger or logging.getLogger(__name__)
        self.netosecret = NetoSecret(netosecret_string)
        self.access_token = token
        self.auth_attempt = bool(token)
        if auth_on_init and not self.auth_attempt:
            if not self.auth(force=True):
                raise NetoAPIError("Initial authentication failed")
3

Authentication & Token Management

Use get_netography_access_token(force) to obtain or refresh a bearer token. Implements one-time caching, retry on network errors, logs success/failure.

    def authenticated_api(func):
        """Decorator: ensure valid token, retry once on 401."""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            if not self.access_token:
                if not self.auth():
                    raise NetoAPIError("Authentication failed")
            try:
                return func(self, *args, **kwargs)
            except HTTPError as e:
                if e.response.status_code == 401:
                    self.logger.info("Token expired, retrying authentication")
                    if not self.auth(force=True):
                        raise NetoAPIError("Re-authentication failed") from e
                    return func(self, *args, **kwargs)
                raise
        return wrapper

    def auth(self, force: bool = False) -> str:
        """Get or refresh bearer token."""
        return self.get_netography_access_token(force=force)

    def headers(self, subaccount: str = None, include_content: bool = True) -> dict:
        hdr = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.access_token}",
        }
        if include_content:
            hdr["Content-Type"] = "application/json"
        if subaccount:
            hdr["Neto-Shortname"] = subaccount
        return hdr

    def get_netography_access_token(self, force: bool = False) -> str:
        """
        Request a new JWT-signed token or return cached.
        If `force`, ignore previous attempts.
        """
        if self.auth_attempt and not force:
            return self.access_token
        self.auth_attempt = True
        payload = {
            "iat": int(time.time()),
            "jti": random.randint(0, 10_000_000),
            "appname": self.netosecret.app_name,
            "appkey": self.netosecret.app_key,
            "shortname": self.netosecret.short_name,
        }
        jwt_token = jwt.encode(payload, self.netosecret.shared_secret, algorithm="HS256")
        self.logger.info(f"Authenticating to {self.netosecret.url}")
        try:
            resp = requests.post(
                f"{self.netosecret.url}/auth/token",
                json={"jwt": jwt_token},
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {jwt_token}",
                },
            )
            resp.raise_for_status()
        except RequestException as e:
            self.logger.error(f"Auth request failed: {e}")
            self.auth_attempt = False
            return None
        data = resp.json()
        token = data.get("access_token")
        if not token:
            self.logger.error("No access_token in response")
            self.auth_attempt = False
            return None
        self.access_token = token
        self.logger.info("Authentication successful")
        return token
4

Create Flow Source

POST /vpc with the provided payload; on success returns parsed JSON, on failure raises NetoAPIError.

    @authenticated_api
    def create_flow_source(self, payload: dict, subaccount: str = None) -> dict:
        """Create a new VPC flow source; returns response JSON."""
        try:
            resp = requests.post(
                f"{self.netosecret.url}/vpc", json=payload, headers=self.headers(subaccount)
            )
            resp.raise_for_status()
        except RequestException as e:
            msg = f"Failed to create flow source: {e}"
            self.logger.error(msg)
            raise NetoAPIError(msg) from e
        return resp.json()
5

Update Flow Source

PUT /vpc/{id} to modify an existing source; returns parsed JSON or raises NetoAPIError.

    @authenticated_api
    def update_flow_source(self, flow_id: str, payload: dict, subaccount: str = None) -> dict:
        """Update an existing flow source by ID; returns response JSON."""
        try:
            resp = requests.put(
                f"{self.netosecret.url}/vpc/{flow_id}", json=payload, headers=self.headers(subaccount)
            )
            resp.raise_for_status()
        except RequestException as e:
            msg = f"Failed to update flow source {flow_id}: {e}"
            self.logger.error(msg)
            raise NetoAPIError(msg) from e
        return resp.json()
6

Delete Flow Source

DELETE /vpc/{id} to remove a source; returns True on HTTP 200/204, else raises NetoAPIError.

    @authenticated_api
    def delete_flow_source(self, flow_id: str, subaccount: str = None) -> bool:
        """Delete flow source by ID; returns True on success."""
        try:
            resp = requests.delete(
                f"{self.netosecret.url}/vpc/{flow_id}", headers=self.headers(subaccount)
            )
            resp.raise_for_status()
        except RequestException as e:
            msg = f"Failed to delete flow source {flow_id}: {e}"
            self.logger.error(msg)
            raise NetoAPIError(msg) from e
        return resp.status_code in (200, 204)
7

Calling the NetoAPI Class

Read NETOSECRET from environment, instantiate client, authenticate, define payload, and call API

# ----------------------------------------------------------------
# Example usage: authenticate and create a new flow source
# ----------------------------------------------------------------

if __name__ == "__main__":
    logger = logging.getLogger("NetoAPIExample")

    # Load your NETOSECRET (base64‐encoded JSON) from env var or other secure store
    netosecret_b64 = os.getenv("NETOSECRET")
    if not netosecret_b64:
        logger.error("Please set the NETOSECRET environment variable")
        exit(1)

    # Instantiate client (will authenticate on init)
    try:
        api = NetoAPI(netosecret_b64, logger=logger)
    except Exception as e:
        logger.error(f"Failed to initialize NetoAPI client: {e}")
        exit(1)

    # Define the payload (equivalent to your curl --data JSON)
    flow_source_payload = {
        "flowtype": "aws",
        "flowresource": "s3",
        "enabled": True,
        "awsauthtype": "RoleARN",
        "role": {
            "arn": "arn:aws:iam::1234567890123:role/NetoFlowLogReader"
        },
        "name": "vpc-01",
        "samplerate": 1,
        "bucket": "neto-myflowlogs-bucket-example",
        "bucketregion": "us-east-1",
        "prefix": "myflowlogs-for-vpc-01",
        "region": "us-east-1",
        "accountid": "1234567890123"
    }

    # Attempt to create the flow source
    try:
        result = api.create_flow_source(flow_source_payload)
        logger.info("Flow source created successfully:")
        print(json.dumps(result, indent=2))
    except NetoAPIError as err:
        logger.error(f"Error creating flow source: {err}")
        exit(1)

Last updated