Skip to content

Implement Customer Facing Statsbeat #41669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

rads-1996
Copy link
Member

Description

Implement customer facing statsbeat as a preview feature.

Packages impacted by this PR

@azure/monitor-opentelemetry-exporter

All SDK Contribution checklist:

  • The pull request does not introduce [breaking changes]
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

@Copilot Copilot AI review requested due to automatic review settings June 19, 2025 19:38
@github-actions github-actions bot added the Monitor - Exporter Monitor OpenTelemetry Exporter label Jun 19, 2025
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Implements preview support for customer-facing Statsbeat by integrating metric tracking into the exporter and local storage, and adding unit/integration tests.

  • Hooks CustomerStatsbeatMetrics into the base exporter to count successful, dropped, and retried items.
  • Enhances LocalFileStorage.put and get_items to emit Statsbeat metrics on storage errors, capacity, and expiry.
  • Adds comprehensive tests for metrics collection and backend integration, plus updates constants and changelog.

Reviewed Changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated no comments.

Show a summary per file
File Description
azure/monitor/opentelemetry/exporter/export/_base.py Integrated customer statsbeat initialization and tracking in the transmission flow.
azure/monitor/opentelemetry/exporter/_storage.py Extended storage to set and use customer_statsbeat_metrics for recording storage-related drops.
tests/statsbeat/test_customer_statsbeat_metrics.py New unit tests covering success, drop, and retry counting methods.
tests/statsbeat/test_customer_statsbeat_integration.py New integration tests verifying end-to-end metric export and failure handling.
tests/statsbeat/test_customer_statsbeat.py Simplified initialization tests and disabled-mode behavior.
azure/monitor/opentelemetry/exporter/statsbeat/_customer_statsbeat_types.py Defined new types and counters for customer Statsbeat.
azure/monitor/opentelemetry/exporter/statsbeat/init.py Lazy-import setup for CustomerStatsbeatMetrics.
azure/monitor/opentelemetry/exporter/_constants.py Added environment variable constant for enabling preview.
CHANGELOG.md Documented the preview feature under 1.0.0b36.

Copy link

API Change Check

APIView identified API level changes in this PR and created the following API reviews

azure-monitor-opentelemetry-exporter

if not self._enabled:
# Create path if it doesn't exist
metrics = self._customer_statsbeat_metrics
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe you need a try: catch here. Can simply just record the metric if put is called and self._enabled is false.

1,
DropCode.CLIENT_READONLY,
telemetry_type,
exception_message[:100] # Truncate to avoid high cardinality
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No exception message is needed for DropCode.CLIENT_READONLY.

@@ -197,7 +229,20 @@ def put(self, data, lease_period=None):
)
if lease_period is None:
lease_period = self._lease_period
return blob.put(data, lease_period=lease_period)
result = blob.put(data, lease_period=lease_period)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here saying blob.put should return a None if an exception was thrown within it.

def __init__(self):
self.total_item_success_count: List[Dict[str, Any]] = []
self.total_item_drop_count: List[Dict[str, Any]] = []
self.total_item_retry_count: List[Dict[str, Any]] = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Add new line at end of file.

logger.warning("Error getting items from local storage: %s", ex)
return []

def _get_telemetry_type(self, item):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be put in a common place like utils instead of here.

# Language constant for customer statsbeat
STATSBEAT_LANGUAGE = "python"
class TelemetryType(str, Enum):
AVAILABILITY = "AVAILABILITY"
Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These types are not just specific to statsbeat so you can probably put them under _constants.py.

try:
# Map from Azure Monitor envelope types to TelemetryType enum
type_map = {
"Microsoft.ApplicationInsights.Event": TelemetryType.CUSTOM_EVENT,
Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constants like "Microsoft.ApplicationInsights.Metric" are already defined in _constants.py that you can use. I recommend adding to that and then using those symbols here.

def _get_telemetry_type(self, item):
try:
# Map from Azure Monitor envelope types to TelemetryType enum
type_map = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should have the type map defined only once in _constants or _utils instead of defining this everytime this function is called.

logger.warning("Error getting items from local storage: %s", ex)
return []

def _get_telemetry_type(self, item):
Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add typing to the function signature so we know what type of parameter item to expect (should be TelemetryItem).

"Microsoft.ApplicationInsights.Availability": TelemetryType.AVAILABILITY,
}

base_type = item.get("data", {}).get("baseType")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised TelemetryItem.get() works. Does it actually store the fields as a dict?


base_type = item.get("data", {}).get("baseType")
return type_map.get(base_type, TelemetryType.UNKNOWN)
except (AttributeError, KeyError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be try: catching attribute errors like this an instead use the proper getters and field checking in the actual code.

@@ -275,3 +320,81 @@ def _get_current_user(self):
else:
user = os.getlogin()
return user

def get_items(self, batch_size=50, customer_statsbeat_metrics=None):
Copy link
Member

@lzchen lzchen Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this function being used? I'm not sure what it is doing.

expired_items.append(item)

# Track expired items
if metrics and expired_items:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""
# Check environment variable
env_value = os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_ENABLED_PREVIEW", "")
is_enabled = env_value.lower() in ("true", "1", "yes")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's only accept "true"

and not self._instrumentation_collection
)

def _get_telemetry_type(self, envelope: TelemetryItem) -> TelemetryType:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments apply from the above implementation of get_telemetry_type in storage.py. Also, if these functions do the same thing, you should abstract it out to util so you don't have to define it multiple times.

@@ -162,6 +173,24 @@ def __init__(self, **kwargs: Any) -> None:

collect_statsbeat_metrics(self)

# Initialize customer statsbeat if enabled
self._customer_statsbeat_metrics = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Customers can define multiple exporters possibly with different instrumentation keys. We should probably support multiple instrumentation keys in the future but for now we can simply use the first one that we see that is instantiated. With that being said, CustomerStatsbeatMetricsClass should then be a singleton so we don't instantiate it multiple times across exporters (there should be only one global instance of it that is keep track of stats metrics).

1,
telemetry_type
)
except Exception as track_ex:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

envelopes_to_store = [x.as_dict() for x in resend_envelopes]
self.storage.put(envelopes_to_store, 0)
self.storage.put(envelopes_to_store, customer_statsbeat_metrics=self._customer_statsbeat_metrics, lease_period=0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this would fail as storage.put is not expecting to accept param customer_statsbeat_metrics

# The class will be available via the lazy import below

# Define CustomerStatsbeatMetrics for type checking only
CustomerStatsbeatMetrics: Any = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we need all this logic? We can just to lazy importing just like how we are doing with regular statsbeat.

REQUEST = "REQUEST"
TRACE = "TRACE"
UNKNOWN = "UNKNOWN"
class DropCode(str, Enum):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please add new lines between classes.

CLIENT_READONLY = "CLIENT_READONLY"
CLIENT_STALE_DATA = "CLIENT_STALE_DATA"
CLIENT_PERSISTENCE_CAPACITY = "CLIENT_PERSISTENCE_CAPACITY"
NON_RETRYABLE_STATUS_CODE = "NON_RETRYABLE_STATUS_CODE"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and RETRYABLE_STATUS_CODE are not actual constants.

UNKNOWN = "UNKNOWN"
class RetryCode(str, Enum):
CLIENT_EXCEPTION = "CLIENT_EXCEPTION"
CLIENT_STORAGE_DISABLED = "CLIENT_STORAGE_DISABLED"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UNKNOWN = "UNKNOWN"
class DropCode(str, Enum):
CLIENT_EXCEPTION = "CLIENT_EXCEPTION"
CLIENT_EXPIRED_DATA = "CLIENT_EXPIRED_DATA"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLIENT_TIMEOUT = "CLIENT_TIMEOUT"
RETRYABLE_STATUS_CODE = "RETRYABLE_STATUS_CODE"
UNKNOWN = "UNKNOWN"
class CustomStatsbeatCounter(str, Enum):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class CustomStatsbeatCounter(str, Enum):
class CustomerStatsbeatMetricName(str, Enum):

from azure.monitor.opentelemetry.exporter import VERSION
except ImportError:
# Fallback if import fails
VERSION = "unknown"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is needed.


# Import get_attach_type from _utils if available
try:
from azure.monitor.opentelemetry.exporter._utils.metric_utils import get_attach_type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed.

- Retried telemetry items (with reasons)
"""

def _is_customer_statsbeat_enabled(self) -> bool:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this check if CustomerStatsbeatMetrics is never instantiated when we check in _base.

"connection_string": f"InstrumentationKey={options.instrumentation_key};"
f"IngestionEndpoint={options.endpoint_url}"
}
self._customer_statsbeat_exporter = AzureMonitorStatsbeatExporter(**exporter_config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot use AzureMonitorStatsbeatExporter since it has custom name mappings for regular statsbeat.

)

# Initialize base class with required parameters
super().__init__(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be reusing this base class as it is specific to regular statsbeat.

)

self._customer_statsbeat_meter = self._customer_statsbeat_meter_provider.get_meter(
"Azure Monitor Customer Statsbeat"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not need for custom name, just use __name__

self.version = version
self.compute_type = compute_type

class CustomerStatsbeat:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is doesn't really belong in a types file. You should define this where this is going to be used instead.

return

try:
counter = getattr(self, "_customer_statsbeat_counter", None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for these checks since self._customer_statsbeat_counter is hard defined by us.

# Check if there's an existing entry for this telemetry type
existing_entry = None
for entry in counter.total_item_success_count:
if entry["telemetry_type"] == telemetry_type:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a Dict instead of a List so you can do existence checks in O(1).

counter.total_item_success_count.append(
{"telemetry_type": telemetry_type, "count": count}
)
except (TypeError, KeyError) as exc:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These try catchs are unneeded.


# Check if there's an existing entry for this drop code and telemetry type
existing_entry = None
for entry in counter.total_item_drop_count:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be using a nested Dict instead of a List of all possible combinations of drop_code and telemetry_type. A list is not efficient because we have to iterate through the entire list every time we get a new drop code or telemetry type.

drop_code: The reason/code for the drop (enum or string)
telemetry_type: The type of telemetry (e.g., REQUEST, DEPENDENCY)
exception_message: Optional exception message for logging
drop_reason: Optional additional reason for the drop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop reason should not be optional.

count: Number of dropped items
drop_code: The reason/code for the drop (enum or string)
telemetry_type: The type of telemetry (e.g., REQUEST, DEPENDENCY)
exception_message: Optional exception message for logging
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Exception message is not for logging, it is needed for drop.reason CLIENT_EXCEPTION or if reason is a status code to explain WHY the telemetry was dropped.

new_entry["drop_reason"] = drop_reason

counter.total_item_drop_count.append(new_entry)
except (TypeError, KeyError, AttributeError) as exc:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for these try catches.

# This is acceptable in statsbeat since it should never crash the host app
logger.warning("Failed to count dropped items: %s", str(exc))

def count_retry_items(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything above applies for this function as well.

# This is acceptable in statsbeat since it should never crash the host app
logger.warning("Failed to count retry items: %s", str(exc))

def shutdown(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a shutdown method as of right now.

ITEM_SUCCESS_COUNT = "preview.item.success.count"
ITEM_DROP_COUNT = "preview.item.dropped.count"
ITEM_RETRY_COUNT = "preview.item.retry.count"
class ItemSuccessCount(TypedDict, total=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are these classes (TypedDicts) being use? I can't seem to find usage.

return []

# Create observations for each unique entry
for entry in counter.total_item_success_count:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too inefficient when using a List. Use a Dict[telemetry_type, count] instead.

# Create observations for each unique entry
for entry in counter.total_item_success_count:
if entry.get("count", 0) > 0:
attributes = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can define this at the top level since this will never change.

entry["count"] = 0

return observations
except (KeyError, AttributeError) as exc:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for try except.

return []

# Create observations for each unique entry
for entry in counter.total_item_drop_count:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too inefficient when using a List. Use a Dict[telemetry_type, Dict[drop_code, Dict[drop.reason, count]]] instead.

@lzchen
Copy link
Member

lzchen commented Jun 23, 2025

This PR is quite big a complex. I suggest implementing this feature in different parts to ensure easier manual testing of each part.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Monitor - Exporter Monitor OpenTelemetry Exporter
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants