Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ concurrency:

jobs:
docker-test:
runs-on: self-hosted
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ concurrency:

jobs:
test:
runs-on: self-hosted
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand Down
3 changes: 2 additions & 1 deletion bbot_server/api/mcp.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
from fastapi_mcp import FastApiMCP

MCP_ENDPOINTS = {}

log = logging.getLogger("bbot_server.api.mcp")


def make_mcp_server(fastapi_app, config, mcp_endpoints=None):
from fastapi_mcp import FastApiMCP

if mcp_endpoints is None:
mcp_endpoints = MCP_ENDPOINTS
log.debug(f"Creating MCP server with endpoints: {','.join(mcp_endpoints)}")
Expand Down
2 changes: 1 addition & 1 deletion bbot_server/applets/_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from contextlib import suppress
from fastapi.responses import StreamingResponse
from starlette.websockets import WebSocketDisconnect

import bbot_server.config as bbcfg
from bbot_server.api.mcp import MCP_ENDPOINTS
from bbot_server.utils.misc import smart_encode


log = logging.getLogger("bbot_server.applets.routing")

ROUTE_TYPES = {}
Expand Down
111 changes: 109 additions & 2 deletions bbot_server/applets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
from bbot_server.assets import Asset
from bbot.models.pydantic import Event
from bbot_server.modules import API_MODULES
from bbot_server.errors import BBOTServerError
from bbot.core.helpers import misc as bbot_misc
from bbot_server.applets._routing import ROUTE_TYPES
from bbot_server.utils import misc as bbot_server_misc
from bbot_server.modules.activity.activity_models import Activity
from bbot_server.errors import BBOTServerError, BBOTServerValueError
from bbot_server.utils.misc import _sanitize_mongo_query, _sanitize_mongo_aggregation

word_regex = re.compile(r"\W+")


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -380,6 +380,113 @@ async def _emit_activity(self, activity: Activity):
self.log.info(f"Emitting activity: {activity.type} - {activity.description}")
await self.root.message_queue.publish_asset(activity)

async def make_bbot_query(
    self,
    query: dict = None,
    search: str = None,
    host: str = None,
    domain: str = None,
    type: str = None,
    target_id: str = None,
    archived: bool = False,
    active: bool = True,
):
    """
    Streamlines querying of a Mongo collection with BBOT-specific filters like "host", "reverse_host", etc.

    This is meant to be a base method with only query logic common to all collections in BBOT server.

    For any additional custom logic like different default kwarg values, etc., override this method on applet-by-applet basis.

    Args:
        query: Base Mongo query dict; keys already present here always win over the keyword filters below.
        search: Full-text search string (uses Mongo's $text index).
        host: Exact host match.
        domain: Match the domain itself or any of its subdomains (via the reverse_host field).
        type: Filter by document type.
        target_id: Restrict results to a target's scope ("DEFAULT" selects the default target).
        archived: Whether to include archived documents.
        active: Whether to include active (non-archived) documents.

    Returns:
        A sanitized Mongo query dict suitable for find()/aggregate().

    Raises:
        BBOTServerValueError: If both `active` and `archived` are False (nothing could match).

    Example:
        async def make_bbot_query(self, type: str = "Asset", query: dict = None, ignored: bool = False, **kwargs):
            query = dict(query or {})
            if ignored is not None and "ignored" not in query:
                query["ignored"] = ignored
            return await super().make_bbot_query(type=type, query=query, **kwargs)
    """
    query = dict(query or {})
    # AI is dumb and likes to pass in blank strings for stuff
    domain = domain or None
    target_id = target_id or None
    type = type or None
    host = host or None
    search = search or None

    if ("type" not in query) and (type is not None):
        query["type"] = type
    if ("host" not in query) and (host is not None):
        query["host"] = host
    if ("reverse_host" not in query) and (domain is not None):
        reversed_host = domain[::-1]
        # Match exact domain or subdomains (with dot separator).
        # re.escape() is required: without it, the literal dots in the domain are
        # regex wildcards, so "example.com" would also match e.g. "exampleXcom".
        query["reverse_host"] = {"$regex": f"^{re.escape(reversed_host)}(\\.|$)"}
    if ("$text" not in query) and (search is not None):
        query["$text"] = {"$search": search}

    if ("scope" not in query) and (target_id is not None):
        target_query_kwargs = {}
        if target_id != "DEFAULT":
            target_query_kwargs["id"] = target_id
        target = await self.root.targets._get_target(**target_query_kwargs, fields=["id"])
        query["scope"] = target["id"]

    # if both active and archived are true, we don't need to filter anything, because we are returning all assets
    if not (active and archived) and ("archived" not in query):
        # if both are false, we need to raise an error
        if not (active or archived):
            raise BBOTServerValueError("Must query at least one of active or archived")
        # only one should be true
        query["archived"] = {"$eq": archived}

    return _sanitize_mongo_query(query)

async def mongo_iter(
    self,
    query: dict = None,
    aggregate: list[dict] = None,
    sort: list[str | tuple[str, int]] = None,
    fields: list[str] = None,
    limit: int = None,
    **kwargs,
):
    """
    Lazily iterate over this applet's Mongo collection.

    Builds a BBOT-aware query from `query` plus any extra keyword filters
    (forwarded to make_bbot_query), then yields raw documents one at a time,
    either from a plain find() or from an optional aggregation pipeline.
    """
    query = await self.make_bbot_query(query=query, **kwargs)
    # translate a list of field names into a Mongo projection dict
    projection = {field_name: 1 for field_name in fields} if fields else None

    if self.collection is None:
        raise BBOTServerError(f"Collection is not set for {self.name}")

    log.info(f"Querying {self.collection.name}: query={query}, fields={projection}")

    if aggregate is None:
        # plain find() with optional sort and limit
        find_cursor = self.collection.find(query, projection)
        if sort:
            sort_spec = [
                (entry.lstrip("+-"), -1 if entry.startswith("-") else 1)
                if isinstance(entry, str)
                # otherwise assume it's already a (field, direction) pair
                else tuple(entry)
                for entry in sort
            ]
            find_cursor = find_cursor.sort(sort_spec)
        if limit is not None:
            find_cursor = find_cursor.limit(limit)
        async for document in find_cursor:
            yield document
    else:
        # sanitize aggregation pipeline, then prepend the $match stage
        pipeline = [{"$match": query}] + _sanitize_mongo_aggregation(aggregate)
        if limit is not None:
            pipeline.append({"$limit": limit})
        log.info(f"Querying {self.collection.name}: aggregate={pipeline}")
        agg_cursor = await self.collection.aggregate(pipeline)
        async for document in agg_cursor:
            yield document

def include_app(self, app_class):
self.log.debug(f"{self.name_lowercase} including applet {app_class.name_lowercase}")

Expand Down
8 changes: 3 additions & 5 deletions bbot_server/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ def gather_status_codes(cls):
gather_status_codes(BBOTServerError)


from fastapi import Request
from fastapi.responses import ORJSONResponse


def handle_bbot_server_error(request: Request, exc: Exception):
def handle_bbot_server_error(request, exc: Exception):
"""
Catch BBOTServerErrors and transform them into appropriate FastAPI responses
"""
from fastapi.responses import ORJSONResponse

status_code = exc.http_status_code
error_message = str(exc)
message = error_message if error_message else exc.default_message
Expand Down
10 changes: 9 additions & 1 deletion bbot_server/interfaces/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ async def _http_request(self, _url, _route, *args, **kwargs):
Uses the API route to figure out the format etc.
"""
method, _url, kwargs = self._prepare_api_request(_url, _route, *args, **kwargs)
body = self._prepare_http_body(method, kwargs)
try:
body = self._prepare_http_body(method, kwargs)
except ValueError as e:
raise BBOTServerError(f"Error preparing HTTP body for {method} request -> {_url}: {e}") from e

async def warn_if_slow():
await asyncio.sleep(5)
Expand Down Expand Up @@ -105,6 +108,11 @@ async def _http_stream(self, _url, _route, *args, **kwargs):
Similar to _request(), but instead of returning a single object, returns an async generator that yields objects
"""
method, _url, kwargs = self._prepare_api_request(_url, _route, *args, **kwargs)
try:
body = self._prepare_http_body(method, kwargs)
except ValueError as e:
raise BBOTServerError(f"Error preparing HTTP body for {method} request -> {_url}: {e}") from e

body = self._prepare_http_body(method, kwargs)
buffer = b""
MAX_BUFFER_SIZE = 10 * 1024 * 1024 # 10 MB max buffer size
Expand Down
11 changes: 10 additions & 1 deletion bbot_server/message_queue/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class RedisMessageQueue(BaseMessageQueue):
- bbot:work:{subject}: for one-time messages, e.g. tasks
docker run --rm -p 127.0.0.1:6379:6379 redis
To monitor Redis:
Prod:
docker exec -it <container_name> redis-cli -n 15 MONITOR
Test:
docker exec -it <container_name> redis-cli MONITOR
"""

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -100,7 +106,9 @@ async def subscribe(self, subject: str, callback, durable: str = None, historic=

# Create a task to process messages
async def message_handler():
self.log.info(f"Subscribed to {stream_key}")
self.log.info(
f"Subscribed to {stream_key} with consumer {consumer_name} and group {group_name} (durable={durable})"
)
while True:
try:
# Read new messages from the stream
Expand Down Expand Up @@ -128,6 +136,7 @@ async def message_handler():
self.log.debug("Sleeping for .1 seconds")
await asyncio.sleep(0.1)
except asyncio.CancelledError:
self.log.info(f"Message handler cancelled for {stream_key}")
break
except Exception as e:
self.log.error(f"Error in message handler: {e}")
Expand Down
4 changes: 2 additions & 2 deletions bbot_server/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
from hashlib import sha1

from bbot.models.pydantic import BBOTBaseModel
from bbot_server.utils.misc import _sanitize_mongo_operators
from bbot_server.utils.misc import _sanitize_mongo_query

log = logging.getLogger("bbot_server.models")


class BaseBBOTServerModel(BBOTBaseModel):
def model_dump(self, *args, mode="json", exclude_none=True, **kwargs):
return _sanitize_mongo_operators(super().model_dump(*args, mode=mode, exclude_none=exclude_none, **kwargs))
return _sanitize_mongo_query(super().model_dump(*args, mode=mode, exclude_none=exclude_none, **kwargs))

def sha1(self, data: str) -> str:
return sha1(data.encode()).hexdigest()
35 changes: 33 additions & 2 deletions bbot_server/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

log = logging.getLogger(__name__)


modules_dir = Path(__file__).parent

# models that add custom fields to the main asset model
Expand Down Expand Up @@ -79,7 +78,6 @@ def check_for_asset_field_models(source_code, filename):
# search recursively for every python file in the modules dir
python_files = list(modules_dir.rglob("*.py"))


### PRELOADING ###

# preload asset fields before loading any other modules
Expand Down Expand Up @@ -145,6 +143,39 @@ def load_python_file(file, namespace, module_dict, base_class_name, module_key_a


# load applets first
"""
TODO: for some reason this is taking a long time (almost a full second)
python files loaded in 0.001 seconds
asset fields classes loaded in 0.023 seconds
asset model merged in 0.122 seconds
modules loaded in 0.122 seconds
applets loaded in 0.896 seconds
modules/__init__.py took 0.901 seconds
technologies_api.py loaded in 0.649 seconds
findings_api.py loaded in 0.006 seconds
scans_api.py loaded in 0.112 seconds
open_ports_api.py loaded in 0.001 seconds
activity_api.py loaded in 0.000 seconds
assets_api.py loaded in 0.001 seconds
agents_api.py loaded in 0.005 seconds
presets_api.py loaded in 0.000 seconds
stats_api.py loaded in 0.002 seconds
targets_api.py loaded in 0.001 seconds
events_api.py loaded in 0.000 seconds
emails_api.py loaded in 0.001 seconds
cloud_api.py loaded in 0.001 seconds
dns_links_api.py loaded in 0.001 seconds
Notably, "from fastapi import Body, Query" takes .2 seconds.
But the worst culprit is "from bbot_server.applets.base import BaseApplet, api_endpoint, Annotated" which takes .45 seconds.
Following the chain, "from bbot_server.applets._routing import ROUTE_TYPES" takes .3 seconds
Continuing down, "from bbot_server.api.mcp import MCP_ENDPOINTS" takes .23 seconds
Our main culprit then for slow import time is fastapi_mcp.
"""
for file in python_files:
if file.stem.endswith("_api"):
module_name = file.stem.rsplit("_applet", 1)[0]
Expand Down
50 changes: 49 additions & 1 deletion bbot_server/modules/activity/activity_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def handle_activity(self, activity: Activity, asset: Asset = None):
@api_endpoint(
"/list", methods=["GET"], type="http_stream", response_model=Activity, summary="Stream all activities"
)
async def get_activities(self, host: str = None, type: str = None):
async def list_activities(self, host: str = None, type: str = None):
query = {}
if host:
query["host"] = host
Expand All @@ -27,6 +27,54 @@ async def get_activities(self, host: str = None, type: str = None):
async for activity in self.collection.find(query, sort=[("timestamp", 1), ("created", 1)]):
yield self.model(**activity)

@api_endpoint("/query", methods=["POST"], type="http_stream", response_model=dict, summary="List activities")
async def query_activities(
    self,
    query: dict = None,
    search: str = None,
    host: str = None,
    domain: str = None,
    type: str = None,
    target_id: str = None,
    archived: bool = False,
    active: bool = True,
    ignored: bool = False,
    fields: list[str] = None,
    sort: list[str | tuple[str, int]] = None,
    aggregate: list[dict] = None,
):
    """
    Advanced querying of activities. Choose your own filters and fields.

    Args:
        query: Additional query parameters (mongo)
        search: Search using mongo's text index
        host: Filter activities by host (exact match only)
        domain: Filter activities by domain (subdomains allowed)
        type: Filter activities by type
        target_id: Filter activities by target ID
        archived: Optionally return archived activities
        active: Whether to include active (non-archived) activities
        ignored: NOTE(review): accepted but never forwarded to mongo_iter below, so it
            currently has no effect — confirm whether it should be passed through
        fields: List of fields to return
        sort: Fields and direction to sort by. Accepts either a list of field names or a list of tuples (field, direction).
        E.g. sort=["-last_seen", "technology"] or sort=[("last_seen", -1), ("technology", 1)]
        aggregate: Optional custom MongoDB aggregation pipeline
    """
    # NOTE(review): `ignored` is not included in this call; mongo_iter's **kwargs reach
    # make_bbot_query, so presumably it was meant to be forwarded once this applet's
    # make_bbot_query override handles it — verify against the applet implementation.
    async for activity in self.mongo_iter(
        query=query,
        search=search,
        host=host,
        domain=domain,
        type=type,
        target_id=target_id,
        archived=archived,
        active=active,
        fields=fields,
        sort=sort,
        aggregate=aggregate,
    ):
        yield activity

@api_endpoint("/tail", type="websocket_stream_outgoing", response_model=Activity)
async def tail_activities(self, n: int = 0):
agen = self.message_queue.tail_activities(n=n)
Expand Down
2 changes: 1 addition & 1 deletion bbot_server/modules/activity/activity_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def list(
] = None,
json: common.json = False,
):
activities = self.bbot_server.get_activities(host=host, type=type)
activities = self.bbot_server.list_activities(host=host, type=type)

if json:
for activity in activities:
Expand Down
Loading