-
Notifications
You must be signed in to change notification settings - Fork 3k
Implement Customer Facing Statsbeat #41669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,8 @@ | |
import os | ||
import random | ||
import subprocess | ||
|
||
import time | ||
from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat_types import DropCode, TelemetryType | ||
from azure.monitor.opentelemetry.exporter._utils import PeriodicTask | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -110,6 +111,10 @@ def __init__( | |
self._maintenance_task.start() | ||
else: | ||
logger.error("Could not set secure permissions on storage folder, local storage is disabled.") | ||
self._customer_statsbeat_metrics = None | ||
|
||
def set_customer_statsbeat_metrics(self, customer_statsbeat_metrics): | ||
self._customer_statsbeat_metrics = customer_statsbeat_metrics | ||
|
||
def close(self): | ||
if self._enabled: | ||
|
@@ -182,9 +187,36 @@ def get(self): | |
return None | ||
|
||
def put(self, data, lease_period=None): | ||
if not self._enabled: | ||
# Create path if it doesn't exist | ||
metrics = self._customer_statsbeat_metrics | ||
try: | ||
if not self._enabled: | ||
return None | ||
except Exception as ex: | ||
# Track read-only filesystem errors | ||
if metrics: | ||
exception_message = str(ex) | ||
for item in data: | ||
telemetry_type = self._get_telemetry_type(item) | ||
metrics.count_dropped_items( | ||
1, | ||
DropCode.CLIENT_READONLY, | ||
telemetry_type, | ||
exception_message[:100] # Truncate to avoid high cardinality | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No exception message is needed for |
||
) | ||
logger.warning("Error creating storage directory: %s", ex) | ||
return None | ||
# Check storage capacity | ||
if not self._check_storage_size(): | ||
# If storage is full and metrics are available, track dropped items | ||
if metrics: | ||
for item in data: | ||
telemetry_type = self._get_telemetry_type(item) | ||
metrics.count_dropped_items( | ||
1, | ||
DropCode.CLIENT_PERSISTENCE_CAPACITY, | ||
telemetry_type | ||
) | ||
return None | ||
blob = LocalFileBlob( | ||
os.path.join( | ||
|
@@ -197,7 +229,20 @@ def put(self, data, lease_period=None): | |
) | ||
if lease_period is None: | ||
lease_period = self._lease_period | ||
return blob.put(data, lease_period=lease_period) | ||
result = blob.put(data, lease_period=lease_period) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment here saying |
||
|
||
# Track storage failures if put failed | ||
if result is None and metrics: | ||
for item in data: | ||
telemetry_type = self._get_telemetry_type(item) | ||
metrics.count_dropped_items( | ||
1, | ||
DropCode.CLIENT_EXCEPTION, | ||
telemetry_type, | ||
"Failed to write to storage" | ||
) | ||
|
||
return result | ||
|
||
def _check_and_set_folder_permissions(self): | ||
""" | ||
|
@@ -275,3 +320,81 @@ def _get_current_user(self): | |
else: | ||
user = os.getlogin() | ||
return user | ||
|
||
def get_items(self, batch_size=50, customer_statsbeat_metrics=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this function being used? I'm not sure what it is doing. |
||
# Use the metrics object passed in or the one set on this instance | ||
metrics = customer_statsbeat_metrics or self._customer_statsbeat_metrics | ||
|
||
try: | ||
cursor = self.gets() | ||
items = [] | ||
for _ in range(batch_size): | ||
try: | ||
item = next(cursor) | ||
items.append(item) | ||
except StopIteration: | ||
break | ||
|
||
# Filter out expired items | ||
now = time.time() | ||
valid_items = [] | ||
expired_items = [] | ||
|
||
for item in items: | ||
timestamp = item.get("time") | ||
if not timestamp or (now - self._parse_timestamp(timestamp)) < self._retention_period: | ||
valid_items.append(item) | ||
else: | ||
expired_items.append(item) | ||
|
||
# Track expired items | ||
if metrics and expired_items: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Checking expiration is not needed anymore since https://github.com/aep-health-and-standards/Telemetry-Collection-Spec/pull/559 |
||
for item in expired_items: | ||
telemetry_type = self._get_telemetry_type(item) | ||
metrics.count_dropped_items( | ||
1, | ||
DropCode.CLIENT_EXPIRED_DATA, | ||
telemetry_type, | ||
) | ||
|
||
return valid_items | ||
except Exception as ex: | ||
# Existing exception handling | ||
logger.warning("Error getting items from local storage: %s", ex) | ||
return [] | ||
|
||
def _get_telemetry_type(self, item): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can probably be put in a common place like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add |
||
try: | ||
# Map from Azure Monitor envelope types to TelemetryType enum | ||
type_map = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should have the type map defined only once in |
||
"Microsoft.ApplicationInsights.Event": TelemetryType.CUSTOM_EVENT, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Constants like |
||
"Microsoft.ApplicationInsights.Metric": TelemetryType.CUSTOM_METRIC, | ||
"Microsoft.ApplicationInsights.RemoteDependency": TelemetryType.DEPENDENCY, | ||
"Microsoft.ApplicationInsights.Exception": TelemetryType.EXCEPTION, | ||
"Microsoft.ApplicationInsights.PageView": TelemetryType.PAGE_VIEW, | ||
"Microsoft.ApplicationInsights.Message": TelemetryType.TRACE, | ||
"Microsoft.ApplicationInsights.Request": TelemetryType.REQUEST, | ||
"Microsoft.ApplicationInsights.PerformanceCounter": TelemetryType.PERFORMANCE_COUNTER, | ||
"Microsoft.ApplicationInsights.Availability": TelemetryType.AVAILABILITY, | ||
} | ||
|
||
base_type = item.get("data", {}).get("baseType") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm surprised TelemetryItem.get() works. Does it actually store the fields as a |
||
return type_map.get(base_type, TelemetryType.UNKNOWN) | ||
except (AttributeError, KeyError): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't be try: catching attribute errors like this and instead use the proper getters and field checking in the actual code. |
||
return TelemetryType.UNKNOWN | ||
|
||
def _parse_timestamp(self, timestamp_str): | ||
"""Parse timestamp string to Unix timestamp. | ||
|
||
Args: | ||
timestamp_str: ISO format timestamp string | ||
|
||
Returns: | ||
Unix timestamp (seconds since epoch) | ||
""" | ||
try: | ||
dt = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) | ||
return dt.timestamp() | ||
except (ValueError, AttributeError): | ||
# If parsing fails, return current time to avoid filtering out the item | ||
return time.time() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,17 @@ | |
) | ||
from azure.monitor.opentelemetry.exporter.statsbeat._utils import _update_requests_map | ||
|
||
|
||
# Import moved to function level to avoid circular imports | ||
def get_customer_statsbeat_metrics(): | ||
# Get the CustomerStatsbeatMetrics class | ||
from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat import CustomerStatsbeatMetrics | ||
return CustomerStatsbeatMetrics | ||
|
||
from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat_types import ( | ||
TelemetryType, DropCode, RetryCode | ||
) | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
_AZURE_TEMPDIR_PREFIX = "Microsoft/AzureMonitor" | ||
|
@@ -162,6 +173,24 @@ def __init__(self, **kwargs: Any) -> None: | |
|
||
collect_statsbeat_metrics(self) | ||
|
||
# Initialize customer statsbeat if enabled | ||
self._customer_statsbeat_metrics = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Customers can define multiple exporters possibly with different instrumentation keys. We should probably support multiple instrumentation keys in the future but for now we can simply use the first one that we see that is instantiated. With that being said, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason why we set |
||
if self._should_collect_customer_statsbeat(): | ||
try: | ||
statsbeat_options = type('StatsbeatOptions', (), { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why use this kind of syntax to introduce |
||
'instrumentation_key': self._instrumentation_key, | ||
'endpoint_url': self._endpoint, | ||
'network_collection_interval': kwargs.get('statsbeat_interval', 900000) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Statsbeat interval isn't configurable at least for now and should always be 15m. |
||
}) | ||
# Use the get_customer_statsbeat_metrics function to avoid circular imports | ||
CustomerStatsbeatMetricsClass = get_customer_statsbeat_metrics() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need for a new function or |
||
self._customer_statsbeat_metrics = CustomerStatsbeatMetricsClass(statsbeat_options) | ||
# Connect storage with customer statsbeat if storage exists | ||
if self.storage: | ||
self.storage._customer_statsbeat_metrics = self._customer_statsbeat_metrics | ||
except Exception as ex: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you should abstract out all the initialization logic into customer statsbeat module so that |
||
logger.warning("Failed to initialize customer statsbeat: %s", ex) | ||
|
||
def _transmit_from_storage(self) -> None: | ||
if not self.storage: | ||
return | ||
|
@@ -207,19 +236,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: | |
start_time = time.time() | ||
try: | ||
track_response = self.client.track(envelopes) | ||
if not track_response.errors: # 200 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why refactor this? |
||
self._consecutive_redirects = 0 | ||
if not self._is_stats_exporter(): | ||
logger.info( | ||
"Transmission succeeded: Item received: %s. Items accepted: %s", | ||
track_response.items_received, | ||
track_response.items_accepted, | ||
) | ||
if self._should_collect_stats(): | ||
_update_requests_map(_REQ_SUCCESS_NAME[1], 1) | ||
reach_ingestion = True | ||
result = ExportResult.SUCCESS | ||
else: # 206 | ||
if track_response.errors: # 206 | ||
reach_ingestion = True | ||
resend_envelopes = [] | ||
for error in track_response.errors: | ||
|
@@ -233,12 +250,62 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: | |
error.message, | ||
envelopes[error.index] if error.index is not None else "", | ||
) | ||
# Track dropped items in customer statsbeat | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be within the |
||
if self._customer_statsbeat_metrics and error.index is not None: | ||
try: | ||
telemetry_type = self._get_telemetry_type(envelopes[error.index]) | ||
self._customer_statsbeat_metrics.count_dropped_items( | ||
1, | ||
DropCode.NON_RETRYABLE_STATUS_CODE, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The drop code is not literally the string |
||
telemetry_type | ||
) | ||
except Exception as track_ex: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is there a generic exception try catch here? Simply recording statsbeat should never fail. |
||
logger.warning("Failed to track dropped items in customer statsbeat: %s", track_ex) | ||
|
||
if self.storage and resend_envelopes: | ||
# Track retried items in customer statsbeat | ||
if self._customer_statsbeat_metrics and resend_envelopes: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to check |
||
try: | ||
for envelope in resend_envelopes: | ||
telemetry_type = self._get_telemetry_type(envelope) | ||
self._customer_statsbeat_metrics.count_retry_items( | ||
1, | ||
RetryCode.RETRYABLE_STATUS_CODE, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the comment above, the retry code is not the string but the actual status code. |
||
telemetry_type=telemetry_type | ||
) | ||
except Exception as track_ex: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need for try catch |
||
logger.warning("Failed to track retried items in customer statsbeat: %s", track_ex) | ||
|
||
envelopes_to_store = [x.as_dict() for x in resend_envelopes] | ||
self.storage.put(envelopes_to_store, 0) | ||
self.storage.put(envelopes_to_store, customer_statsbeat_metrics=self._customer_statsbeat_metrics, lease_period=0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this would fail as |
||
self._consecutive_redirects = 0 | ||
# Mark as not retryable because we already write to storage here | ||
result = ExportResult.FAILED_NOT_RETRYABLE | ||
else: # 200 | ||
self._consecutive_redirects = 0 | ||
if not self._is_stats_exporter(): | ||
logger.info( | ||
"Transmission succeeded: Item received: %s. Items accepted: %s", | ||
track_response.items_received, | ||
track_response.items_accepted, | ||
) | ||
if self._should_collect_stats(): | ||
_update_requests_map(_REQ_SUCCESS_NAME[1], 1) | ||
|
||
# Track successful items in customer statsbeat | ||
if self._customer_statsbeat_metrics: | ||
try: | ||
for envelope in envelopes: | ||
telemetry_type = self._get_telemetry_type(envelope) | ||
self._customer_statsbeat_metrics.count_successful_items( | ||
1, | ||
telemetry_type | ||
) | ||
except Exception as track_ex: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. |
||
logger.warning("Failed to track successful items in customer statsbeat: %s", track_ex) | ||
|
||
reach_ingestion = True | ||
result = ExportResult.SUCCESS | ||
except HttpResponseError as response_error: | ||
# HttpResponseError is raised when a response is received | ||
if _reached_ingestion_code(response_error.status_code): | ||
|
@@ -361,6 +428,52 @@ def _is_statsbeat_initializing_state(self): | |
|
||
def _is_stats_exporter(self): | ||
return self.__class__.__name__ == "_StatsBeatExporter" | ||
|
||
def _should_collect_customer_statsbeat(self): | ||
"""Check if customer statsbeat collection is enabled. | ||
|
||
Returns: | ||
bool: True if customer statsbeat collection is enabled, False otherwise | ||
""" | ||
# Check environment variable | ||
env_value = os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW", "") | ||
is_enabled = env_value.lower() in ("true", "1", "yes") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's only accept "true" |
||
# Don't collect customer statsbeat for stats exporter itself or for instrumentation collection | ||
return ( | ||
is_enabled | ||
and not self._is_stats_exporter() | ||
and not self._instrumentation_collection | ||
) | ||
|
||
def _get_telemetry_type(self, envelope: TelemetryItem) -> TelemetryType: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comments apply from the above implementation of |
||
"""Extract telemetry type from envelope. | ||
|
||
Args: | ||
envelope: The telemetry envelope | ||
|
||
Returns: | ||
TelemetryType enum value or TelemetryType.UNKNOWN if type couldn't be determined | ||
""" | ||
try: | ||
if hasattr(envelope, "data") and envelope.data is not None: | ||
base_type = getattr(envelope.data, "base_type", None) | ||
if base_type: | ||
# Map from Azure Monitor envelope types to TelemetryType enum | ||
type_map = { | ||
"EventData": TelemetryType.CUSTOM_EVENT, | ||
"MetricData": TelemetryType.CUSTOM_METRIC, | ||
"RemoteDependencyData": TelemetryType.DEPENDENCY, | ||
"ExceptionData": TelemetryType.EXCEPTION, | ||
"PageViewData": TelemetryType.PAGE_VIEW, | ||
"MessageData": TelemetryType.TRACE, | ||
"RequestData": TelemetryType.REQUEST, | ||
"PerformanceCounterData": TelemetryType.PERFORMANCE_COUNTER, | ||
"AvailabilityData": TelemetryType.AVAILABILITY | ||
} | ||
return type_map.get(base_type, TelemetryType.UNKNOWN) | ||
except (AttributeError, KeyError): | ||
pass | ||
return TelemetryType.UNKNOWN | ||
|
||
|
||
def _is_invalid_code(response_code: Optional[int]) -> bool: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. | ||
|
||
"""Statsbeat package for Azure Monitor OpenTelemetry Exporter.""" | ||
|
||
from typing import Any, List | ||
|
||
from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat_types import ( | ||
TelemetryType, DropCode, RetryCode, CustomerStatsbeat, CustomStatsbeatCounter | ||
) | ||
|
||
# DO NOT import CustomerStatsbeatMetrics here to avoid circular imports | ||
# The class will be available via the lazy import below | ||
|
||
# Define CustomerStatsbeatMetrics for type checking only | ||
CustomerStatsbeatMetrics: Any = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if we need all this logic? We can just do lazy importing just like how we are doing with regular statsbeat. |
||
|
||
__all__: List[str] = [ | ||
'TelemetryType', | ||
'DropCode', | ||
'RetryCode', | ||
'CustomerStatsbeat', | ||
'CustomStatsbeatCounter', | ||
'CustomerStatsbeatMetrics', | ||
] | ||
|
||
# Avoid circular imports by using lazy import | ||
def __getattr__(name): | ||
if name == "CustomerStatsbeatMetrics": | ||
from azure.monitor.opentelemetry.exporter.statsbeat._customer_statsbeat import CustomerStatsbeatMetrics | ||
return CustomerStatsbeatMetrics | ||
raise AttributeError(f"module {__name__!r} has no attribute {name!r}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe you need a try: catch here. Can simply just record the metric if `put` is called and `self._enabled` is false.