-
Notifications
You must be signed in to change notification settings - Fork 0
Add WeightedNonceGenerator #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"name" : "io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark", | ||
"iterations" : 100000, | ||
"threads" : 5, | ||
"totalMillis" : 4118, | ||
"avgTime" : 823.6 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"name" : "io.appform.ranger.discovery.bundle.id.WeightedNonceGeneratorTest.testGenerateWithBenchmark", | ||
"iterations" : 100000, | ||
"threads" : 5, | ||
"totalMillis" : 5020, | ||
"avgTime" : 1004.0 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package io.appform.ranger.discovery.bundle.id; | ||
|
||
import com.codahale.metrics.MetricRegistry; | ||
import lombok.extern.slf4j.Slf4j; | ||
import lombok.val; | ||
|
||
import java.util.Optional; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicIntegerArray; | ||
|
||
@Slf4j | ||
class CircularQueue { | ||
private static final String QUEUE_FULL_METRIC_STRING = "idGenerator.queueFull.forPrefix."; | ||
private static final String UNUSED_INDICES_METRIC_STRING = "idGenerator.unusedIds.forPrefix."; | ||
|
||
// private final Meter queueFullMeter; | ||
// private final Meter unusedDataMeter; | ||
|
||
/** List of data for the specific queue */ | ||
private final AtomicIntegerArray queue; | ||
|
||
/** Size of the queue */ | ||
private final int size; | ||
|
||
/** Pointer to track the first usable index. Helps to determine the next usable data. */ | ||
private final AtomicInteger firstIdx = new AtomicInteger(0); | ||
|
||
/** Pointer to track index of last usable index. Helps to determine which index the next data should go in. */ | ||
private final AtomicInteger lastIdx = new AtomicInteger(0); | ||
|
||
|
||
public CircularQueue(int size, final MetricRegistry metricRegistry, final String namespace) { | ||
this.size = size; | ||
this.queue = new AtomicIntegerArray(size); | ||
// this.queueFullMeter = metricRegistry.meter(QUEUE_FULL_METRIC_STRING + namespace); | ||
// this.unusedDataMeter = metricRegistry.meter(UNUSED_INDICES_METRIC_STRING + namespace); | ||
} | ||
|
||
public synchronized void setId(int id) { | ||
// Don't store new data if the queue is already full of unused data. | ||
if (lastIdx.get() >= firstIdx.get() + size - 1) { | ||
// queueFullMeter.mark(); | ||
return; | ||
} | ||
val arrayIdx = lastIdx.get() % size; | ||
queue.set(arrayIdx, id); | ||
lastIdx.getAndIncrement(); | ||
} | ||
|
||
private int getId(int index) { | ||
val arrayIdx = index % size; | ||
return queue.get(arrayIdx); | ||
} | ||
|
||
public synchronized Optional<Integer> getNextId() { | ||
if (firstIdx.get() < lastIdx.get()) { | ||
val id = getId(firstIdx.getAndIncrement()); | ||
return Optional.of(id); | ||
} else { | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
public void reset() { | ||
val unusedIds = lastIdx.get() - firstIdx.get(); | ||
// unusedDataMeter.mark(unusedIds); | ||
lastIdx.set(0); | ||
firstIdx.set(0); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,13 @@ | ||
package io.appform.ranger.discovery.bundle.id; | ||
|
||
import lombok.Value; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
|
||
@Value | ||
@Getter | ||
@Builder | ||
public class GenerationResult { | ||
IdInfo idInfo; | ||
IdValidationState state; | ||
String domain; | ||
String namespace; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package io.appform.ranger.discovery.bundle.id; | ||
|
||
import com.codahale.metrics.MetricRegistry; | ||
|
||
import java.util.Optional; | ||
|
||
public class IdPool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CODE_MAINTAINABILTIY: The IdPool class is missing JavaDocs for all methods and constructor, making it difficult to understand the purpose and usage of this class without diving into the implementation. Clear documentation would improve maintainability by helping developers understand the class's purpose and behavior at a glance.
|
||
/** List of IDs for the specific IdPool */ | ||
private final CircularQueue queue; | ||
|
||
public IdPool(int size, final MetricRegistry metricRegistry, final String namespace) { | ||
this.queue = new CircularQueue(size, metricRegistry, namespace); | ||
} | ||
|
||
public void setId(int id) { | ||
queue.setId(id); | ||
} | ||
|
||
public Optional<Integer> getNextId() { | ||
return queue.getNextId(); | ||
} | ||
|
||
public void reset() { | ||
queue.reset(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package io.appform.ranger.discovery.bundle.id; | ||
|
||
import lombok.val; | ||
import org.joda.time.DateTime; | ||
|
||
|
||
public class IdUtils { | ||
public static DateTime getDateTimeFromSeconds(long seconds) { | ||
// Convert seconds to milliSeconds | ||
val millis = seconds * 1000L; | ||
// Get DateTime object from milliSeconds | ||
return new DateTime(millis); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package io.appform.ranger.discovery.bundle.id; | ||
|
||
import com.codahale.metrics.MetricRegistry; | ||
import com.google.common.base.Preconditions; | ||
import lombok.Getter; | ||
import lombok.extern.slf4j.Slf4j; | ||
import lombok.val; | ||
|
||
import java.time.Instant; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import static io.appform.ranger.discovery.bundle.id.Constants.MAX_IDS_PER_SECOND; | ||
|
||
/** | ||
* Tracks generated IDs and pointers for generating next IDs for a partition. | ||
*/ | ||
@Slf4j | ||
public class PartitionIdTracker { | ||
/** Array to store IdPools for each partition */ | ||
private final IdPool[] idPoolList; | ||
|
||
/** Counter to keep track of the number of IDs created */ | ||
private final AtomicInteger nextIdCounter = new AtomicInteger(); | ||
|
||
@Getter | ||
private Instant instant; | ||
|
||
// private final Meter generatedIdCountMeter; | ||
|
||
public PartitionIdTracker(final int partitionSize, | ||
final int idPoolSize, | ||
final Instant instant, | ||
final MetricRegistry metricRegistry, | ||
final String namespace) { | ||
this.instant = instant; | ||
idPoolList = new IdPool[partitionSize]; | ||
for (int i=0; i<partitionSize; i+= 1){ | ||
idPoolList[i] = new IdPool(idPoolSize, metricRegistry, namespace); | ||
} | ||
// this.generatedIdCountMeter = metricRegistry.meter("generatedIdCount.forPrefix." + namespace); | ||
} | ||
|
||
/** Method to get the IdPool for a specific partition */ | ||
public IdPool getPartition(final int partitionId) { | ||
Preconditions.checkArgument(partitionId < idPoolList.length, "Invalid partitionId " + partitionId + " for IdPool of size " + idPoolList.length); | ||
return idPoolList[partitionId]; | ||
} | ||
|
||
public void addId(final int partitionId, final IdInfo idInfo) { | ||
Preconditions.checkArgument(partitionId < idPoolList.length, "Invalid partitionId " + partitionId + " for IdPool of size " + idPoolList.length); | ||
if (instant.getEpochSecond() == idInfo.getTime()) { | ||
idPoolList[partitionId].setId(idInfo.getExponent()); | ||
} | ||
} | ||
|
||
public IdInfo getIdInfo() { | ||
Preconditions.checkArgument(nextIdCounter.get() < MAX_IDS_PER_SECOND, "ID Generation Per Second Limit Reached."); | ||
val timeInSeconds = instant.getEpochSecond(); | ||
return new IdInfo(nextIdCounter.getAndIncrement(), timeInSeconds); | ||
} | ||
|
||
public synchronized void reset(final Instant instant) { | ||
// Do not reset if the time (in seconds) hasn't changed | ||
if (this.instant.getEpochSecond() == instant.getEpochSecond()) { | ||
return; | ||
} | ||
// Reset all ID Pools because they contain IDs for an expired timestamp | ||
for (val idPool: idPoolList) { | ||
idPool.reset(); | ||
} | ||
// generatedIdCountMeter.mark(nextIdCounter.get()); | ||
nextIdCounter.set(0); | ||
this.instant = instant; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package io.appform.ranger.discovery.bundle.id.config; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
|
||
import javax.validation.constraints.Min; | ||
import javax.validation.constraints.NotNull; | ||
|
||
@Data | ||
@Builder | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public class DefaultNamespaceConfig { | ||
/** Size of pre-generated id buffer. Each partition will have separate IdPool, each of size equal to this value. */ | ||
@NotNull | ||
@Min(2) | ||
private Integer idPoolSizePerPartition; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package io.appform.ranger.discovery.bundle.id.config; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import io.appform.ranger.discovery.bundle.id.Constants; | ||
import io.dropwizard.validation.ValidationMethod; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
|
||
import javax.validation.Valid; | ||
import javax.validation.constraints.Max; | ||
import javax.validation.constraints.Min; | ||
import javax.validation.constraints.NotNull; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
|
||
@Data | ||
@Builder | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public class IdGeneratorConfig { | ||
|
||
/** Optional for non-weighted scenarios */ | ||
@Valid | ||
private WeightedIdConfig weightedIdConfig; | ||
|
||
@NotNull | ||
@Valid | ||
private DefaultNamespaceConfig defaultNamespaceConfig; | ||
|
||
@Builder.Default | ||
@Valid | ||
private Set<NamespaceConfig> namespaceConfig = Collections.emptySet(); | ||
|
||
@NotNull | ||
@Min(1) | ||
private int partitionCount; | ||
|
||
/** Buffer time to pre-generate IDs for */ | ||
@Min(1) | ||
@Max(300) | ||
@Builder.Default | ||
private int dataStorageLimitInSeconds = Constants.DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS; | ||
|
||
/** Retry limit for selecting a valid partition. Not required for unconstrained scenarios */ | ||
@Min(1) | ||
@Builder.Default | ||
private int partitionRetryCount = Constants.DEFAULT_PARTITION_RETRY_COUNT; | ||
|
||
@ValidationMethod(message = "Namespaces should be unique") | ||
@JsonIgnore | ||
public boolean areNamespacesUnique() { | ||
Set<String> namespaces = namespaceConfig.stream() | ||
.map(NamespaceConfig::getNamespace) | ||
.collect(Collectors.toSet()); | ||
return namespaceConfig.size() == namespaces.size(); | ||
} | ||
|
||
@ValidationMethod(message = "Invalid Partition Range") | ||
@JsonIgnore | ||
public boolean isPartitionCountValid() { | ||
if (weightedIdConfig != null) { | ||
List<WeightedPartition> sortedPartitions = new ArrayList<>(weightedIdConfig.getPartitions()); | ||
sortedPartitions.sort(Comparator.comparingInt(k -> k.getPartitionRange().getStart())); | ||
return sortedPartitions.get(sortedPartitions.size() - 1).getPartitionRange().getEnd() - sortedPartitions.get(0).getPartitionRange().getStart() + 1 == partitionCount; | ||
} | ||
return true; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.appform.ranger.discovery.bundle.id.config; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
|
||
import javax.validation.constraints.Min; | ||
import javax.validation.constraints.NotNull; | ||
|
||
@Data | ||
@Builder | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public class NamespaceConfig { | ||
@NotNull | ||
private String namespace; | ||
|
||
/** Size of pre-generated id buffer. Value from DefaultNamespaceConfig will be used if this is null */ | ||
@Min(2) | ||
private Integer idPoolSizePerBucket; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package io.appform.ranger.discovery.bundle.id.config; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import io.dropwizard.validation.ValidationMethod; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.NoArgsConstructor; | ||
|
||
import javax.validation.constraints.Min; | ||
import javax.validation.constraints.NotNull; | ||
|
||
@Data | ||
@Builder | ||
@NoArgsConstructor | ||
@AllArgsConstructor | ||
public class PartitionRange { | ||
@NotNull | ||
@Min(0) | ||
private int start; | ||
|
||
/* end partition is inclusive in range */ | ||
@NotNull | ||
@Min(0) | ||
private int end; | ||
|
||
@ValidationMethod(message = "Partition Range should be non-decreasing") | ||
@JsonIgnore | ||
public boolean isRangeValid() { | ||
return start <= end; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CODE_MAINTAINABILTIY: The CircularQueue class contains multiple commented out lines of metric instrumentation code. This creates confusion about whether these metrics should be active or not. Either these metrics should be properly implemented or the commented code should be removed entirely to avoid maintainability issues.