Skip to content

Add WeightedNonceGenerator #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 4118,
"avgTime" : 823.6
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.WeightedNonceGeneratorTest.testGenerateWithBenchmark",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 5020,
"avgTime" : 1004.0
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.appform.ranger.discovery.bundle.id;

import com.codahale.metrics.MetricRegistry;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

@Slf4j
class CircularQueue {
private static final String QUEUE_FULL_METRIC_STRING = "idGenerator.queueFull.forPrefix.";
private static final String UNUSED_INDICES_METRIC_STRING = "idGenerator.unusedIds.forPrefix.";

// private final Meter queueFullMeter;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CODE_MAINTAINABILTIY: The CircularQueue class contains multiple commented out lines of metric instrumentation code. This creates confusion about whether these metrics should be active or not. Either these metrics should be properly implemented or the commented code should be removed entirely to avoid maintainability issues.

//        this.queueFullMeter = metricRegistry.meter(QUEUE_FULL_METRIC_STRING + namespace);
//        this.unusedDataMeter = metricRegistry.meter(UNUSED_INDICES_METRIC_STRING + namespace);

// Uncomment and use these meters or remove the commented code and constants

// private final Meter unusedDataMeter;

/** List of data for the specific queue */
private final AtomicIntegerArray queue;

/** Size of the queue */
private final int size;

/** Pointer to track the first usable index. Helps to determine the next usable data. */
private final AtomicInteger firstIdx = new AtomicInteger(0);

/** Pointer to track index of last usable index. Helps to determine which index the next data should go in. */
private final AtomicInteger lastIdx = new AtomicInteger(0);


public CircularQueue(int size, final MetricRegistry metricRegistry, final String namespace) {
this.size = size;
this.queue = new AtomicIntegerArray(size);
// this.queueFullMeter = metricRegistry.meter(QUEUE_FULL_METRIC_STRING + namespace);
// this.unusedDataMeter = metricRegistry.meter(UNUSED_INDICES_METRIC_STRING + namespace);
}

public synchronized void setId(int id) {
// Don't store new data if the queue is already full of unused data.
if (lastIdx.get() >= firstIdx.get() + size - 1) {
// queueFullMeter.mark();
return;
}
val arrayIdx = lastIdx.get() % size;
queue.set(arrayIdx, id);
lastIdx.getAndIncrement();
}

private int getId(int index) {
val arrayIdx = index % size;
return queue.get(arrayIdx);
}

public synchronized Optional<Integer> getNextId() {
if (firstIdx.get() < lastIdx.get()) {
val id = getId(firstIdx.getAndIncrement());
return Optional.of(id);
} else {
return Optional.empty();
}
}

public void reset() {
val unusedIds = lastIdx.get() - firstIdx.get();
// unusedDataMeter.mark(unusedIds);
lastIdx.set(0);
firstIdx.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@
public class Constants {
public static final int MAX_ID_PER_MS = 1000;
public static final int MAX_NUM_NODES = 10000;
public static final int MAX_IDS_PER_SECOND = 1_000_000;
public static final int DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS = 60;
public static final int DEFAULT_PARTITION_RETRY_COUNT = 100;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.appform.ranger.discovery.bundle.id;

import lombok.Value;
import lombok.Builder;
import lombok.Getter;

@Value
@Getter
@Builder
public class GenerationResult {
IdInfo idInfo;
IdValidationState state;
String domain;
String namespace;
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public static Optional<Id> generateWithConstraints(
String prefix,
final List<IdValidationConstraint> inConstraints,
boolean skipGlobal) {
return generate(IdGenerationRequest.builder()
return generate(IdGenerationRequest.<IdValidationConstraint>builder()
.prefix(prefix)
.constraints(inConstraints)
.skipGlobal(skipGlobal)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.appform.ranger.discovery.bundle.id;

import com.codahale.metrics.MetricRegistry;

import java.util.Optional;

public class IdPool {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CODE_MAINTAINABILTIY: The IdPool class is missing JavaDocs for all methods and constructor, making it difficult to understand the purpose and usage of this class without diving into the implementation. Clear documentation would improve maintainability by helping developers understand the class's purpose and behavior at a glance.

public class IdPool {
    /** List of IDs for the specific IdPool */
    private final CircularQueue queue;

    /**
     * Creates an IdPool with the specified size and metrics registry.
     * 
     * @param size Size of the circular queue to maintain IDs
     * @param metricRegistry Metrics registry for monitoring queue behavior
     * @param namespace Namespace used for metrics reporting
     */
    public IdPool(int size, final MetricRegistry metricRegistry, final String namespace) {
        this.queue = new CircularQueue(size, metricRegistry, namespace);
    }

    /**
     * Stores a new ID in the pool.
     * 
     * @param id The ID to store
     */
    public void setId(int id) {
        queue.setId(id);
    }

    /**
     * Retrieves the next available ID from the pool.
     * 
     * @return Optional containing the next ID if available, empty Optional otherwise
     */
    public Optional<Integer> getNextId() {
        return queue.getNextId();
    }

    /**
     * Resets the pool state, clearing all stored IDs.
     */
    public void reset() {
        queue.reset();
    }
}

/** List of IDs for the specific IdPool */
private final CircularQueue queue;

public IdPool(int size, final MetricRegistry metricRegistry, final String namespace) {
this.queue = new CircularQueue(size, metricRegistry, namespace);
}

public void setId(int id) {
queue.setId(id);
}

public Optional<Integer> getNextId() {
return queue.getNextId();
}

public void reset() {
queue.reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.appform.ranger.discovery.bundle.id;

import lombok.val;
import org.joda.time.DateTime;


public class IdUtils {
public static DateTime getDateTimeFromSeconds(long seconds) {
// Convert seconds to milliSeconds
val millis = seconds * 1000L;
// Get DateTime object from milliSeconds
return new DateTime(millis);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.appform.ranger.discovery.bundle.id;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

import static io.appform.ranger.discovery.bundle.id.Constants.MAX_IDS_PER_SECOND;

/**
* Tracks generated IDs and pointers for generating next IDs for a partition.
*/
@Slf4j
public class PartitionIdTracker {
/** Array to store IdPools for each partition */
private final IdPool[] idPoolList;

/** Counter to keep track of the number of IDs created */
private final AtomicInteger nextIdCounter = new AtomicInteger();

@Getter
private Instant instant;

// private final Meter generatedIdCountMeter;

public PartitionIdTracker(final int partitionSize,
final int idPoolSize,
final Instant instant,
final MetricRegistry metricRegistry,
final String namespace) {
this.instant = instant;
idPoolList = new IdPool[partitionSize];
for (int i=0; i<partitionSize; i+= 1){
idPoolList[i] = new IdPool(idPoolSize, metricRegistry, namespace);
}
// this.generatedIdCountMeter = metricRegistry.meter("generatedIdCount.forPrefix." + namespace);
}

/** Method to get the IdPool for a specific partition */
public IdPool getPartition(final int partitionId) {
Preconditions.checkArgument(partitionId < idPoolList.length, "Invalid partitionId " + partitionId + " for IdPool of size " + idPoolList.length);
return idPoolList[partitionId];
}

public void addId(final int partitionId, final IdInfo idInfo) {
Preconditions.checkArgument(partitionId < idPoolList.length, "Invalid partitionId " + partitionId + " for IdPool of size " + idPoolList.length);
if (instant.getEpochSecond() == idInfo.getTime()) {
idPoolList[partitionId].setId(idInfo.getExponent());
}
}

public IdInfo getIdInfo() {
Preconditions.checkArgument(nextIdCounter.get() < MAX_IDS_PER_SECOND, "ID Generation Per Second Limit Reached.");
val timeInSeconds = instant.getEpochSecond();
return new IdInfo(nextIdCounter.getAndIncrement(), timeInSeconds);
}

public synchronized void reset(final Instant instant) {
// Do not reset if the time (in seconds) hasn't changed
if (this.instant.getEpochSecond() == instant.getEpochSecond()) {
return;
}
// Reset all ID Pools because they contain IDs for an expired timestamp
for (val idPool: idPoolList) {
idPool.reset();
}
// generatedIdCountMeter.mark(nextIdCounter.get());
nextIdCounter.set(0);
this.instant = instant;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.appform.ranger.discovery.bundle.id.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DefaultNamespaceConfig {
/** Size of pre-generated id buffer. Each partition will have separate IdPool, each of size equal to this value. */
@NotNull
@Min(2)
private Integer idPoolSizePerPartition;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.appform.ranger.discovery.bundle.id.config;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.appform.ranger.discovery.bundle.id.Constants;
import io.dropwizard.validation.ValidationMethod;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class IdGeneratorConfig {

/** Optional for non-weighted scenarios */
@Valid
private WeightedIdConfig weightedIdConfig;

@NotNull
@Valid
private DefaultNamespaceConfig defaultNamespaceConfig;

@Builder.Default
@Valid
private Set<NamespaceConfig> namespaceConfig = Collections.emptySet();

@NotNull
@Min(1)
private int partitionCount;

/** Buffer time to pre-generate IDs for */
@Min(1)
@Max(300)
@Builder.Default
private int dataStorageLimitInSeconds = Constants.DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS;

/** Retry limit for selecting a valid partition. Not required for unconstrained scenarios */
@Min(1)
@Builder.Default
private int partitionRetryCount = Constants.DEFAULT_PARTITION_RETRY_COUNT;

@ValidationMethod(message = "Namespaces should be unique")
@JsonIgnore
public boolean areNamespacesUnique() {
Set<String> namespaces = namespaceConfig.stream()
.map(NamespaceConfig::getNamespace)
.collect(Collectors.toSet());
return namespaceConfig.size() == namespaces.size();
}

@ValidationMethod(message = "Invalid Partition Range")
@JsonIgnore
public boolean isPartitionCountValid() {
if (weightedIdConfig != null) {
List<WeightedPartition> sortedPartitions = new ArrayList<>(weightedIdConfig.getPartitions());
sortedPartitions.sort(Comparator.comparingInt(k -> k.getPartitionRange().getStart()));
return sortedPartitions.get(sortedPartitions.size() - 1).getPartitionRange().getEnd() - sortedPartitions.get(0).getPartitionRange().getStart() + 1 == partitionCount;
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.appform.ranger.discovery.bundle.id.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NamespaceConfig {
@NotNull
private String namespace;

/** Size of pre-generated id buffer. Value from DefaultNamespaceConfig will be used if this is null */
@Min(2)
private Integer idPoolSizePerBucket;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.appform.ranger.discovery.bundle.id.config;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.dropwizard.validation.ValidationMethod;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PartitionRange {
@NotNull
@Min(0)
private int start;

/* end partition is inclusive in range */
@NotNull
@Min(0)
private int end;

@ValidationMethod(message = "Partition Range should be non-decreasing")
@JsonIgnore
public boolean isRangeValid() {
return start <= end;
}
}
Loading