Retrieve a list of source IP addresses from the blocklist with the API

An example of how to authenticate and then use the API to retrieve values from the blocklist.

1

Enter the API key values

Replace the CHANGEME values with the appropriate API key fields.

#
# Copyright (c) 2022 Netography, Inc. All rights reserved.
#
# A python script to fetch a list of configured IP Labels, using API Keys
#
# Requries pyjwt and requests:
# $ python3 -m pip install pyjwt requests
#
# Usage:
# $ python3 api-key-search-blocks.py
#

import jwt
import requests
import random
import time
import json
import os
from http.client import responses

#~~ BEGIN Configuration ~~~

# API Key Name
APPNAME = 'CHANGEME'
# API Key Secret
APPKEY = 'CHANGEME'
# Shortname
SHORTNAME = 'CHANGEME'
# API Shared Secret
SHARED_SECRET = 'CHANGEME'

API_BASE_URL = 'https://api.netography.com/api/v1'

# Construct the API Post Request
API_URL = API_BASE_URL + '/search/block'
API_POST_BODY = {
  "start": -3600000,
  "end": 0,
  "search": "active == true",
}

# path/filename to cache the JWT auth token 
CACHE_FILE = '~/.neto.token'

# ~~ END Configuration
2

Check for a cached JWT bearer token

Check to see if a cached JWT bearer token is already stored in a file, and use it if so.]

# Cache Bearer token and re-use if not expired
jwt_valid = False
access_token = None
if os.path.exists(CACHE_FILE):
  with open(CACHE_FILE) as f:
    cache_data = json.load(f)
  access_token = cache_data['access_token']
  expires_in = cache_data['expires_in']
  token_date = os.path.getmtime(CACHE_FILE)
  expire_timestamp = token_date + expires_in - 60 # account for some clock skew
  # print('JWT expires at ' + time.strftime("%Y-%m-%d %H:%M:%S+00:00 (UTC)", time.gmtime(expire_timestamp)) + "\n")

  if (time.time() <= expire_timestamp):
    jwt_valid = True

if not jwt_valid:
3

Encode a JWT request token

Create a JWT request token as a string to send to API to authenticate. The jwt parameter for the API call is set to this value.

  # Generate the JWT request token
  payload = {
    'iat': int(time.time()),
    'jti': random.randint(0,10000000),
    'appname': APPNAME,
    'appkey': APPKEY,
    'shortname': SHORTNAME
  }

  token = jwt.encode(payload, SHARED_SECRET, algorithm="HS256")

  # Create the HTTP POST request with a JSON payload containing the JWT request token
  body = {
    'jwt': token
  }
4

Authenticate to API

Sends a POST to /auth/token with the JWT request token. If successfully authenticates, the JWT bearer token is provided in the access_token field in the response.

  try:
      resp = requests.post(API_BASE_URL + '/auth/token', json=body)
      data = resp.json()
      # print(data)
      if 'access_token' not in data:
        if 'message' in data:
          print(f"{str(resp.status_code)} {data['message']}.")
        else:
          print(f"{str(resp.status_code)} {responses[resp.status_code]}.  access_token not found in response")
        raise SystemExit(1)

      # print("bearer: %s" % (data['access_token']))
      access_token = data['access_token']
5

Cache the JWT bearer token to a file

Stores the JWT bearer token to a file cache so it can be reused between API calls that span multiple process sessions without re-authenticating each time.

      with open(CACHE_FILE, 'w') as f:
        json.dump(data, f)
  except Exception as e:
      # print(str(e))
      print(f"{str(resp.status_code)} {responses[resp.status_code]}.  Verify your configuration parameters")
      raise SystemExit(1)
6

Make the API call to the /search/block

Using the JWT bearer token to authenticate the call, make an API call to retrieve blocklist data and parse the output.

# Now have a valid Bearer Token, construct the API request
headers = {
  'Authorization': 'Bearer ' + access_token
}

try:
    resp = requests.post(API_URL, headers=headers, json=API_POST_BODY)
    data = resp.json()
    # print(data)

    if 'data' not in data:
      print("data key not found in response data")
      raise SystemExit(1)

    if not data['data']:
      print("The response had no results")
      raise SystemExit(1)

    for row in data['data']:
      print(row['srcip'])
except Exception as e:
    print('API Error: ' + str(e))
    raise SystemExit(1)

Last updated