Skip to content
7 changes: 5 additions & 2 deletions netbox/core/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from users.models import Token, User
from utilities.testing import APITestCase, APIViewTestCases, TestCase
from utilities.testing.utils import disable_logging
from ..models import *


Expand Down Expand Up @@ -189,7 +190,8 @@ def test_background_task_requeue(self):
# Enqueue & run a job that will fail
job = queue.enqueue(self.dummy_job_failing)
worker = get_worker('default')
worker.work(burst=True)
with disable_logging():
worker.work(burst=True)
self.assertTrue(job.is_failed)

# Re-enqueue the failed job and check that its status has been reset
Expand Down Expand Up @@ -231,7 +233,8 @@ def test_background_task_stop(self):
self.assertEqual(job.get_status(), JobStatus.STARTED)
response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
with disable_logging():
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(started_job_registry), 0)

Expand Down
8 changes: 5 additions & 3 deletions netbox/core/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from core.models import *
from dcim.models import Site
from users.models import User
from utilities.testing import TestCase, ViewTestCases, create_tags
from utilities.testing import TestCase, ViewTestCases, create_tags, disable_logging


class DataSourceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
Expand Down Expand Up @@ -271,7 +271,8 @@ def test_background_task_requeue(self):
# Enqueue & run a job that will fail
job = queue.enqueue(self.dummy_job_failing)
worker = get_worker('default')
worker.work(burst=True)
with disable_logging():
worker.work(burst=True)
self.assertTrue(job.is_failed)

# Re-enqueue the failed job and check that its status has been reset
Expand Down Expand Up @@ -317,7 +318,8 @@ def test_background_task_stop(self):
self.assertEqual(len(started_job_registry), 1)
response = self.client.get(reverse('core:background_task_stop', args=[job.id]))
self.assertEqual(response.status_code, 302)
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
with disable_logging():
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
self.assertEqual(len(started_job_registry), 0)

canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
Expand Down
5 changes: 3 additions & 2 deletions netbox/dcim/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from netbox.api.serializers import GenericObjectSerializer
from tenancy.models import Tenant
from users.models import User
from utilities.testing import APITestCase, APIViewTestCases, create_test_device
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
from virtualization.models import Cluster, ClusterType
from wireless.choices import WirelessChannelChoices
from wireless.models import WirelessLAN
Expand Down Expand Up @@ -1858,7 +1858,8 @@ def test_bulk_delete_child_interfaces(self):

# Attempt to delete only the parent interface
url = self._get_detail_url(interface1)
self.client.delete(url, **self.header)
with disable_logging():
self.client.delete(url, **self.header)
self.assertEqual(device.interfaces.count(), 4) # Parent was not deleted

# Attempt to bulk delete parent & child together
Expand Down
9 changes: 8 additions & 1 deletion netbox/extras/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.utils.timezone import make_aware
from django.utils.timezone import make_aware, now
from rest_framework import status

from core.choices import ManagedFileRootPathChoices
Expand Down Expand Up @@ -991,6 +991,10 @@ def setUpTestData(cls):
},
]

cls.bulk_update_data = {
'user': users[3].pk,
}


class NotificationGroupTest(APIViewTestCases.APIViewTestCase):
model = NotificationGroup
Expand Down Expand Up @@ -1072,6 +1076,9 @@ def setUpTestData(cls):
class NotificationTest(APIViewTestCases.APIViewTestCase):
model = Notification
brief_fields = ['display', 'event_type', 'id', 'object_id', 'object_type', 'read', 'url', 'user']
bulk_update_data = {
'read': now(),
}

@classmethod
def setUpTestData(cls):
Expand Down
8 changes: 6 additions & 2 deletions netbox/extras/tests/test_scripts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import tempfile
from datetime import date, datetime, timezone

Expand All @@ -7,6 +8,7 @@

from dcim.models import DeviceRole
from extras.scripts import *
from utilities.testing import disable_logging

CHOICES = (
('ff0000', 'Red'),
Expand Down Expand Up @@ -39,7 +41,8 @@ def test_load_yaml(self):
datafile.write(bytes(YAML_DATA, 'UTF-8'))
datafile.seek(0)

data = Script().load_yaml(datafile.name)
with disable_logging(level=logging.WARNING):
data = Script().load_yaml(datafile.name)
self.assertEqual(data, {
'Foo': 123,
'Bar': 456,
Expand All @@ -51,7 +54,8 @@ def test_load_json(self):
datafile.write(bytes(JSON_DATA, 'UTF-8'))
datafile.seek(0)

data = Script().load_json(datafile.name)
with disable_logging(level=logging.WARNING):
data = Script().load_json(datafile.name)
self.assertEqual(data, {
'Foo': 123,
'Bar': 456,
Expand Down
5 changes: 3 additions & 2 deletions netbox/ipam/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging

from django.urls import reverse
from netaddr import IPNetwork
Expand All @@ -9,7 +10,7 @@
from ipam.models import *
from tenancy.models import Tenant
from utilities.data import string_to_ranges
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_warnings
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging


class AppTest(APITestCase):
Expand Down Expand Up @@ -1026,7 +1027,7 @@ def test_delete_vlan_with_prefix(self):

self.add_permissions('ipam.delete_vlan')
url = reverse('ipam-api:vlan-detail', kwargs={'pk': vlan.pk})
with disable_warnings('netbox.api.views.ModelViewSet'):
with disable_logging(level=logging.WARNING):
response = self.client.delete(url, **self.header)

self.assertHttpStatus(response, status.HTTP_409_CONFLICT)
Expand Down
4 changes: 3 additions & 1 deletion netbox/utilities/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.test import Client, TestCase, override_settings
from django.urls import reverse
from drf_spectacular.drainage import GENERATOR_STATS
from rest_framework import status

from core.models import ObjectType
Expand Down Expand Up @@ -264,5 +265,6 @@ def test_api_docs(self):
self.assertEqual(response.status_code, 200)

url = reverse('schema')
response = self.client.get(url)
with GENERATOR_STATS.silence(): # Suppress schema generator warnings
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
9 changes: 7 additions & 2 deletions netbox/virtualization/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

from django.test import tag
from django.urls import reverse
from netaddr import IPNetwork
Expand All @@ -10,7 +12,9 @@
from extras.models import ConfigTemplate, CustomField
from ipam.choices import VLANQinQRoleChoices
from ipam.models import Prefix, VLAN, VRF
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, create_test_virtualmachine
from utilities.testing import (
APITestCase, APIViewTestCases, create_test_device, create_test_virtualmachine, disable_logging,
)
from virtualization.choices import *
from virtualization.models import *

Expand Down Expand Up @@ -402,7 +406,8 @@ def test_bulk_delete_child_interfaces(self):

# Attempt to delete only the parent interface
url = self._get_detail_url(interface1)
self.client.delete(url, **self.header)
with disable_logging(level=logging.WARNING):
self.client.delete(url, **self.header)
self.assertEqual(virtual_machine.interfaces.count(), 4) # Parent was not deleted

# Attempt to bulk delete parent & child together
Expand Down