Create a Traffic Source in Python

1

Decode NETOSECRET

Decodes base64 encoded NETOSECRET to its JSON fields

class NetoSecret:
    def __init__(self, api_key_base64):
        self.decode_netosecret(api_key_base64)

    def decode_netosecret(self, api_key_base64):
        decoded_str = base64.b64decode(api_key_base64).decode()
        netosecret_json = json.loads(decoded_str)
        self.app_name = netosecret_json.get("appname")
        self.app_key = netosecret_json.get("appkey")
        self.shared_secret = netosecret_json.get("sharedsecret")
        self.short_name = netosecret_json.get("shortname")
        self.url = netosecret_json.get("url")
2

Authenticate to Fusion API

Construct JWT request token and authenticate

def get_netography_access_token(netosecret: Optional[NetoSecret] = None) -> Optional[str]:
    """
    Asynchronously retrieve and store the Netography Fusion access token.
    Returns None if authentication fails.
    """
    payload = {
        "iat": int(time.time()),
        "jti": random.randint(0, 10000000),
        "appname": netosecret.app_name,
        "appkey": netosecret.app_key,
        "shortname": netosecret.short_name,
    }
    jwt_token = jwt.encode(payload, netosecret.shared_secret, algorithm="HS256")

    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {jwt_token}",
    }

    logging.info(
        f"Authenticating to Netography Fusion API for {netosecret.short_name} at {netosecret.url}"
    )

    url = f"{netosecret.url}/auth/token"
    session.post(url, json={"jwt": jwt_token}, headers=headers)
    response = session.post(url, json={"jwt": jwt_token}, headers=headers)
    if response.status_code != 200:
        error_text = response.text()
        logger.error(
            f"Failed to authenticate with Netography Fusion API: "
            f"{response.status_code}, {error_text}"
        )
        return None

    response_data = response.json()
    return response_data["access_token"]
3

Create traffic source

Create a traffic source

def create_flow_source(
        netosecret: NetoSecret,
        role_arn: str,
        log_bucket: str,
        vpc_region: str,
        log_bucket_region: str,
        log_bucket_prefix: str,
        vpc_id: str,
        account_id: str,
    ):
    logging.info(
        f"Creating flow source for {vpc_id} in account {account_id} in region {log_bucket_region}"
    )
    token = get_netography_access_token(netosecret)
    create_payload = {
        "awsauthtype": "RoleARN",
        "role": {"arn": role_arn},
        "bucket": log_bucket,
        "bucketregion": log_bucket_region,
        "prefix": log_bucket_prefix,
        "region": vpc_region,
        "sqsurl": "",
        "name": f"{account_id}-{vpc_id}",
        "enabled": True,
        "hiveprefix": False,
        "flowtype": "aws",
        "flowresource": "s3",
        "accountid": account_id,
        "traffictype": "flow",
        "deleteobjects": False,
        "samplerate": 100,
        "tags": [f"{vpc_region}", f"{vpc_id}"],
    }
    headers["authorization"] = f"Bearer {token}"
    requests.post(
        f"{netosecret.url}/vpc",
        json=create_payload,
        headers=headers,
    )


def get_flow_sources(token):
	url = f"{self.netosecret.url}/vpc"
	headers = {"Authorization": f"Bearer {token}"}
	response = requests.get(url, headers=headers)
	return response.json().get("data", [])

def delete_flow_source(fs_id, token):
	url = f"{self.netosecret.url}/vpc/{fs_id}"
	headers = {"Authorization": f"Bearer {token}"}
	response = requests.delete(url, headers=headers)

Last updated