Skip to content

Commit 9a66571

Browse files
authored
Merge pull request #2084 from opentensor/multiple-pulse-txs-per-block
Multiple Pulse TXs per block
2 parents 31578b0 + da9a2dc commit 9a66571

File tree

2 files changed

+90
-58
lines changed

2 files changed

+90
-58
lines changed

pallets/drand/src/lib.rs

Lines changed: 82 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -307,18 +307,21 @@ pub mod pallet {
307307
signature,
308308
&payload.block_number,
309309
&payload.public,
310+
None,
310311
)
311312
}
312313
Call::write_pulse {
313314
pulses_payload: payload,
314315
signature,
315316
} => {
316317
let signature = signature.as_ref().ok_or(InvalidTransaction::BadSigner)?;
318+
let rounds: Vec<RoundNumber> = payload.pulses.iter().map(|p| p.round).collect();
317319
Self::validate_signature_and_parameters(
318320
payload,
319321
signature,
320322
&payload.block_number,
321323
&payload.public,
324+
Some(&rounds),
322325
)
323326
}
324327
_ => InvalidTransaction::Call.into(),
@@ -480,32 +483,34 @@ impl<T: Config> Pallet<T> {
480483
pulses.push(pulse);
481484
}
482485

483-
let signer = Signer::<T, T::AuthorityId>::all_accounts();
484-
485-
let results = signer.send_unsigned_transaction(
486-
|account| PulsesPayload {
487-
block_number,
488-
pulses: pulses.clone(),
489-
public: account.public.clone(),
490-
},
491-
|pulses_payload, signature| Call::write_pulse {
492-
pulses_payload,
493-
signature: Some(signature),
494-
},
495-
);
496-
497-
for (acc, res) in &results {
498-
match res {
499-
Ok(()) => log::debug!(
500-
"Drand: [{:?}] Submitted new pulses up to round: {:?}",
501-
acc.id,
502-
last_stored_round.saturating_add(rounds_to_fetch)
503-
),
504-
Err(e) => log::error!(
505-
"Drand: [{:?}] Failed to submit transaction: {:?}",
506-
acc.id,
507-
e
508-
),
486+
let signer = Signer::<T, T::AuthorityId>::any_account();
487+
488+
// Submit one tx per pulse, ascending rounds.
489+
for pulse in pulses.into_iter() {
490+
let round = pulse.round;
491+
492+
if let Some((acc, res)) = signer.send_unsigned_transaction(
493+
|account| PulsesPayload {
494+
block_number,
495+
pulses: vec![pulse.clone()],
496+
public: account.public.clone(),
497+
},
498+
|pulses_payload, signature| Call::write_pulse {
499+
pulses_payload,
500+
signature: Some(signature),
501+
},
502+
) {
503+
match res {
504+
Ok(()) => log::debug!("Drand: [{:?}] submitted round {:?}", acc.id, round),
505+
Err(e) => log::debug!(
506+
"Drand: [{:?}] failed to submit round {:?}: {:?}",
507+
acc.id,
508+
round,
509+
e
510+
),
511+
}
512+
} else {
513+
log::debug!("Drand: No local account available to submit round {round:?}");
509514
}
510515
}
511516
}
@@ -631,54 +636,74 @@ impl<T: Config> Pallet<T> {
631636
signature: &T::Signature,
632637
block_number: &BlockNumberFor<T>,
633638
public: &T::Public,
639+
rounds: Option<&[RoundNumber]>,
634640
) -> TransactionValidity {
635641
let signature_valid =
636642
SignedPayload::<T>::verify::<T::AuthorityId>(payload, signature.clone());
637643
if !signature_valid {
638644
return InvalidTransaction::BadProof.into();
639645
}
640-
Self::validate_transaction_parameters(block_number, public)
646+
Self::validate_transaction_parameters(block_number, public, rounds)
641647
}
642648

643649
fn validate_transaction_parameters(
644650
block_number: &BlockNumberFor<T>,
645651
public: &T::Public,
652+
rounds: Option<&[RoundNumber]>,
646653
) -> TransactionValidity {
647-
// Now let's check if the transaction has any chance to succeed.
648654
let next_unsigned_at = NextUnsignedAt::<T>::get();
649-
if &next_unsigned_at > block_number {
650-
return InvalidTransaction::Stale.into();
651-
}
652-
// Let's make sure to reject transactions from the future.
653655
let current_block = frame_system::Pallet::<T>::block_number();
654-
if &current_block < block_number {
656+
657+
if current_block < *block_number {
655658
return InvalidTransaction::Future.into();
656659
}
657660

658-
let provides_tag = (next_unsigned_at, public.encode()).using_encoded(blake2_256);
659-
660-
ValidTransaction::with_tag_prefix("DrandOffchainWorker")
661-
// We set the priority to the value stored at `UnsignedPriority`.
662-
.priority(T::UnsignedPriority::get())
663-
// This transaction does not require anything else to go before into the pool.
664-
// In theory we could require `previous_unsigned_at` transaction to go first,
665-
// but it's not necessary in our case.
666-
// We set the `provides` tag to be the same as `next_unsigned_at`. This makes
667-
// sure only one transaction produced after `next_unsigned_at` will ever
668-
// get to the transaction pool and will end up in the block.
669-
// We can still have multiple transactions compete for the same "spot",
670-
// and the one with higher priority will replace other one in the pool.
671-
.and_provides(provides_tag)
672-
// The transaction is only valid for next block. After that it's
673-
// going to be revalidated by the pool.
674-
.longevity(1)
675-
// It's fine to propagate that transaction to other peers, which means it can be
676-
// created even by nodes that don't produce blocks.
677-
// Note that sometimes it's better to keep it for yourself (if you are the block
678-
// producer), since for instance in some schemes others may copy your solution and
679-
// claim a reward.
680-
.propagate(true)
681-
.build()
661+
match rounds {
662+
Some(rs) => {
663+
let r_opt = rs.first().copied();
664+
let has_second = rs.get(1).is_some();
665+
let r = match (r_opt, has_second) {
666+
(Some(round), false) => round,
667+
_ => return InvalidTransaction::Call.into(),
668+
};
669+
670+
// Allow multiple unsigned txs in the same block even after the first updates the gate.
671+
if next_unsigned_at > current_block {
672+
return InvalidTransaction::Stale.into();
673+
}
674+
675+
// Drop stale rounds at mempool time to avoid re-including last block's rounds.
676+
let last = LastStoredRound::<T>::get();
677+
if r <= last {
678+
return InvalidTransaction::Stale.into();
679+
}
680+
681+
// Priority favors lower rounds first.
682+
let priority =
683+
T::UnsignedPriority::get().saturating_add(u64::MAX.saturating_sub(r));
684+
685+
ValidTransaction::with_tag_prefix("DrandOffchainWorker")
686+
.priority(priority)
687+
.and_provides((b"drand", r).using_encoded(blake2_256))
688+
.longevity(3)
689+
.propagate(false)
690+
.build()
691+
}
692+
693+
None => {
694+
if next_unsigned_at > *block_number {
695+
return InvalidTransaction::Stale.into();
696+
}
697+
698+
let provides_tag = (next_unsigned_at, public.encode()).using_encoded(blake2_256);
699+
ValidTransaction::with_tag_prefix("DrandOffchainWorker")
700+
.priority(T::UnsignedPriority::get())
701+
.and_provides(provides_tag)
702+
.longevity(1)
703+
.propagate(true)
704+
.build()
705+
}
706+
}
682707
}
683708

684709
fn prune_old_pulses(last_stored_round: RoundNumber) {

pallets/drand/src/tests.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,16 @@ fn test_validate_unsigned_write_pulse() {
275275
let block_number = 100_000_000;
276276
let alice = sp_keyring::Sr25519Keyring::Alice;
277277
System::set_block_number(block_number);
278+
279+
let pulse = Pulse {
280+
round: 1,
281+
randomness: frame_support::BoundedVec::truncate_from(vec![0u8; 32]),
282+
signature: frame_support::BoundedVec::truncate_from(vec![1u8; 96]),
283+
};
284+
278285
let pulses_payload = PulsesPayload {
279286
block_number,
280-
pulses: vec![],
287+
pulses: vec![pulse],
281288
public: alice.public(),
282289
};
283290
let signature = alice.sign(&pulses_payload.encode());

0 commit comments

Comments
 (0)