Skip to content

Commit db966d7

Browse files
committed
优化代码,修复bug,完善日志输出。
1 parent b198058 commit db966d7

File tree

22 files changed

+46
-59
lines changed

22 files changed

+46
-59
lines changed

src/EQueue.AdminWeb/EQueue.AdminWeb.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@
4949
<Reference Include="Autofac.Integration.Mvc">
5050
<HintPath>..\packages\Autofac.Mvc5.3.3.0\lib\net45\Autofac.Integration.Mvc.dll</HintPath>
5151
</Reference>
52-
<Reference Include="ECommon, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
52+
<Reference Include="ECommon, Version=1.5.3.0, Culture=neutral, processorArchitecture=MSIL">
5353
<SpecificVersion>False</SpecificVersion>
54-
<HintPath>..\packages\ECommon.1.5.2\lib\net45\ECommon.dll</HintPath>
54+
<HintPath>..\packages\ECommon.1.5.3\lib\net45\ECommon.dll</HintPath>
5555
</Reference>
5656
<Reference Include="ECommon.Autofac, Version=1.4.2.0, Culture=neutral, processorArchitecture=MSIL">
5757
<SpecificVersion>False</SpecificVersion>
@@ -244,4 +244,4 @@
244244
</Target>
245245
<Target Name="AfterBuild">
246246
</Target> -->
247-
</Project>
247+
</Project>

src/EQueue.AdminWeb/Web.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,4 @@
4444
</dependentAssembly>
4545
</assemblyBinding>
4646
</runtime>
47-
</configuration>
47+
</configuration>

src/EQueue.AdminWeb/packages.config

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<package id="Autofac" version="3.4.0" targetFramework="net45" />
55
<package id="Autofac.Mvc5" version="3.3.0" targetFramework="net45" />
66
<package id="bootstrap" version="3.0.0" targetFramework="net45" />
7-
<package id="ECommon" version="1.5.2" targetFramework="net45" />
7+
<package id="ECommon" version="1.5.3" targetFramework="net45" />
88
<package id="ECommon.Autofac" version="1.4.2" targetFramework="net45" />
99
<package id="ECommon.JsonNet" version="1.4.2" targetFramework="net45" />
1010
<package id="ECommon.Log4Net" version="1.4.2" targetFramework="net45" />
@@ -19,4 +19,4 @@
1919
<package id="Newtonsoft.Json" version="5.0.8" targetFramework="net45" />
2020
<package id="Respond" version="1.2.0" targetFramework="net45" />
2121
<package id="WebGrease" version="1.5.2" targetFramework="net45" />
22-
</packages>
22+
</packages>

src/EQueue/Broker/Client/ConsumerManager.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@ public class ConsumerManager
1111
{
1212
private readonly ConcurrentDictionary<string, ConsumerGroup> _consumerGroupDict = new ConcurrentDictionary<string, ConsumerGroup>();
1313
private readonly IScheduleService _scheduleService;
14-
private readonly ILogger _logger;
1514

1615
public ConsumerManager()
1716
{
1817
_scheduleService = ObjectContainer.Resolve<IScheduleService>();
19-
_logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().FullName);
2018
}
2119

2220
public void Start()

src/EQueue/Broker/DefaultMessageStore.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,11 @@ private void DeleteMessages()
111111
{
112112
if (_chunkManager.RemoveChunk(chunk))
113113
{
114-
_logger.InfoFormat("Message chunk {0} is deleted.", chunk);
114+
_logger.InfoFormat("Message chunk #{0} is deleted, chunkPositionScale: [{1}, {2}], minConsumedMessagePosition: {3}",
115+
chunk.ChunkHeader.ChunkNumber,
116+
chunk.ChunkHeader.ChunkDataStartPosition,
117+
chunk.ChunkHeader.ChunkDataEndPosition,
118+
_minConsumedMessagePosition);
115119
}
116120
}
117121
}

src/EQueue/Broker/DefaultQueueStore.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public long GetMinConusmedMessagePosition()
111111

112112
if (queue != null && minConsumedQueueOffset >= 0)
113113
{
114-
return queue.GetMessagePosition(minConsumedQueueOffset);
114+
return queue.GetMessagePosition(minConsumedQueueOffset, false);
115115
}
116116

117117
return -1L;
@@ -350,15 +350,16 @@ private void DeleteQueueMessages()
350350
{
351351
try
352352
{
353-
foreach (var queue in _queueDict.Values)
353+
var queues = _queueDict.OrderBy(x => x.Key).Select(x => x.Value).ToList();
354+
foreach (var queue in queues)
354355
{
355356
try
356357
{
357358
queue.DeleteMessages(_messageStore.MinMessagePosition);
358359
}
359360
catch (Exception ex)
360361
{
361-
_logger.Error(string.Format("Delete queue messages has exception, topic: {0}, queueId: {1}", queue.Topic, queue.QueueId), ex);
362+
_logger.Error(string.Format("Delete queue (topic: {0}, queueId: {1}) messages has exception.", queue.Topic, queue.QueueId), ex);
362363
}
363364
}
364365
}

src/EQueue/Broker/Queue.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ public void AddMessage(long messagePosition)
7070
{
7171
_chunkWriter.Write(new QueueLogRecord(messagePosition + 1));
7272
}
73-
public long GetMessagePosition(long queueOffset)
73+
public long GetMessagePosition(long queueOffset, bool autoCache = true)
7474
{
7575
var position = queueOffset * _chunkManager.Config.ChunkDataUnitSize;
76-
var record = _chunkReader.TryReadAt(position, ReadMessageIndex);
76+
var record = _chunkReader.TryReadAt(position, ReadMessageIndex, autoCache);
7777
if (record == null)
7878
{
7979
return -1L;
@@ -106,23 +106,22 @@ public long GetMinQueueOffset()
106106
}
107107
public void DeleteMessages(long minMessagePosition)
108108
{
109-
var chunks = _chunkManager.GetAllChunks().Where(x => x.IsCompleted);
109+
var chunks = _chunkManager.GetAllChunks().Where(x => x.IsCompleted).OrderBy(x => x.ChunkHeader.ChunkNumber);
110110

111111
foreach (var chunk in chunks)
112112
{
113113
var maxPosition = chunk.ChunkHeader.ChunkDataEndPosition - _chunkManager.Config.ChunkDataUnitSize;
114-
var record = _chunkReader.TryReadAt(maxPosition, ReadMessageIndex);
114+
var record = _chunkReader.TryReadAt(maxPosition, ReadMessageIndex, false);
115115
if (record == null)
116116
{
117117
continue;
118118
}
119119
var chunkLastMessagePosition = record.MessageLogPosition - 1;
120-
121120
if (chunkLastMessagePosition < minMessagePosition)
122121
{
123122
if (_chunkManager.RemoveChunk(chunk))
124123
{
125-
_logger.InfoFormat("Queue chunk {0} is deleted, chunk last message position: {1}", chunk, chunkLastMessagePosition);
124+
_logger.InfoFormat("Queue (topic: {0}, queueId: {1}) chunk #{2} is deleted, chunkLastMessagePosition: {3}, messageStoreMinMessagePosition: {4}", Topic, QueueId, chunk.ChunkHeader.ChunkNumber, chunkLastMessagePosition, minMessagePosition);
126125
}
127126
}
128127
}

src/EQueue/Broker/Storage/Chunk.cs

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,6 @@ private void InitNew(int chunkNumber)
284284
try
285285
{
286286
_memoryChunk = Chunk.CreateNew(_filename, chunkNumber, _chunkManager, _chunkConfig, true);
287-
if (_logger.IsDebugEnabled)
288-
{
289-
_logger.DebugFormat("Cached new chunk {0} to memory.", this);
290-
}
291287
}
292288
catch (OutOfMemoryException)
293289
{
@@ -403,10 +399,6 @@ private void InitOngoing<T>(Func<byte[], T> readRecordFunc) where T : ILogRecord
403399
try
404400
{
405401
_memoryChunk = Chunk.FromOngoingFile<T>(_filename, _chunkManager, _chunkConfig, readRecordFunc, true);
406-
if (_logger.IsDebugEnabled)
407-
{
408-
_logger.DebugFormat("Cached ongoing chunk {0} to memory.", this);
409-
}
410402
}
411403
catch (OutOfMemoryException)
412404
{
@@ -454,10 +446,6 @@ public bool TryCacheInMemory(bool shouldCacheNextChunk)
454446
return false;
455447
}
456448
_memoryChunk = Chunk.FromCompletedFile(_filename, _chunkManager, _chunkConfig, true);
457-
if (_logger.IsDebugEnabled)
458-
{
459-
_logger.DebugFormat("Cached completed chunk {0} to memory.", this);
460-
}
461449
if (shouldCacheNextChunk)
462450
{
463451
Task.Factory.StartNew(() => _chunkManager.TryCacheNextChunk(this));
@@ -490,10 +478,6 @@ public bool UnCacheFromMemory()
490478
var memoryChunk = _memoryChunk;
491479
_memoryChunk = null;
492480
memoryChunk.Dispose();
493-
if (_logger.IsDebugEnabled)
494-
{
495-
_logger.DebugFormat("Uncached completed chunk {0} from memory.", this);
496-
}
497481
return true;
498482
}
499483
catch (Exception ex)
@@ -503,7 +487,7 @@ public bool UnCacheFromMemory()
503487
}
504488
}
505489
}
506-
public T TryReadAt<T>(long dataPosition, Func<byte[], T> readRecordFunc) where T : class, ILogRecord
490+
public T TryReadAt<T>(long dataPosition, Func<byte[], T> readRecordFunc, bool autoCache = true) where T : class, ILogRecord
507491
{
508492
if (_isDestroying)
509493
{
@@ -512,8 +496,6 @@ public T TryReadAt<T>(long dataPosition, Func<byte[], T> readRecordFunc) where T
512496

513497
_lastActiveTime = DateTime.Now;
514498

515-
_chunkManager.TryCacheNextChunk(this);
516-
517499
if (!_isMemoryChunk)
518500
{
519501
if (_cacheItems != null)
@@ -544,7 +526,7 @@ public T TryReadAt<T>(long dataPosition, Func<byte[], T> readRecordFunc) where T
544526
}
545527
}
546528

547-
if (!_isMemoryChunk && _isCompleted && Interlocked.CompareExchange(ref _cachingChunk, 1, 0) == 0)
529+
if (autoCache && !_isMemoryChunk && _isCompleted && Interlocked.CompareExchange(ref _cachingChunk, 1, 0) == 0)
548530
{
549531
Task.Factory.StartNew(() => TryCacheInMemory(true));
550532
}
@@ -1229,9 +1211,8 @@ private void PrintReadStatus()
12291211
var cacheReadThroughput = cacheItemReadCount - _previousCacheReadCount;
12301212
_previousCacheReadCount = cacheItemReadCount;
12311213

1232-
_logger.DebugFormat("Read status: chunk: {0}, currentTime: {1}, fileRead: {2}/s, unmanagedRead: {3}/s, cacheRead: {4}/s",
1233-
this,
1234-
DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
1214+
_logger.DebugFormat("Read status: chunkNum: #{0}, fileRead: {1}/s, unmanagedRead: {2}/s, cacheRead: {3}/s",
1215+
ChunkHeader.ChunkNumber,
12351216
fileReadThroughput,
12361217
unmanagedReadThroughput,
12371218
cacheReadThroughput);

src/EQueue/Broker/Storage/ChunkReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public ChunkReader(ChunkManager chunkManager, ChunkWriter chunkWriter)
1919
_chunkWriter = chunkWriter;
2020
}
2121

22-
public T TryReadAt<T>(long position, Func<byte[], T> readRecordFunc) where T : class, ILogRecord
22+
public T TryReadAt<T>(long position, Func<byte[], T> readRecordFunc, bool autoCache = true) where T : class, ILogRecord
2323
{
2424
var lastChunk = _chunkWriter.CurrentChunk;
2525
var maxPosition = lastChunk.GlobalDataPosition;
@@ -38,7 +38,7 @@ public T TryReadAt<T>(long position, Func<byte[], T> readRecordFunc) where T : c
3838
}
3939

4040
var localPosition = chunk.ChunkHeader.GetLocalDataPosition(position);
41-
return chunk.TryReadAt(localPosition, readRecordFunc);
41+
return chunk.TryReadAt(localPosition, readRecordFunc, autoCache);
4242
}
4343
public BufferLogRecord TryReadRecordBufferAt(long position)
4444
{

src/EQueue/EQueue.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
<Reference Include="Dapper">
4343
<HintPath>..\packages\Dapper.1.38\lib\net45\Dapper.dll</HintPath>
4444
</Reference>
45-
<Reference Include="ECommon, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
45+
<Reference Include="ECommon, Version=1.5.3.0, Culture=neutral, processorArchitecture=MSIL">
4646
<SpecificVersion>False</SpecificVersion>
47-
<HintPath>..\packages\ecommon.1.5.2\lib\net45\ECommon.dll</HintPath>
47+
<HintPath>..\packages\ecommon.1.5.3\lib\net45\ECommon.dll</HintPath>
4848
</Reference>
4949
<Reference Include="ECommon.Dapper, Version=1.4.2.0, Culture=neutral, processorArchitecture=MSIL">
5050
<SpecificVersion>False</SpecificVersion>

0 commit comments

Comments
 (0)