Skip to content

Commit 8af7504

Browse files
committed
use item partitioned lock for as much as possible
push cache_lock deeper into the abyss
1 parent 54f11de commit 8af7504

File tree

5 files changed

+80
-53
lines changed

5 files changed

+80
-53
lines changed

items.c

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
121121
it = search;
122122
it->refcount = 1;
123123
slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
124-
do_item_unlink(it, hash(ITEM_key(it), it->nkey, 0));
124+
do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0));
125125
/* Initialize the item block: */
126126
it->slabs_clsid = 0;
127127
it->refcount = 0;
@@ -151,7 +151,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
151151
it = search;
152152
it->refcount = 1;
153153
slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
154-
do_item_unlink(it, hash(ITEM_key(it), it->nkey, 0));
154+
do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0));
155155
/* Initialize the item block: */
156156
it->slabs_clsid = 0;
157157
it->refcount = 0;
@@ -172,7 +172,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim
172172
search->time + TAIL_REPAIR_TIME < current_time) {
173173
itemstats[id].tailrepairs++;
174174
search->refcount = 0;
175-
do_item_unlink(search, hash(ITEM_key(search), search->nkey, 0));
175+
do_item_unlink_nolock(search, hash(ITEM_key(search), search->nkey, 0));
176176
}
177177
return NULL;
178178
}
@@ -275,23 +275,41 @@ int do_item_link(item *it, const uint32_t hv) {
275275
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
276276
it->it_flags |= ITEM_LINKED;
277277
it->time = current_time;
278-
assoc_insert(it, hv);
279278

280279
STATS_LOCK();
281280
stats.curr_bytes += ITEM_ntotal(it);
282281
stats.curr_items += 1;
283282
stats.total_items += 1;
284283
STATS_UNLOCK();
285284

285+
mutex_lock(&cache_lock);
286286
/* Allocate a new CAS ID on link. */
287287
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
288-
288+
assoc_insert(it, hv);
289289
item_link_q(it);
290+
pthread_mutex_unlock(&cache_lock);
290291

291292
return 1;
292293
}
293294

294295
void do_item_unlink(item *it, const uint32_t hv) {
296+
MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
297+
if ((it->it_flags & ITEM_LINKED) != 0) {
298+
it->it_flags &= ~ITEM_LINKED;
299+
STATS_LOCK();
300+
stats.curr_bytes -= ITEM_ntotal(it);
301+
stats.curr_items -= 1;
302+
STATS_UNLOCK();
303+
mutex_lock(&cache_lock);
304+
assoc_delete(ITEM_key(it), it->nkey, hv);
305+
item_unlink_q(it);
306+
pthread_mutex_unlock(&cache_lock);
307+
if (it->refcount == 0) item_free(it);
308+
}
309+
}
310+
311+
/* FIXME: Is it necessary to keep thsi copy/pasted code? */
312+
void do_item_unlink_nolock(item *it, const uint32_t hv) {
295313
MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
296314
if ((it->it_flags & ITEM_LINKED) != 0) {
297315
it->it_flags &= ~ITEM_LINKED;
@@ -323,9 +341,11 @@ void do_item_update(item *it) {
323341
assert((it->it_flags & ITEM_SLABBED) == 0);
324342

325343
if ((it->it_flags & ITEM_LINKED) != 0) {
344+
mutex_lock(&cache_lock);
326345
item_unlink_q(it);
327346
it->time = current_time;
328347
item_link_q(it);
348+
pthread_mutex_unlock(&cache_lock);
329349
}
330350
}
331351
}
@@ -387,22 +407,6 @@ void do_item_stats(ADD_STAT add_stats, void *c) {
387407
char key_str[STAT_KEY_LEN];
388408
char val_str[STAT_VAL_LEN];
389409
int klen = 0, vlen = 0;
390-
int search = 50;
391-
while (search > 0 &&
392-
tails[i] != NULL &&
393-
((settings.oldest_live != 0 && /* Item flushd */
394-
settings.oldest_live <= current_time &&
395-
tails[i]->time <= settings.oldest_live) ||
396-
(tails[i]->exptime != 0 && /* and not expired */
397-
tails[i]->exptime < current_time))) {
398-
--search;
399-
if (tails[i]->refcount == 0) {
400-
do_item_unlink(tails[i], hash(ITEM_key(tails[i]),
401-
tails[i]->nkey, 0));
402-
} else {
403-
break;
404-
}
405-
}
406410
if (tails[i] == NULL) {
407411
/* We removed all of the items in this slab class */
408412
continue;
@@ -470,7 +474,9 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c) {
470474

471475
/** wrapper around assoc_find which does the lazy expiration logic */
472476
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
477+
mutex_lock(&cache_lock);
473478
item *it = assoc_find(key, nkey, hv);
479+
pthread_mutex_unlock(&cache_lock);
474480
int was_found = 0;
475481

476482
if (settings.verbose > 2) {
@@ -484,7 +490,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
484490

485491
if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= current_time &&
486492
it->time <= settings.oldest_live) {
487-
do_item_unlink(it, hv); /* MTSAFE - cache_lock held */
493+
do_item_unlink(it, hv); /* MTSAFE - item_lock held */
488494
it = NULL;
489495
}
490496

@@ -494,7 +500,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
494500
}
495501

496502
if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
497-
do_item_unlink(it, hv); /* MTSAFE - cache_lock held */
503+
do_item_unlink(it, hv); /* MTSAFE - item_lock held */
498504
it = NULL;
499505
}
500506

@@ -524,16 +530,6 @@ item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
524530
return it;
525531
}
526532

527-
/** returns an item whether or not it's expired. */
528-
item *do_item_get_nocheck(const char *key, const size_t nkey, const uint32_t hv) {
529-
item *it = assoc_find(key, nkey, hv);
530-
if (it) {
531-
it->refcount++;
532-
DEBUG_REFCNT(it, '+');
533-
}
534-
return it;
535-
}
536-
537533
/* expires items that are more recent than the oldest_live setting. */
538534
void do_item_flush_expired(void) {
539535
int i;
@@ -550,7 +546,7 @@ void do_item_flush_expired(void) {
550546
if (iter->time >= settings.oldest_live) {
551547
next = iter->next;
552548
if ((iter->it_flags & ITEM_SLABBED) == 0) {
553-
do_item_unlink(iter, hash(ITEM_key(iter), iter->nkey, 0));
549+
do_item_unlink_nolock(iter, hash(ITEM_key(iter), iter->nkey, 0));
554550
}
555551
} else {
556552
/* We've hit the first old item. Continue to the next queue. */

items.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ bool item_size_ok(const size_t nkey, const int flags, const int nbytes);
88

99
int do_item_link(item *it, const uint32_t hv); /** may fail if transgresses limits */
1010
void do_item_unlink(item *it, const uint32_t hv);
11+
void do_item_unlink_nolock(item *it, const uint32_t hv);
1112
void do_item_remove(item *it);
1213
void do_item_update(item *it); /** update LRU time to current and reposition */
1314
int do_item_replace(item *it, item *new_it, const uint32_t hv);
@@ -20,7 +21,6 @@ void do_item_stats_sizes(ADD_STAT add_stats, void *c);
2021
void do_item_flush_expired(void);
2122

2223
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv);
23-
item *do_item_get_nocheck(const char *key, const size_t nkey, const uint32_t hv);
2424
item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv);
2525
void item_stats_reset(void);
2626
extern pthread_mutex_t cache_lock;

memcached.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2310,7 +2310,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
23102310

23112311
flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
23122312

2313-
new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
2313+
new_it = item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
23142314

23152315
if (new_it == NULL) {
23162316
/* SERVER_ERROR out of memory */
@@ -3080,7 +3080,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
30803080
res = strlen(buf);
30813081
if (res + 2 > it->nbytes || it->refcount != 1) { /* need to realloc */
30823082
item *new_it;
3083-
new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
3083+
new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
30843084
if (new_it == 0) {
30853085
do_item_remove(it);
30863086
return EOM;
@@ -3095,7 +3095,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
30953095
} else { /* replace in-place */
30963096
/* When changing the value without replacing the item, we
30973097
need to update the CAS on the existing item. */
3098+
mutex_lock(&cache_lock); /* FIXME */
30983099
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
3100+
pthread_mutex_unlock(&cache_lock);
30993101

31003102
memcpy(ITEM_data(it), buf, res);
31013103
memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);

memcached.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,8 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
513513
void item_unlink(item *it);
514514
void item_update(item *it);
515515

516+
void item_lock(uint32_t hv);
517+
void item_unlock(uint32_t hv);
516518
void STATS_LOCK(void);
517519
void STATS_UNLOCK(void);
518520
void threadlocal_stats_reset(void);

thread.c

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ static pthread_mutex_t stats_lock;
4646
static CQ_ITEM *cqi_freelist;
4747
static pthread_mutex_t cqi_freelist_lock;
4848

49+
static pthread_mutex_t *item_locks;
50+
/* TODO: Make this a function of the # of threads */
51+
#define ITEM_LOCKS 4000
52+
4953
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
5054

5155
/*
@@ -64,6 +68,14 @@ static pthread_cond_t init_cond;
6468

6569
static void thread_libevent_process(int fd, short which, void *arg);
6670

71+
void item_lock(uint32_t hv) {
72+
mutex_lock(&item_locks[hv % ITEM_LOCKS]);
73+
}
74+
75+
void item_unlock(uint32_t hv) {
76+
pthread_mutex_unlock(&item_locks[hv % ITEM_LOCKS]);
77+
}
78+
6779
/*
6880
* Initializes a connection queue.
6981
*/
@@ -342,19 +354,19 @@ item *item_get(const char *key, const size_t nkey) {
342354
item *it;
343355
uint32_t hv;
344356
hv = hash(key, nkey, 0);
345-
mutex_lock(&cache_lock);
357+
item_lock(hv);
346358
it = do_item_get(key, nkey, hv);
347-
pthread_mutex_unlock(&cache_lock);
359+
item_unlock(hv);
348360
return it;
349361
}
350362

351363
item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
352364
item *it;
353365
uint32_t hv;
354366
hv = hash(key, nkey, 0);
355-
mutex_lock(&cache_lock);
367+
item_lock(hv);
356368
it = do_item_touch(key, nkey, exptime, hv);
357-
pthread_mutex_unlock(&cache_lock);
369+
item_unlock(hv);
358370
return it;
359371
}
360372

@@ -366,9 +378,9 @@ int item_link(item *item) {
366378
uint32_t hv;
367379

368380
hv = hash(ITEM_key(item), item->nkey, 0);
369-
mutex_lock(&cache_lock);
381+
item_lock(hv);
370382
ret = do_item_link(item, hv);
371-
pthread_mutex_unlock(&cache_lock);
383+
item_unlock(hv);
372384
return ret;
373385
}
374386

@@ -377,9 +389,12 @@ int item_link(item *item) {
377389
* needed.
378390
*/
379391
void item_remove(item *item) {
380-
mutex_lock(&cache_lock);
392+
uint32_t hv;
393+
hv = hash(ITEM_key(item), item->nkey, 0);
394+
395+
item_lock(hv);
381396
do_item_remove(item);
382-
pthread_mutex_unlock(&cache_lock);
397+
item_unlock(hv);
383398
}
384399

385400
/*
@@ -397,18 +412,21 @@ int item_replace(item *old_it, item *new_it, const uint32_t hv) {
397412
void item_unlink(item *item) {
398413
uint32_t hv;
399414
hv = hash(ITEM_key(item), item->nkey, 0);
400-
mutex_lock(&cache_lock);
415+
item_lock(hv);
401416
do_item_unlink(item, hv);
402-
pthread_mutex_unlock(&cache_lock);
417+
item_unlock(hv);
403418
}
404419

405420
/*
406421
* Moves an item to the back of the LRU queue.
407422
*/
408423
void item_update(item *item) {
409-
mutex_lock(&cache_lock);
424+
uint32_t hv;
425+
hv = hash(ITEM_key(item), item->nkey, 0);
426+
427+
item_lock(hv);
410428
do_item_update(item);
411-
pthread_mutex_unlock(&cache_lock);
429+
item_unlock(hv);
412430
}
413431

414432
/*
@@ -422,9 +440,9 @@ enum delta_result_type add_delta(conn *c, const char *key,
422440
uint32_t hv;
423441

424442
hv = hash(key, nkey, 0);
425-
mutex_lock(&cache_lock);
443+
item_lock(hv);
426444
ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
427-
pthread_mutex_unlock(&cache_lock);
445+
item_unlock(hv);
428446
return ret;
429447
}
430448

@@ -436,9 +454,9 @@ enum store_item_type store_item(item *item, int comm, conn* c) {
436454
uint32_t hv;
437455

438456
hv = hash(ITEM_key(item), item->nkey, 0);
439-
mutex_lock(&cache_lock);
457+
item_lock(hv);
440458
ret = do_store_item(item, comm, c, hv);
441-
pthread_mutex_unlock(&cache_lock);
459+
item_unlock(hv);
442460
return ret;
443461
}
444462

@@ -616,6 +634,15 @@ void thread_init(int nthreads, struct event_base *main_base) {
616634
pthread_mutex_init(&cqi_freelist_lock, NULL);
617635
cqi_freelist = NULL;
618636

637+
item_locks = calloc(ITEM_LOCKS, sizeof(pthread_mutex_t));
638+
if (! item_locks) {
639+
perror("Can't allocate item locks");
640+
exit(1);
641+
}
642+
for (i = 0; i < ITEM_LOCKS; i++) {
643+
pthread_mutex_init(&item_locks[i], NULL);
644+
}
645+
619646
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
620647
if (! threads) {
621648
perror("Can't allocate thread descriptors");

0 commit comments

Comments
 (0)