1
+ import logging
1
2
from django .core .exceptions import ImproperlyConfigured
2
3
from django .core .management import call_command
3
4
from django .db .models .loading import get_model
4
5
5
- from celery .task import Task
6
6
from celery_haystack .conf import settings
7
7
8
8
try :
9
- from haystack import connections
10
- index_holder = connections ['default' ].get_unified_index ()
9
+ from haystack import connections , connection_router
11
10
from haystack .exceptions import NotHandled as IndexNotFoundException
12
11
legacy = False
13
12
except ImportError :
14
13
try :
15
- from haystack import site as index_holder
14
+ from haystack import site
16
15
from haystack .exceptions import NotRegistered as IndexNotFoundException # noqa
17
16
legacy = True
18
17
except ImportError , e :
19
18
raise ImproperlyConfigured ("Haystack couldn't be imported: %s" % e )
20
19
20
+ if settings .CELERY_HAYSTACK_TRANSACTION_SAFE and not settings .CELERY_ALWAYS_EAGER :
21
+ from djcelery_transactions import PostTransactionTask as Task
22
+ else :
23
+ from celery .task import Task # noqa
24
+
21
25
22
26
class CeleryHaystackSignalHandler (Task ):
23
27
using = settings .CELERY_HAYSTACK_DEFAULT_ALIAS
24
28
max_retries = settings .CELERY_HAYSTACK_MAX_RETRIES
25
29
default_retry_delay = settings .CELERY_HAYSTACK_RETRY_DELAY
26
30
31
+ def get_logger (self , * args , ** kwargs ):
32
+ logger = super (CeleryHaystackSignalHandler , self ).get_logger (* args , ** kwargs )
33
+ if settings .DEBUG :
34
+ logger .setLogger (logging .DEBUG )
35
+ return logger
36
+
27
37
def split_identifier (self , identifier , ** kwargs ):
28
38
"""
29
39
Break down the identifier representing the instance.
@@ -53,39 +63,40 @@ def get_model_class(self, object_path, **kwargs):
53
63
model_class = get_model (app_name , classname )
54
64
55
65
if model_class is None :
56
- logger = self .get_logger (** kwargs )
57
- logger .error ("Could not load model "
58
- "from '%s'. Moving on..." % object_path )
59
- return None
60
-
66
+ raise ImproperlyConfigured ("Could not load model '%s'." %
67
+ object_path )
61
68
return model_class
62
69
63
70
def get_instance (self , model_class , pk , ** kwargs ):
64
71
"""
65
72
Fetch the instance in a standarized way.
66
73
"""
67
74
logger = self .get_logger (** kwargs )
75
+ instance = None
68
76
try :
69
- instance = model_class .objects .get (pk = pk )
77
+ instance = model_class ._default_manager .get (pk = int ( pk ) )
70
78
except model_class .DoesNotExist :
71
- logger .error ("Couldn't load model instance "
72
- "with pk #%s. Somehow it went missing?" % pk )
73
- return None
79
+ logger .error ("Couldn't load %s.%s.%s. Somehow it went missing?" %
80
+ ( model_class . _meta . app_label . lower (),
81
+ model_class . _meta . object_name . lower (), pk ))
74
82
except model_class .MultipleObjectsReturned :
75
- logger .error ("More than one object with pk #%s. Oops?" % pk )
76
- return None
77
-
83
+ logger .error ("More than one object with pk %s. Oops?" % pk )
78
84
return instance
79
85
80
86
def get_index (self , model_class , ** kwargs ):
81
87
"""
82
88
Fetch the model's registered ``SearchIndex`` in a standarized way.
83
89
"""
84
- logger = self .get_logger (** kwargs )
85
90
try :
91
+ if legacy :
92
+ index_holder = site
93
+ else :
94
+ backend_alias = connection_router .for_write (** {'models' : [model_class ]})
95
+ index_holder = connections [backend_alias ].get_unified_index () # noqa
86
96
return index_holder .get_index (model_class )
87
97
except IndexNotFoundException :
88
- logger .error ("Couldn't find a SearchIndex for %s." % model_class )
98
+ raise ImproperlyConfigured ("Couldn't find a SearchIndex for %s." %
99
+ model_class )
89
100
return None
90
101
91
102
def get_handler_options (self , ** kwargs ):
@@ -104,58 +115,69 @@ def run(self, action, identifier, **kwargs):
104
115
# First get the object path and pk (e.g. ('notes.note', 23))
105
116
object_path , pk = self .split_identifier (identifier , ** kwargs )
106
117
if object_path is None or pk is None :
107
- logger .error ("Skipping." )
108
- return
118
+ msg = "Couldn't handle object with identifier %s" % identifier
119
+ logger .error (msg )
120
+ raise ValueError (msg )
109
121
110
122
# Then get the model class for the object path
111
123
model_class = self .get_model_class (object_path , ** kwargs )
112
124
current_index = self .get_index (model_class , ** kwargs )
125
+ current_index_name = "." .join ([current_index .__class__ .__module__ ,
126
+ current_index .__class__ .__name__ ])
113
127
114
128
if action == 'delete' :
115
- # If the object is gone, we'll use just the identifier against the
116
- # index.
129
+ # If the object is gone, we'll use just the identifier
130
+ # against the index.
117
131
try :
118
132
handler_options = self .get_handler_options (** kwargs )
119
133
current_index .remove_object (identifier , ** handler_options )
120
134
except Exception , exc :
121
135
logger .error (exc )
122
- self .retry ([ action , identifier ], kwargs , exc = exc )
136
+ self .retry (exc = exc )
123
137
else :
124
- logger .debug ("Deleted '%s' from index" % identifier )
125
- return
126
-
138
+ msg = ("Deleted '%s' (with %s)" %
139
+ (identifier , current_index_name ))
140
+ logger .debug (msg )
141
+ return msg
127
142
elif action == 'update' :
128
143
# and the instance of the model class with the pk
129
144
instance = self .get_instance (model_class , pk , ** kwargs )
130
145
if instance is None :
131
- logger .debug ("Didn't update index for '%s'" % identifier )
132
- return
146
+ logger .debug ("Failed updating '%s' (with %s)" %
147
+ (identifier , current_index_name ))
148
+ raise ValueError ("Couldn't load object '%s'" % identifier )
133
149
134
150
# Call the appropriate handler of the current index and
135
151
# handle exception if neccessary
136
- logger .debug ("Indexing '%s'." % instance )
137
152
try :
138
153
handler_options = self .get_handler_options (** kwargs )
139
154
current_index .update_object (instance , ** handler_options )
140
155
except Exception , exc :
141
156
logger .error (exc )
142
- self .retry ([ action , identifier ], kwargs , exc = exc )
157
+ self .retry (exc = exc )
143
158
else :
144
- logger .debug ("Updated index with '%s'" % instance )
159
+ msg = ("Updated '%s' (with %s)" %
160
+ (identifier , current_index_name ))
161
+ logger .debug (msg )
162
+ return msg
145
163
else :
146
164
logger .error ("Unrecognized action '%s'. Moving on..." % action )
147
- self . retry ([ action , identifier ], kwargs , exc = exc )
165
+ raise ValueError ( "Unrecognized action %s" % action )
148
166
149
167
150
168
class CeleryHaystackUpdateIndex (Task ):
151
169
"""
152
170
A celery task class to be used to call the update_index management
153
171
command from Celery.
154
172
"""
173
+ def get_logger (self , * args , ** kwargs ):
174
+ logger = super (CeleryHaystackUpdateIndex , self ).get_logger (* args , ** kwargs )
175
+ if settings .DEBUG :
176
+ logger .setLogger (logging .DEBUG )
177
+ return logger
178
+
155
179
def run (self , apps = None , ** kwargs ):
156
180
logger = self .get_logger (** kwargs )
157
- logger .info ("Starting update index" )
158
- # Run the update_index management command
159
181
defaults = {
160
182
'batchsize' : settings .CELERY_HAYSTACK_COMMAND_BATCH_SIZE ,
161
183
'age' : settings .CELERY_HAYSTACK_COMMAND_AGE ,
@@ -167,5 +189,7 @@ def run(self, apps=None, **kwargs):
167
189
defaults .update (kwargs )
168
190
if apps is None :
169
191
apps = settings .CELERY_HAYSTACK_COMMAND_APPS
192
+ # Run the update_index management command
193
+ logger .info ("Starting update index" )
170
194
call_command ('update_index' , * apps , ** defaults )
171
195
logger .info ("Finishing update index" )
0 commit comments