Skip to content

Implement Customer Facing Statsbeat #41669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
## 1.0.0b36 (2025-04-07)

### Features Added

- Added customer-facing statsbeat preview.
([#41669](https://github.com/Azure/azure-sdk-for-python/pull/41669))
- Support `syntheticSource` from `user_agent.synthetic.type` semantic convention
([#40004](https://github.com/Azure/azure-sdk-for-python/pull/40004))
- Support `server.address` attributes when converting Azure SDK messaging spans to envelopes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
_MICROSOFT_CUSTOM_EVENT_NAME = "microsoft.custom_event.name"

# Statsbeat
## Customer Facing Statsbeat
_APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW = "APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW"

# (OpenTelemetry metric name, Statsbeat metric name)
_ATTACH_METRIC_NAME = ("attach", "Attach")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import os
import random
import subprocess

import time
from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat_types import DropCode, TelemetryType
from azure.monitor.opentelemetry.exporter._utils import PeriodicTask

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -110,6 +111,10 @@ def __init__(
self._maintenance_task.start()
else:
logger.error("Could not set secure permissions on storage folder, local storage is disabled.")
self._customer_statsbeat_metrics = None

def set_customer_statsbeat_metrics(self, customer_statsbeat_metrics):
    """Attach a customer statsbeat metrics collector to this storage object.

    The collector is remembered on ``self._customer_statsbeat_metrics``;
    passing ``None`` clears any previously attached collector.

    Args:
        customer_statsbeat_metrics: Collector object to store, or ``None``.
    """
    self._customer_statsbeat_metrics = customer_statsbeat_metrics

def close(self):
if self._enabled:
Expand Down Expand Up @@ -182,9 +187,36 @@ def get(self):
return None

def put(self, data, lease_period=None):
if not self._enabled:
# Create path if it doesn't exist
metrics = self._customer_statsbeat_metrics
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe you need a try: catch here. Can simply just record the metric if put is called and self._enabled is false.

if not self._enabled:
return None
except Exception as ex:
# Track read-only filesystem errors
if metrics:
exception_message = str(ex)
for item in data:
telemetry_type = self._get_telemetry_type(item)
metrics.count_dropped_items(
1,
DropCode.CLIENT_READONLY,
telemetry_type,
exception_message[:100] # Truncate to avoid high cardinality
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No exception message is needed for DropCode.CLIENT_READONLY.

)
logger.warning("Error creating storage directory: %s", ex)
return None
# Check storage capacity
if not self._check_storage_size():
# If storage is full and metrics are available, track dropped items
if metrics:
for item in data:
telemetry_type = self._get_telemetry_type(item)
metrics.count_dropped_items(
1,
DropCode.CLIENT_PERSISTENCE_CAPACITY,
telemetry_type
)
return None
blob = LocalFileBlob(
os.path.join(
Expand All @@ -197,7 +229,20 @@ def put(self, data, lease_period=None):
)
if lease_period is None:
lease_period = self._lease_period
return blob.put(data, lease_period=lease_period)
result = blob.put(data, lease_period=lease_period)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here saying blob.put should return a None if an exception was thrown within it.


# Track storage failures if put failed
if result is None and metrics:
for item in data:
telemetry_type = self._get_telemetry_type(item)
metrics.count_dropped_items(
1,
DropCode.CLIENT_EXCEPTION,
telemetry_type,
"Failed to write to storage"
)

return result

def _check_and_set_folder_permissions(self):
"""
Expand Down Expand Up @@ -275,3 +320,81 @@ def _get_current_user(self):
else:
user = os.getlogin()
return user

def get_items(self, batch_size=50, customer_statsbeat_metrics=None):
Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this function being used? I'm not sure what it is doing.

# Use the metrics object passed in or the one set on this instance
metrics = customer_statsbeat_metrics or self._customer_statsbeat_metrics

try:
cursor = self.gets()
items = []
for _ in range(batch_size):
try:
item = next(cursor)
items.append(item)
except StopIteration:
break

# Filter out expired items
now = time.time()
valid_items = []
expired_items = []

for item in items:
timestamp = item.get("time")
if not timestamp or (now - self._parse_timestamp(timestamp)) < self._retention_period:
valid_items.append(item)
else:
expired_items.append(item)

# Track expired items
if metrics and expired_items:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for item in expired_items:
telemetry_type = self._get_telemetry_type(item)
metrics.count_dropped_items(
1,
DropCode.CLIENT_EXPIRED_DATA,
telemetry_type,
)

return valid_items
except Exception as ex:
# Existing exception handling
logger.warning("Error getting items from local storage: %s", ex)
return []

def _get_telemetry_type(self, item):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be put in a common place like utils instead of here.

Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add typing to the function signature so we know what type of parameter item to expect (should be TelemetryItem).

try:
# Map from Azure Monitor envelope types to TelemetryType enum
type_map = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should have the type map defined only once in _constants or _utils instead of defining it every time this function is called.

"Microsoft.ApplicationInsights.Event": TelemetryType.CUSTOM_EVENT,
Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constants like "Microsoft.ApplicationInsights.Metric" are already defined in _constants.py that you can use. I recommend adding to that and then using those symbols here.

"Microsoft.ApplicationInsights.Metric": TelemetryType.CUSTOM_METRIC,
"Microsoft.ApplicationInsights.RemoteDependency": TelemetryType.DEPENDENCY,
"Microsoft.ApplicationInsights.Exception": TelemetryType.EXCEPTION,
"Microsoft.ApplicationInsights.PageView": TelemetryType.PAGE_VIEW,
"Microsoft.ApplicationInsights.Message": TelemetryType.TRACE,
"Microsoft.ApplicationInsights.Request": TelemetryType.REQUEST,
"Microsoft.ApplicationInsights.PerformanceCounter": TelemetryType.PERFORMANCE_COUNTER,
"Microsoft.ApplicationInsights.Availability": TelemetryType.AVAILABILITY,
}

base_type = item.get("data", {}).get("baseType")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised TelemetryItem.get() works. Does it actually store the fields as a dict?

return type_map.get(base_type, TelemetryType.UNKNOWN)
except (AttributeError, KeyError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be try/catching attribute errors like this and should instead use the proper getters and field checking in the actual code.

return TelemetryType.UNKNOWN

def _parse_timestamp(self, timestamp_str):
    """Parse an ISO 8601 timestamp string into a Unix timestamp.

    A ``Z`` designator is rewritten to ``+00:00`` before parsing. A timestamp
    that carries no UTC offset is interpreted as UTC instead of host-local
    time, so the result does not depend on the machine's time zone.

    Args:
        timestamp_str: ISO format timestamp string

    Returns:
        Unix timestamp (seconds since epoch). When the value cannot be
        parsed (wrong type or malformed text), the current time is returned
        so the caller does not filter the item out as expired.
    """
    try:
        parsed = datetime.datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # Naive datetimes would otherwise be converted using local time.
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed.timestamp()
    except (ValueError, AttributeError, TypeError):
        # TypeError covers bytes-like input, whose replace() rejects str args.
        # If parsing fails, return current time to avoid filtering out the item
        return time.time()
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
)
from azure.monitor.opentelemetry.exporter.statsbeat._utils import _update_requests_map


# Import moved to function level to avoid circular imports
def get_customer_statsbeat_metrics():
    """Return the ``CustomerStatsbeatMetrics`` class.

    The import is deferred to call time rather than done at module import
    time in order to avoid a circular import.

    Returns:
        The ``CustomerStatsbeatMetrics`` class object (not an instance).
    """
    from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat import (
        CustomerStatsbeatMetrics as metrics_cls,
    )

    return metrics_cls

from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat_types import (
TelemetryType, DropCode, RetryCode
)

logger = logging.getLogger(__name__)

_AZURE_TEMPDIR_PREFIX = "Microsoft/AzureMonitor"
Expand Down Expand Up @@ -162,6 +173,24 @@ def __init__(self, **kwargs: Any) -> None:

collect_statsbeat_metrics(self)

# Initialize customer statsbeat if enabled
self._customer_statsbeat_metrics = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customers can define multiple exporters, possibly with different instrumentation keys. We should probably support multiple instrumentation keys in the future, but for now we can simply use the first one that we see instantiated. With that being said, CustomerStatsbeatMetricsClass should then be a singleton so we don't instantiate it multiple times across exporters (there should be only one global instance of it that keeps track of stats metrics).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we set self._customer_statsbeat_metrics to None and not just directly initialize it? This would save a lot of code throughout the SDK for checking for None type.

if self._should_collect_customer_statsbeat():
try:
statsbeat_options = type('StatsbeatOptions', (), {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use this kind of syntax to introduce StatsbeatOptions?

'instrumentation_key': self._instrumentation_key,
'endpoint_url': self._endpoint,
'network_collection_interval': kwargs.get('statsbeat_interval', 900000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Statsbeat interval isn't configurable at least for now and should always be 15m.

})
# Use the get_customer_statsbeat_metrics function to avoid circular imports
CustomerStatsbeatMetricsClass = get_customer_statsbeat_metrics()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a new function or CustomerStatsbeatMetricsClass type, you can simply just do the imports here.

self._customer_statsbeat_metrics = CustomerStatsbeatMetricsClass(statsbeat_options)
# Connect storage with customer statsbeat if storage exists
if self.storage:
self.storage._customer_statsbeat_metrics = self._customer_statsbeat_metrics
except Exception as ex:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should abstract out all the initialization logic into customer statsbeat module so that _base.py does not need to know anything about it (like how we did for collect_statsbeat_metrics).

logger.warning("Failed to initialize customer statsbeat: %s", ex)

def _transmit_from_storage(self) -> None:
if not self.storage:
return
Expand Down Expand Up @@ -207,19 +236,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
start_time = time.time()
try:
track_response = self.client.track(envelopes)
if not track_response.errors: # 200
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why refactor this?

self._consecutive_redirects = 0
if not self._is_stats_exporter():
logger.info(
"Transmission succeeded: Item received: %s. Items accepted: %s",
track_response.items_received,
track_response.items_accepted,
)
if self._should_collect_stats():
_update_requests_map(_REQ_SUCCESS_NAME[1], 1)
reach_ingestion = True
result = ExportResult.SUCCESS
else: # 206
if track_response.errors: # 206
reach_ingestion = True
resend_envelopes = []
for error in track_response.errors:
Expand All @@ -233,12 +250,62 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
error.message,
envelopes[error.index] if error.index is not None else "",
)
# Track dropped items in customer statsbeat
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be within the if not self._is_stats_exporter(): conditional.

if self._customer_statsbeat_metrics and error.index is not None:
try:
telemetry_type = self._get_telemetry_type(envelopes[error.index])
self._customer_statsbeat_metrics.count_dropped_items(
1,
DropCode.NON_RETRYABLE_STATUS_CODE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drop code is not literally the string NON_RETRYABLE_STATUS_CODE but the actual non retryable status code that was returned in the response (error.status_code).

telemetry_type
)
except Exception as track_ex:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a generic exception try/catch here? Simply recording statsbeat should never fail.

logger.warning("Failed to track dropped items in customer statsbeat: %s", track_ex)

if self.storage and resend_envelopes:
# Track retried items in customer statsbeat
if self._customer_statsbeat_metrics and resend_envelopes:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check None for resend_envelopes again.

try:
for envelope in resend_envelopes:
telemetry_type = self._get_telemetry_type(envelope)
self._customer_statsbeat_metrics.count_retry_items(
1,
RetryCode.RETRYABLE_STATUS_CODE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the comment above, the retry code is not the string but the actual status code.

telemetry_type=telemetry_type
)
except Exception as track_ex:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for try catch

logger.warning("Failed to track retried items in customer statsbeat: %s", track_ex)

envelopes_to_store = [x.as_dict() for x in resend_envelopes]
self.storage.put(envelopes_to_store, 0)
self.storage.put(envelopes_to_store, customer_statsbeat_metrics=self._customer_statsbeat_metrics, lease_period=0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this would fail as storage.put is not expecting to accept param customer_statsbeat_metrics

self._consecutive_redirects = 0
# Mark as not retryable because we already write to storage here
result = ExportResult.FAILED_NOT_RETRYABLE
else: # 200
self._consecutive_redirects = 0
if not self._is_stats_exporter():
logger.info(
"Transmission succeeded: Item received: %s. Items accepted: %s",
track_response.items_received,
track_response.items_accepted,
)
if self._should_collect_stats():
_update_requests_map(_REQ_SUCCESS_NAME[1], 1)

# Track successful items in customer statsbeat
if self._customer_statsbeat_metrics:
try:
for envelope in envelopes:
telemetry_type = self._get_telemetry_type(envelope)
self._customer_statsbeat_metrics.count_successful_items(
1,
telemetry_type
)
except Exception as track_ex:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

logger.warning("Failed to track successful items in customer statsbeat: %s", track_ex)

reach_ingestion = True
result = ExportResult.SUCCESS
except HttpResponseError as response_error:
# HttpResponseError is raised when a response is received
if _reached_ingestion_code(response_error.status_code):
Expand Down Expand Up @@ -361,6 +428,52 @@ def _is_statsbeat_initializing_state(self):

def _is_stats_exporter(self):
    """Report whether this object is the statsbeat exporter itself.

    Returns:
        bool: True when the instance's class is named ``_StatsBeatExporter``,
        False otherwise.
    """
    exporter_class_name = self.__class__.__name__
    return exporter_class_name == "_StatsBeatExporter"

def _should_collect_customer_statsbeat(self):
"""Check if customer statsbeat collection is enabled.

Returns:
bool: True if customer statsbeat collection is enabled, False otherwise
"""
# Check environment variable
env_value = os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW", "")
is_enabled = env_value.lower() in ("true", "1", "yes")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's only accept "true"

# Don't collect customer statsbeat for stats exporter itself or for instrumentation collection
return (
is_enabled
and not self._is_stats_exporter()
and not self._instrumentation_collection
)

def _get_telemetry_type(self, envelope: TelemetryItem) -> TelemetryType:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments apply from the above implementation of get_telemetry_type in storage.py. Also, if these functions do the same thing, you should abstract it out to util so you don't have to define it multiple times.

"""Extract telemetry type from envelope.

Args:
envelope: The telemetry envelope

Returns:
TelemetryType enum value or TelemetryType.UNKNOWN if type couldn't be determined
"""
try:
if hasattr(envelope, "data") and envelope.data is not None:
base_type = getattr(envelope.data, "base_type", None)
if base_type:
# Map from Azure Monitor envelope types to TelemetryType enum
type_map = {
"EventData": TelemetryType.CUSTOM_EVENT,
"MetricData": TelemetryType.CUSTOM_METRIC,
"RemoteDependencyData": TelemetryType.DEPENDENCY,
"ExceptionData": TelemetryType.EXCEPTION,
"PageViewData": TelemetryType.PAGE_VIEW,
"MessageData": TelemetryType.TRACE,
"RequestData": TelemetryType.REQUEST,
"PerformanceCounterData": TelemetryType.PERFORMANCE_COUNTER,
"AvailabilityData": TelemetryType.AVAILABILITY
}
return type_map.get(base_type, TelemetryType.UNKNOWN)
except (AttributeError, KeyError):
pass
return TelemetryType.UNKNOWN


def _is_invalid_code(response_code: Optional[int]) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""Statsbeat package for Azure Monitor OpenTelemetry Exporter."""

from typing import Any, List

from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat_types import (
TelemetryType, DropCode, RetryCode, CustomerStatsbeat, CustomStatsbeatCounter
)

# DO NOT import CustomerStatsbeatMetrics here to avoid circular imports
# The class will be available via the lazy import below

# Declare CustomerStatsbeatMetrics for type checkers without binding it at
# runtime. Assigning a value here (even None) would place the name in the
# module namespace, so attribute lookups and `from ... import` would return
# that value and never reach the lazy module-level __getattr__ that performs
# the real import.
CustomerStatsbeatMetrics: Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we need all this logic? We can just do lazy importing just like how we are doing with regular statsbeat.


__all__: List[str] = [
'TelemetryType',
'DropCode',
'RetryCode',
'CustomerStatsbeat',
'CustomStatsbeatCounter',
'CustomerStatsbeatMetrics',
]

# Avoid circular imports by using lazy import
def __getattr__(name):
    """Resolve ``CustomerStatsbeatMetrics`` lazily on module attribute access.

    Args:
        name: Name of the module attribute being looked up.

    Returns:
        The ``CustomerStatsbeatMetrics`` class when that name is requested.

    Raises:
        AttributeError: For any other attribute name.
    """
    if name != "CustomerStatsbeatMetrics":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    # Imported here rather than at module level to avoid a circular import.
    from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat import CustomerStatsbeatMetrics
    return CustomerStatsbeatMetrics
Loading
Loading