SOAR Integration Examples¶
This document provides examples of the API endpoints and sample queries that can be used to integrate Stamus Security Platform with a Security Orchestration, Automation, and Response (SOAR) system. The functionality covered in this integration guide includes:
Accessing the Host Insights data
Retrieving information for a specific host
Queries for NTA/NSM fields (non-alert based events)
Note, this is not an exhaustive set of examples. There are more than 4000 fields/keys available to the integrations.
SOAR Python example code¶
This section describes how to interact with and query data from the Stamus Security Platform RestAPI using the Python programming language. The provided examples demonstrate various API operations, including accessing different endpoints and performing queries related to Host Insights, detection events, network metadata, and logs.
Any of the examples below can be saved as a script and executed as follows:
python3 script.py IP Token
Substitute IP with the IP or hostname of the Stamus Central Server and Token with the security access token. script.py is used as a generic example name for a Python script.
RestAPI explained¶
Stamus Security Platform provides complete RestAPI coverage. Any search, filter, or action in the Stamus Central Server GUI can be reproduced via a RestAPI call. This section describes some RestAPI essentials to know before working through the actual examples.
Token¶
RestAPI calls require a token in order to ensure secure communication. For additional details on how to generate an access token, please consult the relevant documentation: Generate an Access Token.
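As a minimal sketch of how the token is attached to a request, the helper below builds the same Authorization header that the RestCall class in this guide uses. The function name auth_headers is hypothetical and is not part of any Stamus library.

```python
def auth_headers(token):
    """Build the HTTP headers carrying the access token.

    Hypothetical helper: the header layout ("Token <value>") mirrors the
    RestCall class shown later in this guide.
    """
    token = (token or "").strip()
    if not token:
        raise ValueError("an access token is required")
    return {"Authorization": f"Token {token}"}


# Placeholder value for illustration only, not a real token.
print(auth_headers("example-token"))
```

Rejecting an empty token early gives a clearer error than an authentication failure returned by the server.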
Pagination¶
By default Stamus Central Server uses a pagination system with a default page size of 30 and starts on page 1. This default setting is reflected in the RestCall class (in the examples provided) where self.pagination is initialized as follows:
self.pagination = {"page_size": 30, "page_number": 1}
This means that, without any specific customization, API calls will return 30 items per page starting from the first page.
Customizing Pagination¶
If pagination needs to be adjusted, customize the number of items per page or the starting page. The Python methods accept optional parameters for tailoring pagination to specific requirements.
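The following sketch shows one way to prepare customized pagination values. The helper name pagination_params is hypothetical; the key names mirror the self.pagination dictionary used in this guide. Some endpoints may expect different parameter names, so it is worth confirming them in the browser's developer console.

```python
def pagination_params(page_size=30, page_number=1):
    """Return pagination settings as a dictionary.

    Hypothetical helper; the defaults match the documented page size of 30
    and starting page of 1.
    """
    if page_size < 1 or page_number < 1:
        raise ValueError("page_size and page_number must be positive integers")
    return {"page_size": page_size, "page_number": page_number}


# Ask for 100 items per page, starting from the third page.
params = pagination_params(page_size=100, page_number=3)
print(params)
```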
Tenant¶
Additionally, take note of the tenant parameter. The tenant parameter in the provided examples is not used by default and should only be used if multi-tenancy is enabled. In the examples below, the tenant parameter with the value 4 is used for illustration. Note that tenant numbers are not assigned by default; they are specific to and different in every deployment. For detailed instructions on activating multi-tenancy, please refer to the following documentation: Multiple Tenants
Date and time¶
When a date and time is passed as a parameter in a RestAPI call, it must be provided in Unix timestamp format.
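A short sketch of how such values could be produced with the Python standard library follows. The sample values used later in this guide (for example 1700322592000) appear to be expressed in milliseconds, so the sketch assumes milliseconds. The function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def to_epoch_ms(moment):
    """Convert a timezone-aware datetime to a Unix timestamp in milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    return int(moment.timestamp() * 1000)


def last_days_range(days, now=None):
    """Return from_date/to_date strings covering the last `days` days.

    Assumption: the key names follow the self.date dictionary used in
    the examples of this guide.
    """
    now = now or datetime.now(timezone.utc)
    return {
        "from_date": str(to_epoch_ms(now - timedelta(days=days))),
        "to_date": str(to_epoch_ms(now)),
    }


print(last_days_range(30))
```

Requiring timezone-aware datetimes avoids silently interpreting a naive value in the local timezone of the machine running the script.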
How to find a specific RestAPI call¶
The easiest way to reveal the exact details and structure of a RestAPI call is to open the browser's developer console while querying, searching, filtering, or performing a specific action in the Stamus Central Server GUI.
How to open the browser's developer console depends on the OS and the type of browser. Some shortcuts are listed below:
Chrome - Cmd+Option+J (for Mac) or Ctrl+Shift+J (Windows, Linux, Chrome OS).
Firefox - Cmd+Option+K (on a Mac) or Ctrl+Shift+J (on Windows) or Ctrl+Shift+K (Linux).
Microsoft Edge - Ctrl+Shift+I
Safari - Cmd+Option+C
RestAPI call result review in a browser¶
Each RestAPI call can be tested in a browser in order to get acquainted with the output and the structure of the response.
Once in the developer console, double-clicking an entry underneath Name (in the example above) opens the actual RestAPI hyperlink for further review.
The example below retrieves Host Insights for a specific host/IP retrieved from the RestAPI hyperlink.
Note
Please note the use of the /rest/ point in the URL below.
Other similar RestAPI points can be observed via https://<stamus.security.platform.ip>/rest/.
RestAPI points¶
The Stamus Security Platform offers different RestAPI points depending on the data that needs to be queried.
/appliances/host_id/ - this endpoint retrieves Host Insights data for any and all hosts.
/appliances/host_id_alerts/ - this endpoint retrieves Host Insights data for hosts that alerted. In other words, those hosts were part of any alert method ("event_type":"alert"), SIGHTINGS, or Stamus ("event_type":"stamus") event.
/rules/es/field_stats/ - this endpoint retrieves any and all alert-based events: any alert method ("event_type":"alert"), SIGHTINGS, or Stamus ("event_type":"stamus") event.
/rules/es/events_tail/ - this endpoint retrieves any and all network protocol events, for example smb ("event_type":"smb"), tls ("event_type":"tls"), anomaly ("event_type":"anomaly"), ftp ("event_type":"ftp"), dns ("event_type":"dns"), file transaction ("event_type":"fileinfo"), flow ("event_type":"flow"), and so on.
Naming Conventions used¶
The classes and methods in this documentation adhere to the naming conventions used across all tabs in the Stamus Hunting module available at https://<stamus.security.platform.ip>/stamus/hunting.
Class and method names are designed to align with the terminology and structure used in the Stamus Hunting module. Each API method is named after the corresponding GUI element, making it easier to relate the GUI to the RestAPI method names provided in this document.
The inherited class Alerts and its method dashboards refer to queries that generate the widgets in the Hunting > Dashboards GUI:
hunting = Alerts(token, url)
hunting.dashboards(dashboard="alert.metadata.mitre_technique_name", ip="10.6.15.119", alert=True, discovery=True)
The inherited class HostInsights and its method ip refer to queries that return results in the Hunting > Hosts GUI:
sn_host_insights = HostInsights(token, url)
sn_host_insights.ip(ip="10.9.29.134")
The inherited class NSM and its method nsm_metadata refer to queries that return results from non-alert events:
stamus = NSM(token, url)
stamus.nsm_metadata(field="ssh.client.software_version", field_value="OpenSSH_6.4")
# The example below uses a wild card to search for any OpenSSH ssh client versions
stamus.nsm_metadata(field="ssh.client.software_version", field_value="OpenSSH*")
The snippets above illustrate the naming conventions used for the RestAPI classes and methods throughout this documentation.
Automation code samples¶
The section below provides ready-to-use RestAPI examples and an explanation of how the classes and methods are set up. Each class and its methods are explained with examples in the subsequent sections.
new_page_size and new_page_number are optional parameters that customize the pagination for the current API call. By default, calls return 30 items per page starting from page 1. To change these defaults, include the relevant parameters in the method.
If no specific time range is selected from the Timespan drop-down, the default timespan is set to 30 days. The date is passed to the request code as a Unix timestamp (epoch). To set a different time range, uncomment the date range parameters in the method and specify the desired time in Unix timestamp format.
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import sys
import os
import urllib.parse


class RestCall:
    def __init__(self, token, url):
        self.token = token
        self.url = url
        self.headers = {"Authorization": f"Token {self.token}"}
        # If multi-tenancy is enabled, uncomment the corresponding lines below and use the correct tenant value.
        # self.tenancy = {"tenant": "4"}
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # self.pagination = {"page_size": 30, "page_number": 1}
        # self.date = {"from_date": "1700322592000", "to_date": "1702914592000"}
        # Certificate verification is disabled below, so the related warning is silenced.
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.verify = False
class HostInsights(RestCall):
    def ip(self, ip):
        print("Returning Host Insights data for a specified host IP")
        params = {}
        rest_point = f"/appliances/host_id/{ip}"
        # If multi-tenancy is enabled, uncomment the corresponding lines below and use the correct tenant value.
        # if self.tenancy:
        #     params["tenant"] = self.tenancy["tenant"]
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     params["page_size"] = self.pagination["page_size"]
        #     params["page_number"] = self.pagination["page_number"]
        # if self.date:
        #     params["from_date"] = self.date["from_date"]
        #     params["to_date"] = self.date["to_date"]
        # Construct the URL; the query string is appended only when parameters are set
        query = urllib.parse.urlencode(params)
        full_url = f"{self.url.rstrip('/')}{rest_point}" + (f"?{query}" if query else "")
        # The parameters are already part of full_url, so they are not passed again
        response = requests.get(full_url, headers=self.headers, verify=self.verify)
        print(full_url)
        check_request(response)
        response_data = response.json()
        return response_data

    def alerted(self, host_id_qfilter=None):
        print("Returning Host Insights data")
        params = {}
        rest_point = "/appliances/host_id_alerts/"
        # If multi-tenancy is enabled, uncomment the lines below and use the correct tenant value.
        # if self.tenancy:
        #     params["tenant"] = self.tenancy["tenant"]
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     params["page_size"] = self.pagination.get("page_size", 30)
        #     params["page"] = self.pagination.get("page_number", 1)
        # if self.date and "from_date" in self.date:
        #     params["start_date"] = str(self.date["from_date"])
        # if self.date and "to_date" in self.date:
        #     params["end_date"] = str(self.date["to_date"])
        if host_id_qfilter:
            params["host_id_qfilter"] = host_id_qfilter
        encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote_plus)
        # rest_point already starts with "/", so no extra separator is added
        full_url = f"{self.url.rstrip('/')}{rest_point}?{encoded_params}"
        # Remove the trailing "?" when there are no query parameters
        full_url = full_url.rstrip("?")
        # Make the HTTP request
        response = requests.get(full_url, headers=self.headers, verify=self.verify)
        print(full_url)
        check_request(response)
        response_data = response.json()
        return response_data

    def all(self):
        print("Returning data for all Hosts IPs from the SSP")
        params = {}
        rest_point = "/appliances/host_id/"
        # If multi-tenancy is enabled, uncomment the lines below and use the correct tenant value.
        # if self.tenancy:
        #     params["tenant"] = self.tenancy["tenant"]
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     params.update({
        #         "page_size": self.pagination["page_size"],
        #         "page_number": self.pagination["page_number"]
        #     })
        # if self.date:
        #     params.update({
        #         "from_date": self.date["from_date"],
        #         "to_date": self.date["to_date"]
        #     })
        # Construct the URL; the query string is appended only when parameters are set
        query = urllib.parse.urlencode(params)
        full_url = f"{self.url.rstrip('/')}{rest_point}" + (f"?{query}" if query else "")
        # The parameters are already part of full_url, so they are not passed again
        response = requests.get(full_url, headers=self.headers, verify=self.verify)
        print(full_url)
        check_request(response)
        response_data = response.json()
        return response_data
class NSM(RestCall):
    def nsm_metadata(self, field, field_value=None):
        print("Requesting metadata for NSM")
        params = {}
        if field_value is not None:
            rest_point = f"/rules/es/events_tail/?qfilter={field}:{field_value}"
        else:
            rest_point = f"/rules/es/events_tail/?qfilter={field}"
        # If multi-tenancy is enabled, uncomment the corresponding lines below and use the correct tenant value.
        # if self.tenancy:
        #     rest_point += f"&tenant={self.tenancy['tenant']}"
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     rest_point += f"&page_size={self.pagination['page_size']}&page_number={self.pagination['page_number']}"
        # if self.date:
        #     rest_point += f"&from_date={self.date['from_date']}&to_date={self.date['to_date']}"
        full_url = self.url + rest_point
        response = requests.get(full_url, headers=self.headers, params=params, verify=self.verify)
        print(full_url)
        check_request(response)  # Assuming check_request is defined elsewhere
        response_data = response.json()
        return response_data
class Alerts(RestCall):
    def metadata(self, field, qfilter_field=None, qfilter_field_value=None):
        print("Requesting metadata for a specific Host")
        params = {}
        if qfilter_field is not None:
            rest_point = f"/rules/es/field_stats/?field={field}&qfilter={qfilter_field}:{qfilter_field_value}"
        else:
            rest_point = f"/rules/es/field_stats/?field={field}"
        # If multi-tenancy is enabled, uncomment the corresponding lines below and use the correct tenant value.
        # if self.tenancy:
        #     rest_point += f"&tenant={self.tenancy['tenant']}"
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     rest_point += f"&page_size={self.pagination['page_size']}&page_number={self.pagination['page_number']}"
        # if self.date:
        #     rest_point += f"&from_date={self.date['from_date']}&to_date={self.date['to_date']}"
        response = requests.get(
            self.url + rest_point, headers=self.headers, params=params, verify=self.verify
        )
        print(self.url + rest_point)
        check_request(response)
        response_data = response.json()
        return response_data

    def dashboards(self, dashboard, ip=None, alert=True, discovery=True):
        print(f"Requesting metadata from {dashboard} dashboard")
        params = {}
        rest_point = f"/rules/es/fields_stats/?fields={dashboard}&qfilter="
        filter_conditions = []
        if ip is not None:
            filter_conditions.append(f"(src_ip:{ip} OR dest_ip:{ip})")
        if alert:
            filter_conditions.append(
                '((NOT alert.tag:*) OR alert.tag:"informational" OR alert.tag:"relevant")'
            )
        if not discovery and alert:
            filter_conditions.append("NOT _exists_:discovery")
        elif discovery and not alert:
            filter_conditions.append("_exists_:discovery")
        query_filter = " AND ".join(filter_conditions)
        # Only the filter expression is percent-encoded
        full_query = rest_point + urllib.parse.quote(query_filter)
        # Extra parameters are appended after encoding so that "&" and "=" stay intact.
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     full_query += f"&page_size={self.pagination['page_size']}&page_number={self.pagination['page_number']}"
        # if self.date:
        #     full_query += f"&from_date={self.date['from_date']}&to_date={self.date['to_date']}"
        # If tenancy is enabled, uncomment the lines below
        # if self.tenancy:
        #     full_query += f"&tenant={self.tenancy['tenant']}"
        response = requests.get(
            self.url + full_query, headers=self.headers, params=params, verify=self.verify
        )
        print(self.url + full_query)
        check_request(response)
        response_data = response.json()
        return response_data
if __name__ == "__main__":
    # Replace 'token' and 'url' with actual values
    token = 'token'
    url = 'url'
    # Instantiate an Alerts object with the given token and URL
    sn_alerts = Alerts(token, url)
    # Example of using the metadata method in the Alerts class
    field = 'required_field'
    qfilter_field = 'required_qfilter_field'  # Optional: Set to None if not using qfilter
    qfilter_field_value = 'required_qfilter_value'  # Optional: Set to None if not using qfilter
    # Replace 'required_field', 'required_qfilter_field', and 'required_qfilter_value' with actual values
    result = sn_alerts.metadata(field, qfilter_field, qfilter_field_value)
    # Print the result
    print(result)
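The methods above call a check_request helper that is not shown in this document, and the scripts are started as python3 script.py IP Token. The sketch below is one possible shape for both pieces, not the original implementation. The message text follows the sample output shown later in this guide, the URL layout https://<IP>/rest follows the sample request URLs, and the name parse_arguments is hypothetical.

```python
def check_request(response):
    """Report the outcome of an HTTP call and fail loudly on errors.

    Hypothetical helper: it relies only on the status_code and text
    attributes that requests.Response objects provide.
    """
    if 200 <= response.status_code < 300:
        print("Request is successful!")
        return True
    raise RuntimeError(
        f"Request failed with status {response.status_code}: {response.text}"
    )


def parse_arguments(argv):
    """Turn `script.py IP Token` into the (url, token) pair used by RestCall."""
    if len(argv) != 3:
        raise SystemExit(f"Usage: python3 {argv[0]} IP Token")
    host, token = argv[1], argv[2]
    # Assumption: the RestAPI is served over HTTPS under the /rest path.
    return f"https://{host}/rest", token


# In a script this would be parse_arguments(sys.argv); a literal list is
# used here so the sketch runs on its own. The values are placeholders.
url, token = parse_arguments(["script.py", "192.0.2.10", "example-token"])
print(url)
```

Raising an exception on a failed request stops the script before response.json() is called on an error page.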
Other examples can be:
sn_alerts.metadata(field="smb.filename", qfilter_field="dest_ip", qfilter_field_value="192.168.20.166")
sn_alerts.metadata(field="ssh.server.software_version", qfilter_field="src_ip", qfilter_field_value="181.129.104.139")
sn_alerts.metadata(field="host_id.username.user", qfilter_field="hostname_info.domain", qfilter_field_value="*.xyz")
More examples can be found here: Alerts Metadata
Host Insights¶
Host Insights provides details about various aspects of a host from a security perspective, including services, user agents, TLS fingerprints (JA3), hostnames, client services, roles, and more, for all Host Insights IPs from the Stamus Security Platform. For more information, please refer to the documentation: Host Insights
The all method is part of the inherited class HostInsights and uses the RestAPI point "/appliances/host_id/". It retrieves Host Insights for any host, regardless of whether the host has been part of a detection/alert event. It also has an optional filter.
The method can be invoked with the optional parameter host_id_qfilter, which narrows down the results based on a specific filter, for example the type of client or network service present.
If no host ID filter is provided (host_id_qfilter=None), the method retrieves Host Insights data for all hosts available through the RestAPI point.
The given code establishes a class named HostInsights, which inherits from the RestCall class.
The example assumes that default settings for tenancy and pagination are employed, as specified in the class instantiation.
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import sys
import os
import urllib.parse


class RestCall:
    def __init__(self, token, url):
        self.token = token
        self.url = url
        self.headers = {"Authorization": f"Token {self.token}"}
        # If multi-tenancy is enabled, uncomment the corresponding lines below and use the correct tenant value.
        # self.tenancy = {"tenant": "4"}
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # self.pagination = {"page_size": 30, "page_number": 1}
        # self.date = {"from_date": "1700322592000", "to_date": "1702914592000"}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.verify = False


class HostInsights(RestCall):
    def all(self, host_id_qfilter=None):
        print("Returning data for all Hosts IPs from the SSP")
        params = {}
        rest_point = "/appliances/host_id/"
        # If multi-tenancy is enabled, uncomment the lines below
        # if self.tenancy:
        #     params["tenant"] = self.tenancy["tenant"]
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     params.update({
        #         "page_size": self.pagination["page_size"],
        #         "page_number": self.pagination["page_number"]
        #     })
        # if self.date:
        #     params.update({
        #         "from_date": self.date["from_date"],
        #         "to_date": self.date["to_date"]
        #     })
        # Construct URL with parameters
        if host_id_qfilter:
            params["host_id_qfilter"] = host_id_qfilter
        encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote_plus)
        # rest_point already starts with "/", so no extra separator is added
        full_url = f"{self.url.rstrip('/')}{rest_point}?{encoded_params}"
        # Remove the trailing "?" when there are no query parameters
        full_url = full_url.rstrip("?")
        # Make the HTTP request; the parameters are already part of full_url
        response = requests.get(full_url, headers=self.headers, verify=self.verify)
        print(full_url)
        check_request(response)
        response_data = response.json()
        return response_data


if __name__ == "__main__":
    # 'token' and 'url' must be set to actual values, as in the first example
    sn_host_insights = HostInsights(token, url)
    # return all Host Insights for any hosts that are serving FTP/FTP-DATA on the network
    sn_host_insights.all(host_id_qfilter="host_id.services.values.app_proto:ftp*")
    # return all Host Insights for any/all host
    # sn_host_insights.all()
Host Insights for a specific Host - alert based¶
The alerted method is part of the inherited class HostInsights and uses the RestAPI point "/appliances/host_id_alerts/". It retrieves Host Insights only for hosts that were part of any alert method ("event_type":"alert"), SIGHTINGS, or Stamus ("event_type":"stamus") event. It also has an optional filter.
In the example, the method is invoked with the optional parameter host_id_qfilter, which narrows down the results based on a specific filter, for example the type of client or network service present.
If no host ID filter is provided (host_id_qfilter=None), the method retrieves Host Insights data for all hosts available through the RestAPI point.
The example assumes that default settings for tenancy and pagination are employed, as specified in the class instantiation.
class RestCall:
    def __init__(self, token, url):
        self.token = token
        self.url = url
        self.headers = {"Authorization": f"Token {self.token}"}
        self.pagination = {"page_size": 30, "page_number": 1}
        self.tenancy = {"tenant": "4"}
        self.date = {"from_date": "1700322592000", "to_date": "1702914592000"}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.verify = False


class HostInsights(RestCall):
    def alerted(self, host_id_qfilter=None):
        print("Returning Host Insights alert based data")
        params = {}
        rest_point = "/appliances/host_id_alerts/"
        # If multi-tenancy is enabled, uncomment the lines below
        # if self.tenancy:
        #     params["tenant"] = self.tenancy["tenant"]
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     params["page_size"] = self.pagination.get("page_size", 30)
        #     params["page"] = self.pagination.get("page_number", 1)
        # if self.date and "from_date" in self.date:
        #     params["start_date"] = str(self.date["from_date"])
        # if self.date and "to_date" in self.date:
        #     params["end_date"] = str(self.date["to_date"])
        if host_id_qfilter:
            params["host_id_qfilter"] = host_id_qfilter
        encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote_plus)
        # rest_point already starts with "/", so no extra separator is added
        full_url = f"{self.url.rstrip('/')}{rest_point}?{encoded_params}"
        # Remove the trailing "?" when there are no query parameters
        full_url = full_url.rstrip("?")
        # Make the HTTP request; the parameters are already part of full_url
        response = requests.get(full_url, headers=self.headers, verify=self.verify)
        print(full_url)
        check_request(response)
        response_data = response.json()
        return response_data
The first query retrieves Host Insights data without a specific filter.
if __name__ == "__main__":
    sn_host_insights = HostInsights(token, url)
    sn_host_insights.alerted()
The following queries are designed to retrieve Host Insights data based on specific application layer services.
The first query targets the krb5 service, and the second query focuses on the smtp service.
sn_host_insights.alerted(host_id_qfilter='host_id.client_service.name.raw:"krb5"')
sn_host_insights.alerted(host_id_qfilter='host_id.client_service.name.raw:"smtp"')
The next query is formulated to retrieve Host Insights data based on a specific hostname, in this case onedrive.live.com:
sn_host_insights.alerted(host_id_qfilter='host_id.hostname.host.raw:"onedrive.live.com"')
The queries below are crafted to retrieve Host Insights data based on network services. The first query targets the DNS service over TCP, the second query focuses on all TCP services, and the third query is specific to the SMTP service.
sn_host_insights.alerted(host_id_qfilter='host_id.services.proto.raw:"tcp" AND host_id.services.values.app_proto.raw:"dns"')
sn_host_insights.alerted(host_id_qfilter='host_id.services.proto.raw:"tcp"')
sn_host_insights.alerted(host_id_qfilter='host_id.services.values.app_proto.raw:"smtp"')
Queries can also retrieve Host Insights data based on specific usernames. For example, this query targets the username julia.sanchez@thishouse.info:
sn_host_insights.alerted(host_id_qfilter='host_id.username.user:"julia.sanchez@thishouse.info"')
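Because the filter values contain double quotes, assembling them by hand is error-prone. The following sketch composes such filter strings from parts. The helper names term and all_of are hypothetical, and the escaping rule (a backslash before embedded quotes and backslashes) is an assumption based on common query-string syntax rather than something confirmed by this guide.

```python
def term(field, value):
    """Return a field:"value" clause with the value wrapped in double quotes."""
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'{field}:"{escaped}"'


def all_of(*clauses):
    """Combine clauses so that every one of them must match."""
    return " AND ".join(clauses)


# Rebuild the "DNS over TCP" filter used in the queries above.
qfilter = all_of(
    term("host_id.services.proto.raw", "tcp"),
    term("host_id.services.values.app_proto.raw", "dns"),
)
print(qfilter)
```

The resulting string can be passed as the host_id_qfilter argument of the alerted method.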
Host Insights for a specific Host¶
The ip method, which is part of the inherited class HostInsights, retrieves Host Insights data exclusively for a specified host IP. In the example below, the method is invoked with the IP address 172.16.1.2 to obtain Host Insights data for the corresponding host.
In all method examples below, it is assumed that the same class as introduced at the beginning of the section is used.
Specifically, only the method itself is added, and default settings for tenancy and pagination are employed.
class RestCall:
    def __init__(self, token, url):
        self.token = token
        self.url = url
        self.headers = {"Authorization": f"Token {self.token}"}
        self.pagination = {"page_size": 30, "page_number": 1}
        self.tenancy = {"tenant": "4"}
        self.date = {"from_date": "1700322592000", "to_date": "1702914592000"}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.verify = False


class HostInsights(RestCall):
    def ip(self, ip):
        print("Returning Host Insights data for a specified host IP")
        params = {}
        rest_point = f"/appliances/host_id/{ip}"
        # If multi-tenancy is enabled, uncomment the corresponding lines below.
        # if self.tenancy:
        #     params["tenant"] = self.tenancy["tenant"]
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     params["page_size"] = self.pagination["page_size"]
        #     params["page_number"] = self.pagination["page_number"]
        # if self.date:
        #     params["from_date"] = self.date["from_date"]
        #     params["to_date"] = self.date["to_date"]
        # Construct the URL; the query string is appended only when parameters are set
        query = urllib.parse.urlencode(params)
        full_url = f"{self.url.rstrip('/')}{rest_point}" + (f"?{query}" if query else "")
        # The parameters are already part of full_url, so they are not passed again
        response = requests.get(full_url, headers=self.headers, verify=self.verify)
        print(full_url)
        check_request(response)
        response_data = response.json()
        return response_data


if __name__ == "__main__":
    sn_host_insights = HostInsights(token, url)
    sn_host_insights.ip(ip="172.16.1.2")
Example output
The following is a sample output obtained from the execution of the Host Insights ip method.
The output is presented in JSON format for improved visibility.
python3 HostsIp.py 10.136.0.57 c5e8a89f34dcd2e64cc7a37c67b8d42eb736cd89
Returning Host Insights data for a specified host IP
https://10.136.0.57/rest/appliances/host_id/172.16.1.2
Request is successful!
Response:
{
"ip": "172.16.1.2",
"host_id": {
"first_seen": "2023-11-15T09:10:29.703450+01:00",
"last_seen": "2023-11-15T09:12:31.254863+01:00",
"hostname": [
{
"host": "electrohouse-dc.electrohouse.info",
"first_seen": "2023-11-15T09:10:29.703450+01:00",
"last_seen": "2023-11-15T09:11:09.872008+01:00"
}
],
"roles": [
{
"name": "dhcp",
"first_seen": "2023-11-15T09:10:30.849160+01:00",
"last_seen": "2023-11-15T09:12:16.434624+01:00"
},
{
"name": "domain controller",
"first_seen": "2023-11-15T09:10:35.406704+01:00",
"last_seen": "2023-11-15T09:12:16.434624+01:00"
}
],
"net_info": [
{
"agg": "winfarm.vmzone.servers.zerotrust.clients",
"first_seen": "2024-01-05T09:38:22.383659+01:00",
"last_seen": "2024-01-05T09:40:03.220592+01:00"
}
],
"tls.ja4": [
{
"agent": [
"Malware Test FP: trickbot-infection-from-usdata.estoreseller.com, malspam-infection-traffic"
],
"hash": "t10i120300_d94e65cdb899_5f12c91e28fe",
"first_seen": "2024-01-05T09:38:41.675847+01:00",
"last_seen": "2024-01-05T09:39:41.509173+01:00"
},
{
"agent": [
"Non-Specific Microsoft Socket, Malware Test FP: brazil-malspam-pushes-banload, dhl-malspam-traffic"
],
"hash": "t10d120400_d94e65cdb899_5f12c91e28fe",
"first_seen": "2024-01-05T09:38:41.733641+01:00",
"last_seen": "2024-01-05T09:38:41.733641+01:00"
},
{
"hash": "t13d591000_a33745022dd6_5ac7197df9d2",
"first_seen": "2024-01-05T09:39:16.122499+01:00",
"last_seen": "2024-01-05T09:40:00.195837+01:00"
},
{
"hash": "t12d1311h2_8b80da21ef18_77989cba1f4a",
"first_seen": "2024-01-05T09:40:03.253613+01:00",
"last_seen": "2024-01-05T09:40:03.253613+01:00"
},
{
"hash": "t12d1312h2_8b80da21ef18_b00751acaffa",
"first_seen": "2024-01-05T09:40:03.329172+01:00",
"last_seen": "2024-01-05T09:40:03.329172+01:00"
}
],
"http.user_agent": [
{
"agent": "Microsoft-CryptoAPI/6.1",
"first_seen": "2023-11-15T09:11:20.772005+01:00",
"last_seen": "2023-11-15T09:11:20.900576+01:00"
},
{
"agent": "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; Win64; x64; Trident/7.0; .NET CLR 2.0.50727; SLCC2; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; .NET4.0E)",
"first_seen": "2023-11-15T09:11:46.114871+01:00",
"last_seen": "2023-11-15T09:11:46.617722+01:00"
},
{
"agent": "KSKJJGJ",
"first_seen": "2023-11-15T09:11:47.449151+01:00",
"last_seen": "2023-11-15T09:11:47.449151+01:00"
},
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
"first_seen": "2023-11-15T09:11:58.923444+01:00",
"last_seen": "2023-11-15T09:11:58.923444+01:00"
},
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0; BOIE9;ENUSMSNIP)",
"first_seen": "2023-11-15T09:11:58.952009+01:00",
"last_seen": "2023-11-15T09:12:29.469149+01:00"
},
{
"agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko",
"first_seen": "2023-11-15T09:12:21.577724+01:00",
"last_seen": "2023-11-15T09:12:21.577724+01:00"
},
{
"agent": "WinHTTP loader/1.0",
"first_seen": "2023-11-15T09:12:31.254863+01:00",
"last_seen": "2023-11-15T09:12:31.254863+01:00"
}
],
"client_service": [
{
"name": "tls",
"first_seen": "2023-11-15T09:11:31.282838+01:00",
"last_seen": "2023-11-15T09:12:01.580981+01:00"
},
{
"name": "http",
"first_seen": "2023-11-15T09:11:32.626941+01:00",
"last_seen": "2023-11-15T09:12:04.267485+01:00"
}
],
"hostname_count": 1,
"roles_count": 2,
"net_info_count": 1,
"tls.ja4_count": 5,
"http.user_agent_count": 7,
"client_service_count": 2,
"services": [
{
"proto": "udp",
"port": 53,
"values": [
{
"first_seen": "2023-11-15T09:10:29.703450+0100",
"last_seen": "2023-11-15T09:11:50.874870+0100",
"app_proto": "dns"
}
]
},
{
"proto": "udp",
"port": 67,
"values": [
{
"first_seen": "2023-11-15T09:10:30.849160+0100",
"last_seen": "2023-11-15T09:11:11.412015+0100",
"app_proto": "dhcp"
}
]
},
{
"proto": "tcp",
"port": 88,
"values": [
{
"first_seen": "2023-11-15T09:10:34.103294+0100",
"last_seen": "2023-11-15T09:12:15.079934+0100",
"app_proto": "krb5"
}
]
},
{
"proto": "tcp",
"port": 135,
"values": [
{
"first_seen": "2023-11-15T09:10:40.132512+0100",
"last_seen": "2023-11-15T09:12:08.333176+0100",
"app_proto": "dcerpc"
}
]
},
{
"proto": "tcp",
"port": 389,
"values": [
{
"first_seen": "2023-11-15T09:10:35.262645+0100",
"last_seen": "2023-11-15T09:12:16.434624+0100",
"app_proto": "unknown"
}
]
},
{
"proto": "tcp",
"port": 445,
"values": [
{
"first_seen": "2023-11-15T09:10:35.406704+0100",
"last_seen": "2023-11-15T09:12:12.386575+0100",
"app_proto": "smb"
},
{
"first_seen": "2023-11-15T09:11:19.150556+0100",
"last_seen": "2023-11-15T09:11:34.660474+0100",
"app_proto": "unknown"
}
]
},
{
"proto": "tcp",
"port": 3268,
"values": [
{
"first_seen": "2023-11-15T09:11:22.515553+0100",
"last_seen": "2023-11-15T09:11:28.578870+0100",
"app_proto": "unknown"
}
]
},
{
"proto": "tcp",
"port": 49155,
"values": [
{
"first_seen": "2023-11-15T09:10:45.143113+0100",
"last_seen": "2023-11-15T09:12:05.615669+0100",
"app_proto": "dcerpc"
}
]
},
{
"proto": "tcp",
"port": 49158,
"values": [
{
"first_seen": "2023-11-15T09:10:49.539029+0100",
"last_seen": "2023-11-15T09:12:06.299000+0100",
"app_proto": "dcerpc"
}
]
}
],
"services_count": 10
}
}
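A SOAR playbook usually needs only a few values from such a response. The sketch below flattens the services section into simple tuples. It assumes the response has already been parsed into a dictionary with the structure shown above; the function name list_services is hypothetical, and the sample dictionary is a shortened copy of the output.

```python
def list_services(host_insights):
    """Return sorted (proto, port, app_proto) tuples from a Host Insights response."""
    services = host_insights.get("host_id", {}).get("services", [])
    found = set()
    for service in services:
        for value in service.get("values", []):
            found.add((service.get("proto"), service.get("port"), value.get("app_proto")))
    return sorted(found)


# Shortened copy of the sample output above.
sample = {
    "ip": "172.16.1.2",
    "host_id": {
        "services": [
            {"proto": "udp", "port": 53, "values": [{"app_proto": "dns"}]},
            {
                "proto": "tcp",
                "port": 445,
                "values": [{"app_proto": "smb"}, {"app_proto": "unknown"}],
            },
        ]
    },
}

for proto, port, app_proto in list_services(sample):
    print(f"{proto}/{port}: {app_proto}")
```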
Alerts Metadata¶
The metadata method, which is part of the inherited class Alerts, returns metadata for detection method events. It takes the field parameter, which selects the field to return statistics for, as well as an optional qfilter, which can filter on any field present.
class Alerts(RestCall):
    def metadata(self, field, qfilter_field=None, qfilter_field_value=None):
        print("Requesting metadata for a specific Host")
        params = {}
        if qfilter_field is not None:
            rest_point = f"/rules/es/field_stats/?field={field}&qfilter={qfilter_field}:{qfilter_field_value}"
        else:
            rest_point = f"/rules/es/field_stats/?field={field}"
        # If multi-tenancy is enabled, uncomment the corresponding lines below.
        # if self.tenancy:
        #     rest_point += f"&tenant={self.tenancy['tenant']}"
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired value.
        # if self.pagination:
        #     rest_point += f"&page_size={self.pagination['page_size']}&page_number={self.pagination['page_number']}"
        # if self.date:
        #     rest_point += f"&from_date={self.date['from_date']}&to_date={self.date['to_date']}"
        response = requests.get(
            self.url + rest_point, headers=self.headers, params=params, verify=self.verify
        )
        print(self.url + rest_point)
        check_request(response)
        response_data = response.json()
        return response_data
In this example, the first call returns all affected_product values for detection events where the source IP address is 192.168.5.125. The second call returns the tls.ja4.hash values for events whose TLS cipher is rated insecure.
if __name__ == "__main__":
    sn_alerts = Alerts(token, url)
    sn_alerts.metadata(field="alert.metadata.affected_product", qfilter_field="src_ip", qfilter_field_value="192.168.5.125")
    sn_alerts.metadata(field="tls.ja4.hash", qfilter_field="tls.cipher_security", qfilter_field_value="insecure")
Example output
The example below queries the SSP RestAPI for the affected_product metadata of all detection events from source IP 192.168.5.125:
python3 HostsIp.py 10.136.0.57 4124ae5a554b1943aa3f7053f2ce2766eede5440
Requesting metadata for a specific Host
https://10.136.0.57/rest/rules/es/field_stats/?field=alert.metadata.affected_product&qfilter=src_ip:192.168.5.125
Request is successful!
Response:
[{"key":"Windows_XP_Vista_7_8_10_Server_32_64_Bit","doc_count":1924},{"key":"Any","doc_count":5}]
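The response is a list of buckets, each with a key and a doc_count. A minimal sketch of how such a list might be ranked and summarized before passing it on to a SOAR workflow is shown here. The helper name summarize_buckets is hypothetical, and the sample body is copied from the output above.

```python
import json

# Response body copied from the example output above.
RAW_RESPONSE = (
    '[{"key":"Windows_XP_Vista_7_8_10_Server_32_64_Bit","doc_count":1924},'
    '{"key":"Any","doc_count":5}]'
)


def summarize_buckets(buckets, top=5):
    """Return (key, count, share) tuples for the largest buckets, largest first."""
    total = sum(bucket["doc_count"] for bucket in buckets)
    ranked = sorted(buckets, key=lambda bucket: bucket["doc_count"], reverse=True)
    return [
        (
            bucket["key"],
            bucket["doc_count"],
            bucket["doc_count"] / total if total else 0.0,
        )
        for bucket in ranked[:top]
    ]


for key, count, share in summarize_buckets(json.loads(RAW_RESPONSE)):
    print(f"{key}: {count} events ({share:.1%})")
```

Because metadata returns the already parsed JSON, its return value could be passed to summarize_buckets directly instead of the sample string.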
Some other examples can be:
if __name__ == "__main__":
    sn_alerts = Alerts(token, url)
    sn_alerts.metadata(field="alert.metadata.affected_product", qfilter_field="src_ip", qfilter_field_value="192.168.5.125")
    sn_alerts.metadata(field="alert.metadata.attack_target", qfilter_field="src_ip", qfilter_field_value="192.168.5.125")
    sn_alerts.metadata(field="alert.metadata.malware_family", qfilter_field="src_ip", qfilter_field_value="192.168.5.125")
In the example below, the metadata method returns all alert.metadata.mitre_tactic_id values, filtered with qfilter_field set to src_ip and qfilter_field_value set to 192.168.5.125 OR 10.11.21.101. In other words, it lists all mitre_tactic_id values where the source IP is either 192.168.5.125 or 10.11.21.101.
sn_alerts.metadata(field="alert.metadata.mitre_tactic_id", qfilter_field="src_ip", qfilter_field_value="192.168.5.125 OR 10.11.21.101")
Example output
python3 HostsIp.py 10.136.0.57 4124ae5a554b1943aa3f7053f2ce2766eede5440
Requesting metadata for a specific Host
https://10.136.0.57/rest/rules/es/field_stats/?field=alert.metadata.mitre_tactic_id&qfilter=src_ip:192.168.5.125 OR 10.11.21.101
Request is successful!
Response:
[{"key":"TA0011","doc_count":1884}]
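In Lucene-style query strings, a bare term after OR is normally matched against the default search fields rather than the field named before the colon. Assuming qfilter follows that syntax (an assumption, not confirmed by this guide), wrapping the alternatives in parentheses makes the intent explicit. The helper any_of below is a hypothetical sketch of that idea.

```python
from urllib.parse import quote


def any_of(values):
    """Build a parenthesised OR group, e.g. '(a OR b)', for use as a qfilter value."""
    values = [str(value) for value in values]
    if not values:
        raise ValueError("at least one value is required")
    if len(values) == 1:
        # A single value needs no grouping.
        return values[0]
    return "(" + " OR ".join(values) + ")"


qfilter_value = any_of(["192.168.5.125", "10.11.21.101"])
print("src_ip:" + qfilter_value)  # src_ip:(192.168.5.125 OR 10.11.21.101)

# Percent-encoded form, suitable for placing in a URL by hand.
print(quote("src_ip:" + qfilter_value))
```

The returned string can be passed as qfilter_field_value to the metadata method, with qfilter_field left as src_ip.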
In another example below, the metadata method returns all alert.signature values (signatures), filtered with qfilter_field set to tls.cipher_security and qfilter_field_value set to insecure. In other words, it lists all detection methods/signatures that generated alerts where the TLS cipher is insecure.
sn_alerts.metadata(field="alert.signature", qfilter_field="tls.cipher_security", qfilter_field_value="insecure")
# The example below is the same as above, but also filters on a wildcarded TLS SNI ending in kmofou123.com
sn_alerts.metadata(field="alert.signature", qfilter_field="tls.cipher_security", qfilter_field_value="insecure AND tls.sni:*kmofou123.com")
In another example below, the metadata method returns all tls.ja4.hash values, filtered with qfilter_field set to tls.cipher_security and qfilter_field_value set to insecure, and restricted to alerts whose target belongs to the network definition accounting.site-a.remotevpn.clients. In other words, it lists all JA4 hashes from alerts where the TLS cipher is insecure and the communication involves remote VPN clients in accounting.
sn_alerts.metadata(field="tls.ja4.hash", qfilter_field="tls.cipher_security", qfilter_field_value="insecure AND alert.target.net_info_agg:accounting.site-a.remotevpn.clients")
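The calls above chain several conditions with AND inside qfilter_field_value. A small helper can assemble that value from a list of field/value pairs, which keeps longer filters readable. This is a sketch only: all_of and split_for_metadata are hypothetical names, not part of the examples in this guide.

```python
def all_of(conditions):
    """Join (field, value) pairs into 'field:value AND field:value', keeping their order."""
    if not conditions:
        raise ValueError("at least one condition is required")
    return " AND ".join(f"{field}:{value}" for field, value in conditions)


def split_for_metadata(conditions):
    """Return (qfilter_field, qfilter_field_value) in the shape metadata() expects."""
    field, rest = all_of(conditions).split(":", 1)
    return field, rest


qfilter_field, qfilter_field_value = split_for_metadata(
    [
        ("tls.cipher_security", "insecure"),
        ("alert.target.net_info_agg", "accounting.site-a.remotevpn.clients"),
    ]
)
print(qfilter_field)
print(qfilter_field_value)
```

The two printed values match the qfilter_field and qfilter_field_value arguments used in the last call above.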
Hint
The metadata method can use any field from the following event types: Stamus, Alerts, and SIGHTINGS (see Data fields).
Hunting Dashboards¶
The dashboards method, which is part of the inherited class Alerts, retrieves activity data for a specific dashboard widget. The widget's metadata field is passed as the dashboard parameter.
Various filters, including IP, network definitions, alert metadata, and sightings, can be applied. To include sightings, set the discovery parameter to True.
Similar principles apply to other filters. By default, the filters for Informational, Relevant and Untagged are active.
The examples below follow the widget names used on the Hunting page of the Stamus Central Server. The alert and discovery switches in each method call mimic toggling the Alerts and SIGHTINGS switches, respectively, in the GUI.
The dashboard parameter can be the name of any widget (box) in the Dashboards. For example, the methods:
class Alerts(RestCall):
    def dashboards(self, dashboard, ip=None, alert=True, discovery=True):
        print(f"Requesting metadata from {dashboard} dashboard")
        params = {}
        rest_point = f"/rules/es/fields_stats/?fields={dashboard}&qfilter="
        filter_conditions = []
        if ip is not None:
            filter_conditions.append(f"(src_ip:{ip} OR dest_ip:{ip})")
        if alert:
            filter_conditions.append(
                '((NOT alert.tag:*) OR alert.tag:"informational" OR alert.tag:"relevant")'
            )
        if not discovery and alert:
            filter_conditions.append("NOT _exists_:discovery")
        elif discovery and not alert:
            filter_conditions.append("_exists_:discovery")
        query_filter = " AND ".join(filter_conditions)
        # Percent-encode only the filter expression; extra parameters are appended afterwards so that "&" and "=" stay unencoded.
        full_query = rest_point + urllib.parse.quote(query_filter)
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired values.
        # if self.pagination:
        #     full_query += f"&page_size={self.pagination['page_size']}&page_number={self.pagination['page_number']}"
        # if self.date:
        #     full_query += f"&from_date={self.date['from_date']}&to_date={self.date['to_date']}"
        # If tenancy is enabled, uncomment the lines below.
        # if self.tenancy:
        #     full_query += f"&tenant={self.tenancy['tenant']}"
        response = requests.get(
            self.url + full_query, headers=self.headers, params=params, verify=self.verify
        )
        print(self.url + full_query)
        check_request(response)
        response_data = response.json()
        return response_data
if __name__ == "__main__":
    hunting = Alerts(token, url)
    hunting.dashboards(dashboard="tls.sni", alert=True, discovery=True)
    hunting.dashboards(dashboard="tls.subject", alert=True, discovery=True)
    hunting.dashboards(dashboard="tls.issuerdn", alert=True, discovery=True)
These three calls query the TLS metadata displayed in the tls.sni, tls.subject, and tls.issuerdn widgets, respectively, of the Hunting > Dashboards GUI.
Some calls also filter on a specific IP address (matched as either source or destination) via the ip= parameter, as in the examples below:
if __name__ == "__main__":
    hunting = Alerts(token, url)
    hunting.dashboards(dashboard="tls.ja4.hash", ip="10.6.15.119", alert=True, discovery=True)
    hunting.dashboards(dashboard="tls.cipher_security", ip="10.6.15.119", alert=True, discovery=True)
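How the ip, alert and discovery switches combine into a filter expression can be checked offline, without contacting the server. The sketch below reproduces the filter-building logic of the dashboards method as a standalone function. The function name build_dashboard_filter is hypothetical, and the filter strings are copied from the method shown earlier.

```python
# Default tag filter used by the dashboards method (untagged, informational, relevant).
TAG_FILTER = '((NOT alert.tag:*) OR alert.tag:"informational" OR alert.tag:"relevant")'


def build_dashboard_filter(ip=None, alert=True, discovery=True):
    """Return the unencoded qfilter expression for the given switch settings."""
    conditions = []
    if ip is not None:
        # Match the address as either source or destination.
        conditions.append(f"(src_ip:{ip} OR dest_ip:{ip})")
    if alert:
        conditions.append(TAG_FILTER)
    if alert and not discovery:
        # Alerts only: exclude sightings.
        conditions.append("NOT _exists_:discovery")
    elif discovery and not alert:
        # Sightings only.
        conditions.append("_exists_:discovery")
    return " AND ".join(conditions)


# Print the expression for every combination of the two switches.
for alert in (True, False):
    for discovery in (True, False):
        expression = build_dashboard_filter(alert=alert, discovery=discovery)
        print(f"alert={alert}, discovery={discovery}: {expression or '<empty filter>'}")
```

Note that with both switches off and no ip, the expression is empty, so the request carries no filter at all.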
A few more examples of the dashboards method, with and without the ip filter:
if __name__ == "__main__":
    hunting = Alerts(token, url)
    hunting.dashboards(dashboard="alert.signature", ip="10.6.15.119", alert=True, discovery=True)
    hunting.dashboards(dashboard="alert.signature", alert=True, discovery=True)
    hunting.dashboards(dashboard="tls.ja4.hash", ip="10.6.15.119", alert=True, discovery=True)
Below is a comprehensive list of examples covering every widget in the Hunting > Dashboards GUI:
hunting.dashboards(dashboard="alert.category", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.severity", alert=False, discovery=True)
hunting.dashboards(dashboard="alert.metadata.signature_severity", alert=True, discovery=True)
hunting.dashboards(dashboard="alert.metadata.attack_target", alert=True, discovery=True)
hunting.dashboards(dashboard="alert.metadata.affected_product", alert=True, discovery=True)
hunting.dashboards(dashboard="alert.metadata.malware_family", alert=True, discovery=True)
hunting.dashboards(dashboard="alert.metadata.mitre_tactic_id", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.metadata.mitre_tactic_name", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.metadata.mitre_technique_id", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.metadata.mitre_technique_name", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.source.ip", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.target.ip", alert=True, discovery=False)
#
hunting.dashboards(dashboard="hostname_info.subdomain", alert=True, discovery=False)
hunting.dashboards(dashboard="hostname_info.domain", alert=True, discovery=False)
hunting.dashboards(dashboard="hostname_info.tld", alert=True, discovery=False)
hunting.dashboards(dashboard="hostname_info.domain_without_tld", alert=True, discovery=False)
hunting.dashboards(dashboard="hostname_info.host", alert=True, discovery=False)
#
hunting.dashboards(dashboard="geoip.country_name", alert=True, discovery=False)
hunting.dashboards(dashboard="geoip.city_name", alert=True, discovery=False)
#
hunting.dashboards(dashboard="alert.lateral", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.source.net_info_agg", alert=True, discovery=False)
hunting.dashboards(dashboard="alert.target.net_info_agg", alert=True, discovery=False)
#
hunting.dashboards(dashboard="fqdn.src", alert=True, discovery=False)
hunting.dashboards(dashboard="fqdn.dest", alert=True, discovery=False)
hunting.dashboards(dashboard="geoip.provider.autonomous_system_number", alert=True, discovery=False)
hunting.dashboards(dashboard="geoip.provider.autonomous_system_organization", alert=True, discovery=False)
#
hunting.dashboards(dashboard="src_ip", alert=True, discovery=False)
hunting.dashboards(dashboard="dest_ip", alert=True, discovery=False)
hunting.dashboards(dashboard="src_port", alert=True, discovery=False)
hunting.dashboards(dashboard="dest_port", alert=True, discovery=False)
hunting.dashboards(dashboard="geoip.provider.autonomous_system_organization", alert=True, discovery=False)
hunting.dashboards(dashboard="vlan", alert=True, discovery=False)
hunting.dashboards(dashboard="tunnel.src_ip", alert=True, discovery=False)
hunting.dashboards(dashboard="tunnel.dest_ip", alert=True, discovery=False)
hunting.dashboards(dashboard="tunnel.proto", alert=True, discovery=False)
hunting.dashboards(dashboard="tunnel.depth", alert=True, discovery=False)
#
hunting.dashboards(dashboard="http.hostname", alert=True, discovery=False)
hunting.dashboards(dashboard="http.url", alert=True, discovery=False)
hunting.dashboards(dashboard="http.status", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_user_agent", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer_info.subdomain", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer_info.resource_path", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer_info.domain", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer_info.scheme", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer_info.tld", alert=True, discovery=False)
hunting.dashboards(dashboard="http.http_refer_info.domain_without_tld", alert=True, discovery=False)
#
hunting.dashboards(dashboard="dns.query.rrname", alert=True, discovery=False)
hunting.dashboards(dashboard="dns.query.rrtype", alert=True, discovery=False)
#
hunting.dashboards(dashboard="tls.sni", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.subject", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.issuerdn", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.fingerprint", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.ja3.hash", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.ja3.agent", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.ja3s.hash", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.cipher_suite", alert=True, discovery=False)
hunting.dashboards(dashboard="tls.cipher_security", alert=True, discovery=False)
#
hunting.dashboards(dashboard="smtp.mail_from", alert=True, discovery=False)
hunting.dashboards(dashboard="smtp.rcpt_to", alert=True, discovery=False)
hunting.dashboards(dashboard="smtp.helo", alert=True, discovery=False)
#
hunting.dashboards(dashboard="smb.command", alert=True, discovery=False)
hunting.dashboards(dashboard="smb.status", alert=True, discovery=False)
hunting.dashboards(dashboard="smb.filename", alert=True, discovery=False)
hunting.dashboards(dashboard="smb.share", alert=True, discovery=False)
hunting.dashboards(dashboard="smb.dcerpc.interface.name", alert=True, discovery=False)
hunting.dashboards(dashboard="smb.dcerpc.endpoint", alert=True, discovery=False)
#
hunting.dashboards(dashboard="ssh.client.software_version", alert=True, discovery=False)
hunting.dashboards(dashboard="ssh.server.software_version", alert=True, discovery=False)
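Rather than writing one call per widget, a list of widget names can be looped over and the results collected in a dictionary. The following is a minimal sketch under stated assumptions: collect_widgets is a hypothetical helper, and fake_fetch is a stand-in for the dashboards method so that the example runs without a server.

```python
def collect_widgets(fetch, widgets, **switches):
    """Call fetch(dashboard=name, **switches) for each widget and key the results by name."""
    return {name: fetch(dashboard=name, **switches) for name in widgets}


# Widget names taken from the list above.
TLS_WIDGETS = ["tls.sni", "tls.subject", "tls.issuerdn"]


def fake_fetch(dashboard, ip=None, alert=True, discovery=True):
    """Stand-in for Alerts.dashboards that simply echoes its arguments."""
    return {"dashboard": dashboard, "ip": ip, "alert": alert, "discovery": discovery}


results = collect_widgets(fake_fetch, TLS_WIDGETS, alert=True, discovery=False)
for name, data in results.items():
    print(name, data)
```

In a real script, hunting.dashboards would take the place of fake_fetch.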
NTA/NSM¶
The nsm_metadata method, which is part of the inherited class NSM, returns results for Network Traffic Analysis (NTA) and Network Security Monitoring (NSM) events, in other words non-alert events such as protocol, file transaction, anomaly, and flow logs. It has two parameters: field and field_value. The field can be any of the data keys produced by the Stamus Security Platform (see Data fields).
class NSM(RestCall):
    def nsm_metadata(self, field, field_value=None):
        print("Requesting metadata for NSM")
        params = {}
        if field_value is not None:
            rest_point = f"/rules/es/events_tail/?qfilter={field}:{field_value}"
        else:
            rest_point = f"/rules/es/events_tail/?qfilter={field}"
        # If multi-tenancy is enabled, uncomment the corresponding lines below.
        # if self.tenancy:
        #     rest_point += f"&tenant={self.tenancy['tenant']}"
        # If customizations are needed for pagination and date/time range, uncomment the corresponding lines below and change to the desired values.
        # if self.pagination:
        #     rest_point += f"&page_size={self.pagination['page_size']}&page_number={self.pagination['page_number']}"
        # if self.date:
        #     rest_point += f"&from_date={self.date['from_date']}&to_date={self.date['to_date']}"
        full_url = self.url + rest_point
        response = requests.get(full_url, headers=self.headers, params=params, verify=self.verify)
        print(full_url)
        check_request(response)  # Assuming check_request is defined elsewhere
        response_data = response.json()
        return response_data
if __name__ == "__main__":
    stamus = NSM(token, url)
    # Some example method calls:
    stamus.nsm_metadata(field="http.hostname", field_value="onedrive.live.com")
    stamus.nsm_metadata(field="http.http_user_agent", field_value="Windows")
    #
    stamus.nsm_metadata(field="tls.ja3s.hash", field_value="623de93db17d313345d7ea481e7443cf")
    stamus.nsm_metadata(field="tls.ja3.hash", field_value="bafc6b01eae6f4350f5db6805ace208e")
    stamus.nsm_metadata(field="tls.sni", field_value="decretery.host")
    stamus.nsm_metadata(field="tls.issuerdn", field_value="CN=www.wehpsikted.com")
    stamus.nsm_metadata(field="tls.subject", field_value="CN=www.b2zhaqdqh2on.net")
    stamus.nsm_metadata(field="tls.fingerprint", field_value="dd:eb:4a:36:6a:2b:50:da:5f:b5:db:07:55:9a:92:b0:a3:52:5c:ad")
    stamus.nsm_metadata(field="tls.cipher_security", field_value="insecure")
    stamus.nsm_metadata(field="tls.cipher_suite", field_value="TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA")
    #
    stamus.nsm_metadata(field="dns.query.rrname", field_value="t23bendarron.top")
    #
    stamus.nsm_metadata(field="krb5.cname", field_value="rudolph.wilkins")
    stamus.nsm_metadata(field="krb5.sname", field_value="krbtgt/AGERAS.LOCAL")
    #
    stamus.nsm_metadata(field="ssh.client.software_version", field_value="libssh-0.1")
    stamus.nsm_metadata(field="ssh.client.software_version", field_value="OpenSSH_6.4")
    #
    # A raw string keeps the backslashes in the Windows path literal.
    stamus.nsm_metadata(field="smb.filename", field_value=r"windows\gemp\p.exe")
    # Wildcards can also be used
    stamus.nsm_metadata(field="smb.filename", field_value="*p.exe")
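Values such as file paths or certificate fingerprints contain characters that are not safe to place in a URL unencoded. One possible approach, sketched below, is to let urllib.parse.urlencode build the query string. The function events_tail_url is a hypothetical helper; the endpoint path and the parameter names (qfilter, page_size, page_number, from_date, to_date, tenant) are taken from the nsm_metadata method above, and the base URL is the one shown in the earlier example output.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def events_tail_url(base_url, field, field_value=None, pagination=None, date=None, tenant=None):
    """Build a percent-encoded events_tail URL from its individual parts."""
    qfilter = field if field_value is None else f"{field}:{field_value}"
    params = {"qfilter": qfilter}
    if pagination:
        params["page_size"] = pagination["page_size"]
        params["page_number"] = pagination["page_number"]
    if date:
        params["from_date"] = date["from_date"]
        params["to_date"] = date["to_date"]
    if tenant is not None:
        params["tenant"] = tenant
    # urlencode percent-encodes every value, including backslashes and colons.
    return f"{base_url}/rules/es/events_tail/?{urlencode(params)}"


url = events_tail_url(
    "https://10.136.0.57/rest",
    field="smb.filename",
    field_value=r"windows\gemp\p.exe",
    pagination={"page_size": 30, "page_number": 1},
)
print(url)

# Decoding the query string recovers the original values.
print(parse_qs(urlsplit(url).query))
```

Passing the same dictionary through the params argument of requests.get would be an equivalent alternative, since Requests encodes query parameters itself.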