Conditional PCAP logging¶
Concept¶
When enabled, conditional pcap logging saves to disk the network traffic seen by a Stamus Network Probe for network flows that have alerts. The stored traffic includes the content that triggered the initial alert and all the packets that follow it.
Conditional PCAP Activation¶
To activate conditional pcap extraction, go to Probe Management and open the Appliances menu.
Edit the desired Probe, or the desired Template, and go under the Settings tab.
Enable the checkbox “Activate pcap extraction on alerts”.
Finally, Apply changes for the new settings to take effect.
How Conditional PCAP logging works¶
When conditional pcap is activated, the Stamus Network Probe activates a global PCAP store (one per probe) and starts writing traffic to disk for all flows with at least one alert. Writing starts when the first alert on a flow is triggered and does not stop until the end of the flow.
Because there is only one global PCAP store, storage is not duplicated when multiple alerts fire. It also means that a flow has to be extracted from a PCAP that contains more than one flow. This extraction is done transparently via the REST API, which allows the PCAP to be downloaded from a single link in the user interface.
Hint
Regular corporate traffic at 5Gbps, stored with full packet capture for a period of 7 days, would require about 369TB of non-redundant storage. Based on experience from deployments, conditional pcap could result in about 90-100GB of de-duplicated disk usage, or about 0.024% of the full packet capture size.
Important
Stored conditional pcaps are automatically rotated based on a total size limit that applies to all of them.
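The storage figures quoted in the hint above can be roughly reproduced with a few lines of arithmetic. This is only a sketch under stated assumptions: the link is assumed to be saturated for the whole period, and binary prefixes are assumed (1 Gb = 2**30 bits, 1 TB = 2**40 bytes), which is the interpretation that matches the 369TB figure. The variable names are illustrative only.

```python
# Rough storage estimate. Assumptions: the link is saturated for the whole
# period and binary prefixes are used (1 Gb = 2**30 bits, 1 TB = 2**40 bytes).
LINK_GBPS = 5
DAYS = 7
CONDITIONAL_PCAP_GB = 90  # lower end of the 90-100GB range quoted above

bytes_per_second = LINK_GBPS * 2**30 / 8
full_capture_bytes = bytes_per_second * DAYS * 24 * 3600
full_capture_tb = full_capture_bytes / 2**40

# Share of the full capture that conditional pcap would occupy, in percent
ratio_percent = CONDITIONAL_PCAP_GB * 2**30 / full_capture_bytes * 100

print(f"Full packet capture: {full_capture_tb:.0f} TB")
print(f"Conditional pcap share: {ratio_percent:.3f}%")
```

With decimal prefixes the full capture estimate would come out slightly higher, so the exact numbers depend on the convention used.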
REST API pcap file extraction automation¶
Besides being available for download from the SCS GUI, a pcap file containing the full session recording can also be fetched via REST API calls, as shown in the example below.
To acquire the pcap for a network flow with an alert, the complete JSON object of that alert is required. A full list of alert objects can be obtained with the REST API call below.
Hint
The name of the host (Probe) is needed to build the correct query.
Hint
For the pcap to be extracted, a JSON object with event type alert needs to be provided to the API.
GET request: https://<SCS ADDRESS>/rest/rules/es/alerts_tail/
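Before uploading, it may help to check that the object taken from the response really is an alert event and to read the probe name from it. The following is a minimal sketch, assuming the response body carries the events in a `results` list and that each event has `event_type` and `host` fields. The `pick_alert` helper and the sample payload are hypothetical and only for illustration.

```python
import json


def pick_alert(payload):
    """Return the first event of type "alert" from an alerts_tail response body."""
    for event in payload.get("results", []):
        if event.get("event_type") == "alert":
            return event
    raise ValueError("no event of type 'alert' in the response")


# Hypothetical, heavily truncated response body used only for illustration.
sample = {
    "results": [
        {"event_type": "alert", "host": "probe-1", "flow_id": 2077942411095108}
    ]
}

alert = pick_alert(sample)
probe_name = alert["host"]      # value for the host=<PROBE_NAME> parameter
alert_json = json.dumps(alert)  # content to save to a file and upload next
```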
After the alert JSON object is obtained, the next step is to upload it to the API. The response should contain the name of the generated file.
POST request: https://<SCS ADDRESS>/rest/rules/filestore_pcap/upload/?host=<PROBE_NAME>
The next step is to extract the pcap file for this network flow.
POST request: https://<SCS ADDRESS>/rest/rules/filestore_pcap/<FILENAME RESPONSE FROM STEP 1>/extract_pcap/?host=<PROBE_NAME>
The final step is to retrieve and download the generated pcap file.
GET request: https://<SCS ADDRESS>/rest/rules/filestore_pcap/<FILENAME RESPONSE FROM STEP 1>/retrieve/?host=<PROBE_NAME>
GET request: https://<SCS ADDRESS>/rest/rules/filestore_pcap/<FILENAME RESPONSE FROM STEP 1>/download/
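The order of the four requests above can be summarized as data. This is a small sketch, not part of the product API: `pcap_request_plan` is a hypothetical helper that only builds the URLs, and the address, probe name and file name in the usage lines are placeholders. In a real run, the file name is only known once the upload call has returned.

```python
def pcap_request_plan(scs_address, probe_name, filename):
    """Return the (HTTP method, URL) pairs of the extraction flow, in order."""
    base = f"https://{scs_address}/rest/rules/filestore_pcap"
    host = f"?host={probe_name}"
    return [
        ("POST", f"{base}/upload/{host}"),                   # upload the alert JSON
        ("POST", f"{base}/{filename}/extract_pcap/{host}"),  # extract the flow
        ("GET", f"{base}/{filename}/retrieve/{host}"),       # retrieve the pcap
        ("GET", f"{base}/{filename}/download/"),             # download the pcap
    ]


# Placeholder values for illustration only.
plan = pcap_request_plan("scs.example.com", "probe-1", "myalert.json")
for method, url in plan:
    print(method, url)
```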
Hint
To get the metadata required to build the complete REST API call correctly, the alerts REST API endpoint can be used.
This can be achieved by filtering on unique key value pairs like the signature id and the flow id, for example.
https://<SCS ADDRESS>/rest/rules/es/alerts_tail/?qfilter=alert.signature_id%3A2029743%20AND%20flow_id%3A2077942411095108
The example above filters all events with event_type: alert on {“signature_id”: 2029743} and {“flow_id”: 2077942411095108}.
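The percent-encoded filter in that URL does not have to be written by hand. As a minimal sketch, the standard library function `urllib.parse.quote` can produce it from a readable filter string. The variable names are illustrative.

```python
from urllib.parse import quote

signature_id = 2029743
flow_id = 2077942411095108

# Readable form of the filter; quote() encodes ":" as %3A and spaces as %20
qfilter = f"alert.signature_id:{signature_id} AND flow_id:{flow_id}"
path = "/rest/rules/es/alerts_tail/?qfilter=" + quote(qfilter)

print(path)
```

The resulting path matches the query string shown above and can be appended to `https://<SCS ADDRESS>`.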
A complete Python script example is shown below.
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import sys
import os
import json


def print_helptext():
    # Four arguments are required after the script name
    if len(sys.argv) < 5:
        print(
            f"How to run the script: {sys.argv[0]} <hostname/ip of SCS> <token> <alert signature id> <alert flow id>"
        )
        print(
            f"\nExample: python3 {sys.argv[0]} 192.168.0.12 7408a4b978abdc03ee39e1fea419512e5734f51e 2029743 2077942411095108"
        )
        sys.exit(1)
    else:
        global url
        url = "https://" + sys.argv[1] + "/rest"
        global scs_ip
        scs_ip = sys.argv[1]
        global token
        token = sys.argv[2]
        global signature_id
        signature_id = sys.argv[3]
        global flow_id
        flow_id = sys.argv[4]


def check_host_is_up(hostname, waittime=1000):
    if (
        os.system(
            "ping -c 1 -W " + str(waittime) + " " + hostname + " > /dev/null 2>&1"
        )
        == 0
    ):
        HOST_UP = True
    else:
        HOST_UP = False
        raise Exception("Error. Host %s is not up..." % hostname)
    return HOST_UP


def check_url_is_reachable(url):
    try:
        get = requests.get(url, verify=False)
        if get.status_code == 200:
            return f"{url}: is reachable"
        else:
            return f"{url}: is Not reachable, status_code: {get.status_code}"
    except requests.exceptions.RequestException as e:
        raise Exception(f"{url}: is Not reachable \nErr: {e}")


def check_request(response):
    # "== 200 or 201" is always true, so test membership instead
    if response.status_code in (200, 201):
        print("Request is successful!")
        print("Response:")
        print(response.text)
    else:
        print(f"Request failed with status code: {response.status_code}")
        print(response.text)


class RestCall:
    # Change this to False if you use https with a self signed certificate
    verify = True

    def __init__(self, token, url):
        self.token = token
        self.url = url
        self.headers = {"Authorization": f"Token {self.token}"}
        # Silence the warnings emitted when certificate verification is disabled
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    def get_alert_json(self, signature_id, flow_id):
        print("INFO: Get the json alert")
        rest_point = f"/rules/es/alerts_tail/?qfilter=alert.signature_id%3A{signature_id}%20AND%20flow_id%3A{flow_id}"
        response = requests.get(
            self.url + rest_point, headers=self.headers, verify=self.verify
        )
        check_request(response)
        response = response.json()
        # Use the first alert that matches the filter
        results = response["results"][0]
        with open("/tmp/myalert.json", "w") as file:
            json.dump(results, file)
        # The probe name is needed as the host parameter of the next calls
        return results["host"]

    def upload_json_alert_object(self, probe_name):
        print("ACTION: Upload json alert to generate pcap file from network flow")
        rest_point = f"/rules/filestore_pcap/upload/?host={probe_name}"
        tail = os.path.basename("/tmp/myalert.json")
        # Open the file in a context manager so the handle is closed after the upload
        with open("/tmp/myalert.json", "rb") as alert_file:
            files = {"file": (tail, alert_file, "multipart/form-data")}
            response = requests.post(
                self.url + rest_point, headers=self.headers, verify=self.verify, files=files
            )
        check_request(response)
        response = response.json()
        return response["filename"]

    def extract_pcap(self, pcap_filename, probe_name):
        print("ACTION: Extract pcap file from network flow")
        rest_point = f"/rules/filestore_pcap/{pcap_filename}/extract_pcap/?host={probe_name}"
        response = requests.post(
            self.url + rest_point, headers=self.headers, verify=self.verify
        )
        check_request(response)
        response = response.json()
        return response

    def retrieve_pcap(self, pcap_filename, probe_name):
        print("ACTION: Retrieve pcap file")
        rest_point = f"/rules/filestore_pcap/{pcap_filename}/retrieve/?host={probe_name}"
        response = requests.get(
            self.url + rest_point, headers=self.headers, verify=self.verify
        )
        check_request(response)
        response = response.json()
        return response

    def download(self, pcap_filename, local_pcap_filename):
        print("ACTION: Download pcap file locally")
        rest_point = f"/rules/filestore_pcap/{pcap_filename}/download/"
        # stream=True avoids loading the whole pcap into memory at once
        response = requests.get(
            self.url + rest_point, headers=self.headers, verify=self.verify, stream=True
        )
        total_bytes = 0
        with open(local_pcap_filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
                    # Count the bytes actually written, not the nominal chunk size
                    total_bytes += len(chunk)
        print("Downloaded", total_bytes // 1024, "KB")


if __name__ == "__main__":
    print_helptext()
    SCS_Rest = RestCall(token, url)
    check_url_is_reachable(url)
    probe_name = SCS_Rest.get_alert_json(signature_id, flow_id)
    pcap_filename = SCS_Rest.upload_json_alert_object(probe_name)
    SCS_Rest.extract_pcap(pcap_filename, probe_name)
    SCS_Rest.retrieve_pcap(pcap_filename, probe_name)
    SCS_Rest.download(pcap_filename, "/tmp/myfile.pcap")