Skip to content

Commit fcdf172

Browse files
author
Kim van der Riet
committed
QPID-4984: WIP - compiles, but not functional.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525050 13f79535-47bb-0310-9956-ffa450edef68
1 parent 7defd7e commit fcdf172

38 files changed

+2170
-566
lines changed

qpid/cpp/src/linearstore.cmake

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,18 @@ if (BUILD_LINEARSTORE)
8080
qpid/linearstore/jrnl/deq_rec.cpp
8181
qpid/linearstore/jrnl/enq_map.cpp
8282
qpid/linearstore/jrnl/enq_rec.cpp
83+
qpid/linearstore/jrnl/EmptyFilePool.cpp
84+
qpid/linearstore/jrnl/EmptyFilePoolManager.cpp
85+
qpid/linearstore/jrnl/EmptyFilePoolPartition.cpp
8386
#qpid/linearstore/jrnl/fcntl.cpp
8487
qpid/linearstore/jrnl/jcntl.cpp
8588
qpid/linearstore/jrnl/jdir.cpp
8689
qpid/linearstore/jrnl/jerrno.cpp
8790
qpid/linearstore/jrnl/jexception.cpp
8891
#qpid/linearstore/jrnl/jinf.cpp
92+
qpid/linearstore/jrnl/JournalFile.cpp
93+
qpid/linearstore/jrnl/JournalFileController.cpp
94+
qpid/linearstore/jrnl/JournalLog.cpp
8995
qpid/linearstore/jrnl/jrec.cpp
9096
#qpid/linearstore/jrnl/lp_map.cpp
9197
#qpid/linearstore/jrnl/lpmgr.cpp
@@ -106,6 +112,7 @@ if (BUILD_LINEARSTORE)
106112
qpid/linearstore/BindingDbt.cpp
107113
qpid/linearstore/BufferValue.cpp
108114
qpid/linearstore/DataTokenImpl.cpp
115+
qpid/linearstore/EmptyFilePoolManagerImpl.cpp
109116
qpid/linearstore/IdDbt.cpp
110117
qpid/linearstore/IdSequence.cpp
111118
qpid/linearstore/JournalImpl.cpp
@@ -118,9 +125,9 @@ if (BUILD_LINEARSTORE)
118125
qpid/linearstore/jrnl/utils/deq_hdr.c
119126
qpid/linearstore/jrnl/utils/enq_hdr.c
120127
qpid/linearstore/jrnl/utils/file_hdr.c
121-
qpid/linearstore/jrnl/utils/rec_hdr.c
122-
qpid/linearstore/jrnl/utils/rec_tail.c
123-
qpid/linearstore/jrnl/utils/txn_hdr.c
128+
qpid/linearstore/jrnl/utils/rec_hdr.c
129+
qpid/linearstore/jrnl/utils/rec_tail.c
130+
qpid/linearstore/jrnl/utils/txn_hdr.c
124131
)
125132

126133
# linearstore include directories

qpid/cpp/src/qpid/legacystore/jrnl/rmgr.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ rmgr::initialize(aio_callback* const cbp)
7272
throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
7373
}
7474
_fhdr_aio_cb_ptr = new aio_cb;
75-
std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
75+
std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb));
7676
}
7777

7878
void
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
#include "EmptyFilePoolManagerImpl.h"
23+
24+
#include "QpidLog.h"
25+
#include "qpid/linearstore/jrnl/EmptyFilePoolTypes.h"
26+
27+
namespace qpid {
28+
namespace linearstore {
29+
30+
EmptyFilePoolManagerImpl::EmptyFilePoolManagerImpl(const std::string& qlsStorePath) :
31+
qpid::qls_jrnl::EmptyFilePoolManager(qlsStorePath)
32+
{}
33+
34+
EmptyFilePoolManagerImpl::~EmptyFilePoolManagerImpl() {}
35+
36+
void EmptyFilePoolManagerImpl::findEfpPartitions() {
37+
qpid::qls_jrnl::EmptyFilePoolManager::findEfpPartitions();
38+
QLS_LOG(info, "EFP Manager initialization complete");
39+
std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*> partitionList;
40+
std::vector<qpid::qls_jrnl::EmptyFilePool*> filePoolList;
41+
getEfpPartitions(partitionList);
42+
if (partitionList.size() == 0) {
43+
QLS_LOG(error, "NO EFP PARTITIONS FOUND! No queue creation is possible.")
44+
} else {
45+
QLS_LOG(info, "> EFP Partitions found: " << partitionList.size());
46+
for (std::vector<qpid::qls_jrnl::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
47+
filePoolList.clear();
48+
(*i)->getEmptyFilePools(filePoolList);
49+
QLS_LOG(info, " * Partition " << (*i)->partitionNumber() << " containing " << filePoolList.size() << " pool" <<
50+
(filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->partitionDirectory() << "\'");
51+
for (std::vector<qpid::qls_jrnl::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
52+
QLS_LOG(info, " - EFP \'" << (*j)->fileSizeKib() << "k\' containing " << (*j)->numEmptyFiles() <<
53+
" files of size " << (*j)->fileSizeKib() << " KiB totaling " << (*j)->cumFileSizeKib() << " KiB");
54+
}
55+
}
56+
}
57+
}
58+
59+
}} /* namespace qpid::linearstore */
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
#ifndef QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_
23+
#define QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_
24+
25+
#include "qpid/linearstore/jrnl/EmptyFilePoolManager.h"
26+
27+
namespace qpid {
28+
namespace linearstore {
29+
30+
class EmptyFilePoolManagerImpl: public qpid::qls_jrnl::EmptyFilePoolManager
31+
{
32+
public:
33+
EmptyFilePoolManagerImpl(const std::string& qlsStorePath);
34+
virtual ~EmptyFilePoolManagerImpl();
35+
void findEfpPartitions();
36+
};
37+
38+
}} /* namespace qpid::linearstore */
39+
40+
#endif /* QPID_LINEARSTORE_EMPTYFILEPOOLMANAGERIMPL_H_ */

qpid/cpp/src/qpid/linearstore/JournalImpl.cpp

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,17 @@
2323

2424
#include "qpid/linearstore/jrnl/jerrno.h"
2525
#include "qpid/linearstore/jrnl/jexception.h"
26+
#include "qpid/linearstore/jrnl/EmptyFilePool.h"
2627
#include "qpid/log/Statement.h"
2728
#include "qpid/management/ManagementAgent.h"
28-
#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
29+
//#include "qmf/org/apache/qpid/linearstore/ArgsJournalExpand.h"
2930
#include "qmf/org/apache/qpid/linearstore/EventCreated.h"
3031
#include "qmf/org/apache/qpid/linearstore/EventEnqThresholdExceeded.h"
3132
#include "qmf/org/apache/qpid/linearstore/EventFull.h"
3233
#include "qmf/org/apache/qpid/linearstore/EventRecovered.h"
3334
#include "qpid/sys/Monitor.h"
3435
#include "qpid/sys/Timer.h"
35-
#include "qpid/linearstore/Log.h"
36+
#include "qpid/linearstore/QpidLog.h"
3637
#include "qpid/linearstore/StoreException.h"
3738

3839
using namespace qpid::qls_jrnl;
@@ -53,22 +54,23 @@ void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); i
5354
JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
5455
const std::string& journalId,
5556
const std::string& journalDirectory,
56-
const std::string& journalBaseFilename,
57+
// const std::string& journalBaseFilename,
5758
const qpid::sys::Duration getEventsTimeout,
5859
const qpid::sys::Duration flushTimeout,
5960
qpid::management::ManagementAgent* a,
6061
DeleteCallback onDelete):
61-
jcntl(journalId, journalDirectory, journalBaseFilename),
62+
jcntl(journalId, journalDirectory/*, journalBaseFilename*/),
6263
timer(timer_),
6364
getEventsTimerSetFlag(false),
64-
lastReadRid(0),
65+
efpp(0),
66+
// lastReadRid(0),
6567
writeActivityFlag(false),
6668
flushTriggeredFlag(true),
67-
_xidp(0),
68-
_datap(0),
69-
_dlen(0),
70-
_dtok(),
71-
_external(false),
69+
// _xidp(0),
70+
// _datap(0),
71+
// _dlen(0),
72+
// _dtok(),
73+
// _external(false),
7274
deleteCallback(onDelete)
7375
{
7476
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -82,7 +84,7 @@ JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
8284

8385
QLS_LOG2(notice, _jid, "Created");
8486
std::ostringstream oss;
85-
oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
87+
oss << "Journal directory = \"" << journalDirectory << "\"";
8688
QLS_LOG2(debug, _jid, oss.str());
8789
}
8890

@@ -95,7 +97,7 @@ JournalImpl::~JournalImpl()
9597
}
9698
getEventsFireEventsPtr->cancel();
9799
inactivityFireEventPtr->cancel();
98-
free_read_buffers();
100+
// free_read_buffers();
99101

100102
if (_mgmtObject.get() != 0) {
101103
_mgmtObject->resourceDestroy();
@@ -116,14 +118,14 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
116118

117119
_mgmtObject->set_name(_jid);
118120
_mgmtObject->set_directory(_jdir.dirname());
119-
_mgmtObject->set_baseFileName(_base_filename);
121+
// _mgmtObject->set_baseFileName(_base_filename);
120122
_mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
121123
_mgmtObject->set_readPages(JRNL_RMGR_PAGES);
122124

123125
// The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
124-
_mgmtObject->set_initialFileCount(0);
125-
_mgmtObject->set_dataFileSize(0);
126-
_mgmtObject->set_currentFileCount(0);
126+
//_mgmtObject->set_initialFileCount(0);
127+
//_mgmtObject->set_dataFileSize(0);
128+
//_mgmtObject->set_currentFileCount(0);
127129
_mgmtObject->set_writePageSize(0);
128130
_mgmtObject->set_writePages(0);
129131

@@ -133,22 +135,23 @@ JournalImpl::initManagement(qpid::management::ManagementAgent* a)
133135

134136

135137
void
136-
JournalImpl::initialize(/*const uint16_t num_jfiles,
137-
const bool auto_expand,
138-
const uint16_t ae_max_jfiles,
139-
const uint32_t jfsize_sblks,*/
138+
JournalImpl::initialize(qpid::qls_jrnl::EmptyFilePool* efpp_,
140139
const uint16_t wcache_num_pages,
141140
const uint32_t wcache_pgsize_sblks,
142141
qpid::qls_jrnl::aio_callback* const cbp)
143142
{
144-
std::ostringstream oss;
145-
// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
146-
oss << "Initialize;";
147-
oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
148-
oss << " wcache_num_pages=" << wcache_num_pages;
149-
QLS_LOG2(debug, _jid, oss.str());
150-
jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ wcache_num_pages, wcache_pgsize_sblks, cbp);
151-
QLS_LOG2(debug, _jid, "Initialization complete");
143+
efpp = efpp_;
144+
// efpp->createJournal(_jdir);
145+
// QLS_LOG2(notice, _jid, "Initialized");
146+
// std::ostringstream oss;
147+
//// oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
148+
// oss << "Initialize; efpPartitionNumber=" << efpp_->getPartitionNumber();
149+
// oss << " efpFileSizeKb=" << efpp_->fileSizeKib();
150+
// oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
151+
// oss << " wcache_num_pages=" << wcache_num_pages;
152+
// QLS_LOG2(debug, _jid, oss.str());
153+
jcntl::initialize(/*num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,*/ efpp, wcache_num_pages, wcache_pgsize_sblks, cbp);
154+
// QLS_LOG2(debug, _jid, "Initialization complete");
152155
// TODO: replace for linearstore: _lpmgr
153156
/*
154157
if (_mgmtObject.get() != 0)
@@ -261,6 +264,7 @@ JournalImpl::recover_complete()
261264
//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
262265
// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
263266
// Throw exception for all errors.
267+
/*
264268
bool
265269
JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size_t offset)
266270
{
@@ -351,6 +355,7 @@ JournalImpl::loadMsgContent(uint64_t rid, std::string& data, size_t length, size
351355
}
352356
return true;
353357
}
358+
*/
354359

355360
void
356361
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
@@ -574,6 +579,7 @@ void
574579
JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/)
575580
{}
576581

582+
/*
577583
void
578584
JournalImpl::free_read_buffers()
579585
{
@@ -586,6 +592,12 @@ JournalImpl::free_read_buffers()
586592
_datap = 0;
587593
}
588594
}
595+
*/
596+
597+
void
598+
JournalImpl::createStore() {
599+
600+
}
589601

590602
void
591603
JournalImpl::handleIoResult(const iores r)
@@ -624,12 +636,13 @@ JournalImpl::handleIoResult(const iores r)
624636
}
625637
}
626638

627-
qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId,
639+
qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/,
628640
qpid::management::Args& /*args*/,
629641
std::string& /*text*/)
630642
{
631643
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
632644

645+
/*
633646
switch (methodId)
634647
{
635648
case _qmf::Journal::METHOD_EXPAND :
@@ -640,6 +653,7 @@ qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t m
640653
status = Manageable::STATUS_NOT_IMPLEMENTED;
641654
break;
642655
}
656+
*/
643657

644658
return status;
645659
}

0 commit comments

Comments
 (0)