Implement Customer Facing Statsbeat #41669
Pull Request Overview
Implements preview support for customer-facing Statsbeat by integrating metric tracking into the exporter and local storage, and adding unit/integration tests.
- Hooks `CustomerStatsbeatMetrics` into the base exporter to count successful, dropped, and retried items.
- Enhances `LocalFileStorage.put` and `get_items` to emit Statsbeat metrics on storage errors, capacity, and expiry.
- Adds comprehensive tests for metrics collection and backend integration, plus updates constants and changelog.
Reviewed Changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated no comments.
Show a summary per file
File | Description |
---|---|
azure/monitor/opentelemetry/exporter/export/_base.py | Integrated customer statsbeat initialization and tracking in the transmission flow. |
azure/monitor/opentelemetry/exporter/_storage.py | Extended storage to set and use customer_statsbeat_metrics for recording storage-related drops. |
tests/statsbeat/test_customer_statsbeat_metrics.py | New unit tests covering success, drop, and retry counting methods. |
tests/statsbeat/test_customer_statsbeat_integration.py | New integration tests verifying end-to-end metric export and failure handling. |
tests/statsbeat/test_customer_statsbeat.py | Simplified initialization tests and disabled-mode behavior. |
azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat_types.py | Defined new types and counters for customer Statsbeat. |
azure/monitor/opentelemetry/exporter/statsbeat/__init__.py | Lazy-import setup for CustomerStatsbeatMetrics. |
azure/monitor/opentelemetry/exporter/_constants.py | Added environment variable constant for enabling preview. |
CHANGELOG.md | Documented the preview feature under 1.0.0b36. |
API Change Check
APIView identified API level changes in this PR and created the following API reviews
...opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat.py
    if not self._enabled:
    # Create path if it doesn't exist
    metrics = self._customer_statsbeat_metrics
    try:

I don't believe you need a `try`/`except` here. You can simply record the metric if `put` is called and `self._enabled` is false.
    1,
    DropCode.CLIENT_READONLY,
    telemetry_type,
    exception_message[:100] # Truncate to avoid high cardinality

No exception message is needed for `DropCode.CLIENT_READONLY`.
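The two comments above could be combined roughly as follows. This is only a sketch: `RecordingMetrics` and `SketchStorage` are hypothetical stand-ins (not the real `CustomerStatsbeatMetrics` or `LocalFileStorage`), and the argument order of `count_dropped_items` is assumed from the diff snippet quoted above.

```python
from typing import Any, List, Optional, Tuple


class RecordingMetrics:
    """Hypothetical stand-in that only records calls, for illustration."""

    def __init__(self) -> None:
        self.dropped: List[Tuple[int, str, str]] = []

    def count_dropped_items(self, count: int, drop_code: str, telemetry_type: str) -> None:
        self.dropped.append((count, drop_code, telemetry_type))


class SketchStorage:
    """Simplified stand-in for the storage class; not the real implementation."""

    def __init__(self, enabled: bool, metrics: Optional[RecordingMetrics] = None) -> None:
        self._enabled = enabled
        self._customer_statsbeat_metrics = metrics

    def put(self, data: Any, telemetry_type: str = "UNKNOWN") -> Optional[str]:
        if not self._enabled:
            # Record the drop directly: no try/except and no exception message.
            if self._customer_statsbeat_metrics is not None:
                self._customer_statsbeat_metrics.count_dropped_items(
                    1, "CLIENT_READONLY", telemetry_type
                )
            return None
        return "stored"  # placeholder for the real write path
```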
    @@ -197,7 +229,20 @@ def put(self, data, lease_period=None):
    )
    if lease_period is None:
    lease_period = self._lease_period
    return blob.put(data, lease_period=lease_period)
    result = blob.put(data, lease_period=lease_period)

Can you add a comment here saying `blob.put` should return `None` if an exception was thrown within it?
    def __init__(self):
    self.total_item_success_count: List[Dict[str, Any]] = []
    self.total_item_drop_count: List[Dict[str, Any]] = []
    self.total_item_retry_count: List[Dict[str, Any]] = []

Nit: Add a newline at the end of the file.
    logger.warning("Error getting items from local storage: %s", ex)
    return []
    def _get_telemetry_type(self, item):

This can probably be put in a common place like `utils` instead of here.
    # Language constant for customer statsbeat
    STATSBEAT_LANGUAGE = "python"
    class TelemetryType(str, Enum):
    AVAILABILITY = "AVAILABILITY"

These types are not specific to statsbeat, so you can probably put them under `_constants.py`.
    try:
    # Map from Azure Monitor envelope types to TelemetryType enum
    type_map = {
    "Microsoft.ApplicationInsights.Event": TelemetryType.CUSTOM_EVENT,

Constants like `"Microsoft.ApplicationInsights.Metric"` are already defined in `_constants.py` that you can use. I recommend adding to that and then using those symbols here.
    def _get_telemetry_type(self, item):
    try:
    # Map from Azure Monitor envelope types to TelemetryType enum
    type_map = {

You should have the type map defined only once in `_constants` or `_utils` instead of defining it every time this function is called.
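A minimal sketch of this suggestion, with the map built once at import time rather than on every call. The names `_BASE_TYPE_TO_TELEMETRY_TYPE` and `get_telemetry_type` are hypothetical, the enum is trimmed to a few members, and only the two map entries quoted in this thread are shown; the real code would presumably reuse the symbols already in `_constants.py`.

```python
from enum import Enum
from typing import Dict, Optional


class TelemetryType(str, Enum):
    # Trimmed for illustration; the PR defines more members.
    AVAILABILITY = "AVAILABILITY"
    CUSTOM_EVENT = "CUSTOM_EVENT"
    UNKNOWN = "UNKNOWN"


# Built once at import time; only the entries quoted in the review are listed.
_BASE_TYPE_TO_TELEMETRY_TYPE: Dict[str, TelemetryType] = {
    "Microsoft.ApplicationInsights.Event": TelemetryType.CUSTOM_EVENT,
    "Microsoft.ApplicationInsights.Availability": TelemetryType.AVAILABILITY,
}


def get_telemetry_type(base_type: Optional[str]) -> TelemetryType:
    """Look up the telemetry type without rebuilding the map on each call."""
    if base_type is None:
        return TelemetryType.UNKNOWN
    return _BASE_TYPE_TO_TELEMETRY_TYPE.get(base_type, TelemetryType.UNKNOWN)
```

Because the helper is a plain module-level function, both the storage and the exporter code paths could import it instead of each defining their own copy.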
    logger.warning("Error getting items from local storage: %s", ex)
    return []
    def _get_telemetry_type(self, item):

Please add typing to the function signature so we know what type of parameter `item` to expect (should be `TelemetryItem`).

    "Microsoft.ApplicationInsights.Availability": TelemetryType.AVAILABILITY,
    }
    base_type = item.get("data", {}).get("baseType")

I'm surprised `TelemetryItem.get()` works. Does it actually store the fields as a `dict`?
    base_type = item.get("data", {}).get("baseType")
    return type_map.get(base_type, TelemetryType.UNKNOWN)
    except (AttributeError, KeyError):

We shouldn't be catching attribute errors with `try`/`except` like this, and should instead use the proper getters and field checking in the actual code.
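One way to read this comment is to check fields explicitly rather than wrapping the lookup in an exception handler. The sketch below uses hypothetical stand-in dataclasses; it assumes the real `TelemetryItem` exposes `data` and `base_type` as attributes, which this thread does not confirm.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMonitorBase:
    """Hypothetical stand-in for the payload object carried by an envelope."""
    base_type: Optional[str] = None


@dataclass
class FakeTelemetryItem:
    """Hypothetical stand-in for TelemetryItem; attribute names are assumed."""
    data: Optional[FakeMonitorBase] = None


def get_base_type(item: FakeTelemetryItem) -> Optional[str]:
    # Explicit field checks replace the try/except around attribute access.
    if item.data is None:
        return None
    return item.data.base_type
```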
    @@ -275,3 +320,81 @@ def _get_current_user(self):
    else:
    user = os.getlogin()
    return user
    def get_items(self, batch_size=50, customer_statsbeat_metrics=None):

Where is this function being used? I'm not sure what it is doing.
    expired_items.append(item)
    # Track expired items
    if metrics and expired_items:

Checking expiration is not needed anymore since https://github.com/aep-health-and-standards/Telemetry-Collection-Spec/pull/559

    """
    # Check environment variable
    env_value = os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW", "")
    is_enabled = env_value.lower() in ("true", "1", "yes")

Let's only accept `"true"`.
    and not self._instrumentation_collection
    )
    def _get_telemetry_type(self, envelope: TelemetryItem) -> TelemetryType:

Same comments apply from the above implementation of `get_telemetry_type` in `storage.py`. Also, if these functions do the same thing, you should abstract it out to `util` so you don't have to define it multiple times.
    @@ -162,6 +173,24 @@ def __init__(self, **kwargs: Any) -> None:
    collect_statsbeat_metrics(self)
    # Initialize customer statsbeat if enabled
    self._customer_statsbeat_metrics = None

Customers can define multiple exporters, possibly with different instrumentation keys. We should probably support multiple instrumentation keys in the future, but for now we can simply use the first one that we see instantiated. With that being said, `CustomerStatsbeatMetricsClass` should then be a singleton so we don't instantiate it multiple times across exporters (there should be only one global instance of it that keeps track of stats metrics).
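The "first exporter wins" behaviour described here might be sketched with a module-level accessor guarded by a lock. The class body and the `get_customer_statsbeat_metrics` function are hypothetical placeholders, not the PR's actual API.

```python
import threading
from typing import Optional


class CustomerStatsbeatMetrics:
    """Placeholder for the real metrics class; only stores the key here."""

    def __init__(self, instrumentation_key: str) -> None:
        self.instrumentation_key = instrumentation_key


_instance: Optional[CustomerStatsbeatMetrics] = None
_instance_lock = threading.Lock()


def get_customer_statsbeat_metrics(instrumentation_key: str) -> CustomerStatsbeatMetrics:
    """Return the single global instance, creating it on first use."""
    global _instance
    with _instance_lock:
        if _instance is None:
            # The first exporter to call this decides the instrumentation key.
            _instance = CustomerStatsbeatMetrics(instrumentation_key)
        return _instance
```

Later exporters that pass a different instrumentation key receive the existing instance unchanged, which matches the "use the first one that we see" behaviour the comment proposes for now.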
    1,
    telemetry_type
    )
    except Exception as track_ex:

Same here.
    envelopes_to_store = [x.as_dict() for x in resend_envelopes]
    self.storage.put(envelopes_to_store, 0)
    self.storage.put(envelopes_to_store, customer_statsbeat_metrics=self._customer_statsbeat_metrics, lease_period=0)

I believe this would fail, as `storage.put` does not accept a `customer_statsbeat_metrics` parameter.
    # The class will be available via the lazy import below
    # Define CustomerStatsbeatMetrics for type checking only
    CustomerStatsbeatMetrics: Any = None

Not sure if we need all this logic. We can just do lazy importing, as we do with regular statsbeat.
    REQUEST = "REQUEST"
    TRACE = "TRACE"
    UNKNOWN = "UNKNOWN"
    class DropCode(str, Enum):

Nit: Please add new lines between classes.
    CLIENT_READONLY = "CLIENT_READONLY"
    CLIENT_STALE_DATA = "CLIENT_STALE_DATA"
    CLIENT_PERSISTENCE_CAPACITY = "CLIENT_PERSISTENCE_CAPACITY"
    NON_RETRYABLE_STATUS_CODE = "NON_RETRYABLE_STATUS_CODE"

This and `RETRYABLE_STATUS_CODE` are not actual constants.
    UNKNOWN = "UNKNOWN"
    class RetryCode(str, Enum):
    CLIENT_EXCEPTION = "CLIENT_EXCEPTION"
    CLIENT_STORAGE_DISABLED = "CLIENT_STORAGE_DISABLED"
    UNKNOWN = "UNKNOWN"
    class DropCode(str, Enum):
    CLIENT_EXCEPTION = "CLIENT_EXCEPTION"
    CLIENT_EXPIRED_DATA = "CLIENT_EXPIRED_DATA"
    CLIENT_TIMEOUT = "CLIENT_TIMEOUT"
    RETRYABLE_STATUS_CODE = "RETRYABLE_STATUS_CODE"
    UNKNOWN = "UNKNOWN"
    class CustomStatsbeatCounter(str, Enum):

Suggested change:

    - class CustomStatsbeatCounter(str, Enum):
    + class CustomerStatsbeatMetricName(str, Enum):
    from azure.monitor.opentelemetry.exporter import VERSION
    except ImportError:
    # Fallback if import fails
    VERSION = "unknown"

Not sure if this is needed.
    # Import get_attach_type from _utils if available
    try:
    from azure.monitor.opentelemetry.exporter._utils.metric_utils import get_attach_type

Not needed.
    - Retried telemetry items (with reasons)
    """
    def _is_customer_statsbeat_enabled(self) -> bool:

No need for this check if `CustomerStatsbeatMetrics` is never instantiated when we check in `_base`.

    "connection_string": f"InstrumentationKey={options.instrumentation_key};"
    f"IngestionEndpoint={options.endpoint_url}"
    }
    self._customer_statsbeat_exporter = AzureMonitorStatsbeatExporter(**exporter_config)

We cannot use `AzureMonitorStatsbeatExporter` since it has custom name mappings for regular statsbeat.
    )
    # Initialize base class with required parameters
    super().__init__(

We should not be reusing this base class as it is specific to regular statsbeat.
    )
    self._customer_statsbeat_meter = self._customer_statsbeat_meter_provider.get_meter(
    "Azure Monitor Customer Statsbeat"

No need for a custom name; just use `__name__`.
    self.version = version
    self.compute_type = compute_type
    class CustomerStatsbeat:

This doesn't really belong in a `types` file. You should define it where it is going to be used instead.
    return
    try:
    counter = getattr(self, "_customer_statsbeat_counter", None)

No need for these checks since `self._customer_statsbeat_counter` is hard defined by us.
    # Check if there's an existing entry for this telemetry type
    existing_entry = None
    for entry in counter.total_item_success_count:
    if entry["telemetry_type"] == telemetry_type:

Use a `Dict` instead of a `List` so you can do existence checks in O(1).
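A sketch of what the dictionary-based counter could look like. `SuccessCounter` is a hypothetical name, and the guard against non-positive counts is an assumption rather than something shown in the diff.

```python
from collections import defaultdict
from typing import DefaultDict


class SuccessCounter:
    """Hypothetical holder for per-telemetry-type success counts."""

    def __init__(self) -> None:
        # telemetry_type -> count; missing keys start at 0.
        self.total_item_success_count: DefaultDict[str, int] = defaultdict(int)

    def count_successful_items(self, count: int, telemetry_type: str) -> None:
        if count <= 0:  # assumed guard, not taken from the PR
            return
        # O(1) update; no scan over a list of entries.
        self.total_item_success_count[telemetry_type] += count
```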
    counter.total_item_success_count.append(
    {"telemetry_type": telemetry_type, "count": count}
    )
    except (TypeError, KeyError) as exc:

These `try`/`except` blocks are unneeded.
    # Check if there's an existing entry for this drop code and telemetry type
    existing_entry = None
    for entry in counter.total_item_drop_count:

We should be using a nested `Dict` instead of a `List` of all possible combinations of `drop_code` and `telemetry_type`. A list is not efficient because we have to iterate through the entire list every time we get a new drop code or telemetry type.
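The nested layout might look like the sketch below. The nesting order (telemetry type first, then drop code) and the `DropCounter` name are assumptions; the drop code is typed as `str` or `int` because another comment in this review notes that status-code drops are not fixed constants.

```python
from collections import defaultdict
from typing import DefaultDict, Union

DropCodeKey = Union[str, int]  # enum value or an HTTP status code (assumed)


class DropCounter:
    """Hypothetical holder for drop counts keyed by type and drop code."""

    def __init__(self) -> None:
        # telemetry_type -> drop_code -> count
        self.total_item_drop_count: DefaultDict[str, DefaultDict[DropCodeKey, int]] = (
            defaultdict(lambda: defaultdict(int))
        )

    def count_dropped_items(
        self, count: int, drop_code: DropCodeKey, telemetry_type: str
    ) -> None:
        # Two O(1) lookups replace the scan over every recorded combination.
        self.total_item_drop_count[telemetry_type][drop_code] += count
```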
    drop_code: The reason/code for the drop (enum or string)
    telemetry_type: The type of telemetry (e.g., REQUEST, DEPENDENCY)
    exception_message: Optional exception message for logging
    drop_reason: Optional additional reason for the drop

Drop reason should not be optional.
    count: Number of dropped items
    drop_code: The reason/code for the drop (enum or string)
    telemetry_type: The type of telemetry (e.g., REQUEST, DEPENDENCY)
    exception_message: Optional exception message for logging

Nit: The exception message is not for logging; it is needed for drop.reason `CLIENT_EXCEPTION`, or if the reason is a status code, to explain WHY the telemetry was dropped.
    new_entry["drop_reason"] = drop_reason
    counter.total_item_drop_count.append(new_entry)
    except (TypeError, KeyError, AttributeError) as exc:

No need for these `try`/`except` blocks.
    # This is acceptable in statsbeat since it should never crash the host app
    logger.warning("Failed to count dropped items: %s", str(exc))
    def count_retry_items(

Everything above applies for this function as well.
    # This is acceptable in statsbeat since it should never crash the host app
    logger.warning("Failed to count retry items: %s", str(exc))
    def shutdown(self) -> None:

No need for a shutdown method as of right now.
    ITEM_SUCCESS_COUNT = "preview.item.success.count"
    ITEM_DROP_COUNT = "preview.item.dropped.count"
    ITEM_RETRY_COUNT = "preview.item.retry.count"
    class ItemSuccessCount(TypedDict, total=False):

Where are these classes (TypedDicts) being used? I can't seem to find usage.
    return []
    # Create observations for each unique entry
    for entry in counter.total_item_success_count:

This is too inefficient when using a `List`. Use a `Dict[telemetry_type, count]` instead.
    # Create observations for each unique entry
    for entry in counter.total_item_success_count:
    if entry.get("count", 0) > 0:
    attributes = {

Can define this at the top level since this will never change.
    entry["count"] = 0
    return observations
    except (KeyError, AttributeError) as exc:

No need for `try`/`except`.
    return []
    # Create observations for each unique entry
    for entry in counter.total_item_drop_count:

This is too inefficient when using a List. Use a `Dict[telemetry_type, Dict[drop_code, Dict[drop.reason, count]]]` instead.
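A rough sketch of the three-level structure and of draining it into observations. Plain `(count, attributes)` tuples stand in for real metric observations, the helper names are hypothetical, and the attribute keys `drop.code` and `drop.reason` are assumptions (only `telemetry_type` appears in the diff). Resetting counts after reading mirrors the `entry["count"] = 0` line quoted earlier in the review.

```python
from typing import Dict, List, Tuple

# telemetry_type -> drop_code -> drop_reason -> count
DropCounts = Dict[str, Dict[str, Dict[str, int]]]
Observation = Tuple[int, Dict[str, str]]  # stand-in for a real observation type


def add_drop(
    counts: DropCounts, telemetry_type: str, drop_code: str, drop_reason: str, count: int = 1
) -> None:
    by_reason = counts.setdefault(telemetry_type, {}).setdefault(drop_code, {})
    by_reason[drop_reason] = by_reason.get(drop_reason, 0) + count


def drain_drop_observations(counts: DropCounts) -> List[Observation]:
    observations: List[Observation] = []
    for telemetry_type, by_code in counts.items():
        for drop_code, by_reason in by_code.items():
            for drop_reason, count in by_reason.items():
                if count > 0:
                    attributes = {
                        "telemetry_type": telemetry_type,
                        "drop.code": drop_code,  # assumed attribute key
                        "drop.reason": drop_reason,  # assumed attribute key
                    }
                    observations.append((count, attributes))
                    # Reset after reading; the key set is unchanged, so this
                    # is safe while iterating.
                    by_reason[drop_reason] = 0
    return observations
```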
This PR is quite big and complex. I suggest implementing this feature in separate parts to make manual testing of each part easier.
Description
Implement customer facing statsbeat as a preview feature.
Packages impacted by this PR
@azure/monitor-opentelemetry-exporter
All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines