Skip to content

Commit

Permalink
Merge pull request #4 from emiliacebrat/master
Browse files Browse the repository at this point in the history
Update tia_example.py

Thanks @emiliacebrat !
  • Loading branch information
farrokhi authored Jan 31, 2024
2 parents e210e0a + 5122dee commit ddabd24
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 96 deletions.
98 changes: 66 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,16 @@ This repository contains sample code for threat intelligence providers who provi

threat-intel-api is an HTTP/Websocket service that allows threat intelligence (TI) providers to retrieve telemetry data generated from the malicious domain names they provide to Quad9 via their threat intelligence feeds.

Contact Quad9 at support@quad9.net if you are a threat intelligence provider and need a key.
Contact Quad9 at support@quad9.net if you are a threat intelligence provider and need a key.

Clients will access the api via a websocket.
Clients need the url of the websocket (https://tiapi.quad9.net) and an authorization token.
Clients will access the api via a websocket. If a client has multiple threat lists they will be given a separate token for each list.

If a client has multiple threat lists they will be given a separate token for each list.

# Starting a Session

Clients initiate their session by making an HTTP GET request to the service. There is only one endpoint: "/". In this initial request, the client is expected to provide a "bearer" token in the `Authorization` header, like so:
## Requirements
Clients need:
- the url of the websocket (https://tiapi.quad9.net)
- an authorization token which can be requested at support@quad9.net

```http
HTTP/``1.1 GET /
Authorization: Token <YOUR_TOKEN>
```

When the client supplies a valid, active token, their connection will be "upgraded" to a Websocket.

## API Business Rules

Expand All @@ -31,12 +24,23 @@ threat-intel-api was written with the following business rules in place:
- Clients must acknowledge each message they receive;
- Clients must acknowledge messages in the order they are received within 5 seconds. (This can be adjusted with a config setting)
- Multiple clients connecting with the same authorization token is allowed and increases throughput;
- If a client fails to acknowledge the messages they receive, in-order, the API will terminate the connection.
- If a client acknowledges your messages and needs a reset, contact support@quad9.net and we can reset it to 0.
- There is currently no way to retrieve data by date. We are working on it.


# Starting a Session

Clients initiate their session by making an HTTP GET request to the service. There is only one endpoint: "/". In this initial request, the client is expected to provide a "bearer" token in the `Authorization` header, like so:

```http
HTTP/``1.1 GET /
Authorization: Token <YOUR_TOKEN>
```

If a client fails to acknowledge the messages they receive, in-order, the API will terminate the connection.
When the client supplies a valid, active token, their connection will be "upgraded" to a Websocket.

If you ack your messages and need a reset please contact support@quad9.net and we can reset you to 0.

There is currently no way for you to retrieve by date. We are working on it.

# Simple Curl Example

Expand All @@ -57,7 +61,7 @@ Dload Upload Total Spent Left Speed

Sends to a file called output.txt

Exmple single entry:
Example single entry:

```json
{
Expand All @@ -79,38 +83,68 @@ Exmple single entry:
- `region` = region that the query originated from, can be blank
- `country` = two character country code (https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2) that the query originated from, can be blank

Note: We use MaxMind for geographic lookups but we snap locations to the center of the closest city that is above the minimum population - it never moves out of a region (typically a country or nation) but you will not get granular geographic information for small cities. This is to ensure end user privacy.
*_Note: We use MaxMind for geographic lookups but we snap locations to the center of the closest city that is above the minimum population - it never moves out of a region (typically a country or nation) but you will not get granular geographic information for small cities. This is to ensure end user privacy._*

# Python Example

The example script `tia_example.py` was written against Python version 3.6.
Convenient way to setup a 3.6 environment on Linux. https://linuxize.com/post/how-to-install-python-3-on-centos-7/
The example script `tia_example.py` was written against Python version 3.8.
Convenient way to set up a Python and virtual environment on Linux: https://linuxize.com/post/how-to-install-python-3-on-centos-7/


## Overview

This measures download speed:
The repository contains three files:
1. `requirements.txt` containing all required Python libraries

```
aiofile==3.8.1
environs==9.5.0
pyyaml==6.0
websockets==10.3
```

2. `config.yaml.example` - config file
```
ti_url: "wss://tiapi.quad9.net"
auth_token: ""
data_file: /output/stream.json
log_file: /output/tiapi.log
verbose: true
nolog: false
noack: false
```
- Replace `auth_token` with token received from support@quad9.net
- `data_file` - file to write the telemetry data
- `log_file` - file to write logs generated by script
- `verbose` - if set to true, print retrieved data on command line, default: true
- `nolog` - no logging enabled, default: false
- `noack` - do not acknowledge retrieved data, default: false

3. `tiapi.py`


## Usage

```shell
(my_project_venv) [exampleuser@commandline]\$ ./tia_example.py --auth_token <YOUR_TOKEN>
10000 8140.593486210115/sec
20000 9139.962665341003/sec
30000 9979.32015085066/sec
40000 10866.962743213264/sec
50000 10731.540519453485/sec
(my_project_venv) [exampleuser@commandline]\$ python3 ./tiapi.py --config path-to-config-file
```

To see the data being retrieved:
*_Replace `path-to-config-file`._*

### If verbose==true:
With `verbose` set to true, data will be printed to command line:

```shell
(my_project_venv) [exampleuser@commandline]\$ ./tia_example.py --auth_token <YOUR_TOKEN> --verbose
(my_project_venv) [exampleuser@commandline]\$ ./tia_example.py --config path-to-config-file
{'id': '191960005', 'qname': 'blockeddomain.example.com', 'qtype': 'A', 'timestamp': '2018-12-11T03:15:47.038932839Z', 'city': 'San Jose', 'region': 'CA', 'country': 'US'}
{'id': '191961005', 'qname': 'blockeddomain.example.com', 'qtype': 'A', 'timestamp': '2018-12-11T03:15:47.051392978Z', 'city': 'San Jose', 'region': 'CA', 'country': 'US'}
{'id': '191962005', 'qname': 'blockeddomain.example.com', 'qtype': 'A', 'timestamp': '2018-12-11T03:15:47.0605273Z', 'city': 'San Jose', 'region': 'CA', 'country': 'US'}
{'id': '191963005', 'qname': 'blockeddomain.example.com', 'qtype': 'A', 'timestamp': '2018-12-11T03:15:47.102118471Z', 'city': 'San Jose', 'region': 'CA', 'country': 'US'}
```

# Usage

Clients must acknowledge that messages have been received.

In the example above the script will send back to the server JSON objects of `{"id":'191960005'}` to indicate that is has successfully received and processed the JSON structure.
In the example above the script will send back to the server JSON objects of `{"id":'191960005'}` to indicate that it has successfully received and processed the JSON structure.

If the websocket is terminated before the ack is received by the server the message will be resent on the next connection.

7 changes: 7 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ti_url: "wss://tiapi.quad9.net"
auth_token: ""
data_file: /output/stream.json
log_file: /output/tiapi.log
verbose: true
nolog: false
noack: false
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
websockets==10.3
pyyaml==6.0
aiofile==3.8.1
135 changes: 71 additions & 64 deletions tia_example.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,117 @@
#!/usr/bin/env python

#
# Connect to the Quad9 threat-intel api and receive domain block information
# You receive the auth_token from Quad9 and it is specific to a threat feed.
#
# Requires Python version 3.8 or greater.
#
# usage:
# ./tia_example.py --config <YOUR CONFIG FILE>
#
# set verbose: true to see the data being retrieved.

# @author: Emilia Cebrat-Maslowski (Quad9)

import os
import logging
import asyncio
import websockets
import json
import yaml
import sys
import os
import time
import argparse

#
# Connect to the Quad9 threat-intel api and receive domain block information
# You receive the auth_token from Quad9 and it is specific to a threat feed.
#
# Requires Python version 3.6 or greater
#
# usage:
# ./tia_example.py --auth_token <YOUR TOKEN>
# This measures download speed
#
# ./tia_example.py --verbose --auth_token <YOUR TOKEN>
# To see the data being retrieved.
from aiofile import async_open
from collections import namedtuple


def read_config(config_path):
with open(config_path, 'r') as f:
parsed_file = yaml.safe_load(f)
Config = namedtuple("Config", "ti_url auth_token data_file log_file verbose nolog noack")
config = Config(
parsed_file['ti_url'],
parsed_file['auth_token'],
parsed_file['data_file'],
parsed_file['log_file'],
parsed_file['verbose'],
parsed_file['nolog'],
parsed_file['noack']
)
return config

async def readblockloop():
async with websockets.connect(args.connect_url,
extra_headers={'Authorization': Token "YOUR TOKEN" + args.auth_token}) as ws:

async def readblockloop(config, events):

async with websockets.connect(config.ti_url,
extra_headers={'Authorization': "Token " + config.auth_token}) as ws:

global websocket
count = 0
start = time.perf_counter()

websocket = ws

while True:
try:
message = await websocket.recv()
#print(f" {message}")
data = json.loads(message)
if args.verbose:
print(f" {data}")

# We do our processing here. Just a count
count = count + 1
if (count % 10000 == 0):
end = time.perf_counter()
print(f' {count} {count/(end-start)}/sec')

ack = dict(id=data['id'])

if not args.noack:
await acks.put(ack)
#print(f" acks: {acks}")
if config.verbose:
print(f" {message}")

if not config.nolog:
await events.put(message)
except:
print('Failed to receive message')
break
logging.debug('Failed to receive message')
await asyncio.sleep(1)


async def process_acks():
async def process_acks(acks):
while True:
ack = await acks.get()

try:
logging.debug(f"ACKing: {ack}")
await send_data(ack)
except:
print('Failed to send ack')
logging.debug('Failed to send ack')
break


async def send_data(data):
frame = json.dumps(data)
await websocket.send(frame)

def main():
# Instantiate the parser
parser = argparse.ArgumentParser(description='Read from Quad9 threat-intel api')

parser.add_argument('--verbose', action='store_true',
help='Dump out received json')

parser.add_argument('--noack', action='store_true',
help='Disable acks so no data is confirmed read. Primarily for testing')

# Optional arguments
parser.add_argument('--auth_token', default="Token <YOUR TOKEN>",
help='Authorization token from quad9 to access the api')
async def process_events(config, events, acks):
while True:
async with async_open(config.data_file, "a") as f:
event = await events.get()
await f.write(event)
if not config.noack:
event_parsed = json.loads(event)
ack = dict(id=event_parsed['id'])
await acks.put(ack)

parser.add_argument('--connect_url', default='wss://tiapi.quad9.net',
help='url to access the api')

def main():

global args
parser = argparse.ArgumentParser(description='Read from Quad9 threat-intel api')
parser.add_argument('--config', required=True,
help='Path to the config file.')
args = parser.parse_args()



tasks = [
asyncio.ensure_future(readblockloop()),
asyncio.ensure_future(process_acks()),
]

global acks
config = read_config(args.config)
logging.basicConfig(filename=config.log_file, level=logging.INFO, format='%(message)s')
loop = asyncio.get_event_loop()
acks = asyncio.Queue()
events = asyncio.Queue()

try:
loop.create_task(readblockloop(config, events))
loop.create_task(process_events(config, events, acks))
loop.create_task(process_acks(acks))
loop.run_forever()
finally:
loop.close()

asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))


if __name__ == '__main__':
Expand Down

0 comments on commit ddabd24

Please sign in to comment.