diff --git a/blockchain_parser/address.py b/blockchain_parser/address.py index 1dc0855..a0869c0 100644 --- a/blockchain_parser/address.py +++ b/blockchain_parser/address.py @@ -12,6 +12,8 @@ from bitcoin import base58 from bitcoin.bech32 import CBech32Data from .utils import btc_ripemd160, double_sha256 +from .utils_taproot import from_taproot +from binascii import b2a_hex class Address(object): @@ -44,21 +46,30 @@ def from_bech32(cls, hash, segwit_version): """Constructs an Address object from a bech32 hash.""" return cls(hash, None, None, "bech32", segwit_version) + @classmethod + def from_bech32m(cls, hash, segwit_version): + """Constructs an Address object from a bech32m script.""" + return cls(hash, None, None, "bech32m", segwit_version) + @property def hash(self): """Returns the RIPEMD-160 hash corresponding to this address""" if self.public_key is not None and self._hash is None: self._hash = btc_ripemd160(self.public_key) - return self._hash @property def address(self): """Returns the encoded representation of this address. - If SegWit, it's encoded using bech32, otherwise using base58 + If Taproot, it's encoded using bech32m, + if SegWit, it's encoded using bech32, + otherwise using base58 """ if self._address is None: - if self.type != "bech32": + if self.type == "bech32m": + tweaked_pubkey = b2a_hex(self.hash).decode("ascii") + self._address = from_taproot(tweaked_pubkey) + elif self.type != "bech32": version = b'\x00' if self.type == "normal" else b'\x05' checksum = double_sha256(version + self.hash) diff --git a/blockchain_parser/output.py b/blockchain_parser/output.py index 5ef1a58..adf190a 100644 --- a/blockchain_parser/output.py +++ b/blockchain_parser/output.py @@ -77,7 +77,9 @@ def addresses(self): elif self.type == "p2wsh": address = Address.from_bech32(self.script.operations[1], 0) self._addresses.append(address) - + elif self.type == "p2tr": + address = Address.from_bech32m(self.script.operations[1], 1) + self._addresses.append(address) return self._addresses def is_return(self): @@ -104,6 +106,9 @@ def is_p2wpkh(self): def is_p2wsh(self): return self.script.is_p2wsh() + def is_p2tr(self): + return self.script.is_p2tr() + @property def type(self): """Returns the output's script type as a string""" @@ -132,4 +137,7 @@ def type(self): if self.is_p2wsh(): return "p2wsh" + if self.is_p2tr(): + return "p2tr" + return "unknown" diff --git a/blockchain_parser/script.py b/blockchain_parser/script.py index d63ebae..a0a26c1 100644 --- a/blockchain_parser/script.py +++ b/blockchain_parser/script.py @@ -11,6 +11,7 @@ from bitcoin.core.script import * from binascii import b2a_hex +from .utils_taproot import from_taproot def is_public_key(hex_data): @@ -107,6 +108,10 @@ def is_p2wsh(self): def is_p2wpkh(self): return self.script.is_witness_v0_keyhash() + def is_p2tr(self): + return self.operations[0] == 1 \ + and from_taproot(b2a_hex(self.operations[1]).decode("ascii")).startswith("bc1p") + def is_pubkey(self): return len(self.operations) == 2 \ and self.operations[-1] == OP_CHECKSIG \ @@ -142,4 +147,4 @@ def is_unknown(self): return not self.is_pubkeyhash() and not self.is_pubkey() \ and not self.is_p2sh() and not self.is_multisig() \ and not self.is_return() and not self.is_p2wpkh() \ - and not self.is_p2wsh() + and not self.is_p2wsh() and not self.is_p2tr() diff --git a/blockchain_parser/utils_taproot.py b/blockchain_parser/utils_taproot.py new file mode 100644 index 0000000..313f314 --- /dev/null +++ b/blockchain_parser/utils_taproot.py @@ -0,0 +1,141 @@ +# Copyright (C) 2015-2016 The bitcoin-blockchain-parser developers +# +# This file is part of bitcoin-blockchain-parser. +# +# It is subject to the license terms in the LICENSE file found in the top-level +# directory of this distribution. +# +# No part of bitcoin-blockchain-parser, including this file, may be copied, +# modified, propagated, or distributed except according to the terms contained +# in the LICENSE file. +# +# Encoding/Decoding written by Pieter Wuille (2017) +# and adapted by Anton Wahrstätter (2022) +# https://github.com/Bytom/python-bytomlib/blob/master/pybtmsdk/segwit_addr.py + + +from enum import Enum + + +class Encoding(Enum): + """Enumeration type to list the various supported encodings.""" + BECH32M = 2 + + +CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" +BECH32M_CONST = 0x2bc830a3 + + +def bech32_polymod(values): + """Internal function that computes the Bech32 checksum.""" + generator = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] + chk = 1 + for value in values: + top = chk >> 25 + chk = (chk & 0x1ffffff) << 5 ^ value + for i in range(5): + chk ^= generator[i] if ((top >> i) & 1) else 0 + return chk + + +def bech32_hrp_expand(hrp): + """Expand the HRP into values for checksum computation.""" + return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] + + +def bech32_verify_checksum(hrp, data): + """Verify a checksum given HRP and converted data characters.""" + const = bech32_polymod(bech32_hrp_expand(hrp) + data) + if const == 1: + return Encoding.BECH32 + if const == BECH32M_CONST: + return Encoding.BECH32M + return None + + +def bech32_create_checksum(hrp, data, spec): + """Compute the checksum values given HRP and data.""" + values = bech32_hrp_expand(hrp) + data + const = BECH32M_CONST if spec == Encoding.BECH32M else 1 + polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ const + return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] + + +def bech32_encode(hrp, data, spec): + """Compute a Bech32 string given HRP and data values.""" + combined = data + bech32_create_checksum(hrp, data, spec) + return hrp + '1' + ''.join([CHARSET[d] for d in combined]) + + +def bech32_decode(bech): + """Validate a Bech32/Bech32m string, and determine HRP and data.""" + if ((any(ord(x) < 33 or ord(x) > 126 for x in bech)) or + (bech.lower() != bech and bech.upper() != bech)): + return (None, None, None) + bech = bech.lower() + pos = bech.rfind('1') + if pos < 1 or pos + 7 > len(bech) or len(bech) > 90: + return (None, None, None) + if not all(x in CHARSET for x in bech[pos+1:]): + return (None, None, None) + hrp = bech[:pos] + data = [CHARSET.find(x) for x in bech[pos+1:]] + spec = bech32_verify_checksum(hrp, data) + if spec is None: + return (None, None, None) + return (hrp, data[:-6], spec) + + +def convertbits(data, frombits, tobits, pad=True): + """General power-of-2 base conversion.""" + acc = 0 + bits = 0 + ret = [] + maxv = (1 << tobits) - 1 + max_acc = (1 << (frombits + tobits - 1)) - 1 + for value in data: + if value < 0 or (value >> frombits): + return None + acc = ((acc << frombits) | value) & max_acc + bits += frombits + while bits >= tobits: + bits -= tobits + ret.append((acc >> bits) & maxv) + if pad: + if bits: + ret.append((acc << (tobits - bits)) & maxv) + elif bits >= frombits or ((acc << (tobits - bits)) & maxv): + return None + return ret + + +def decode(hrp, addr): + """Decode a segwit address.""" + hrpgot, data, spec = bech32_decode(addr) + if hrpgot != hrp: + return (None, None) + decoded = convertbits(data[1:], 5, 8, False) + if decoded is None or len(decoded) < 2 or len(decoded) > 40: + return (None, None) + if data[0] > 16: + return (None, None) + if data[0] == 0 and len(decoded) != 20 and len(decoded) != 32: + return (None, None) + if data[0] != 0 and spec != Encoding.BECH32M: + return (None, None) + return (data[0], decoded) + + +def encode(witprog): + hrp, witver = "bc", 1 + """Encode a segwit address.""" + spec = Encoding.BECH32M + ret = bech32_encode(hrp, [witver] + convertbits(witprog, 8, 5), spec) + if decode(hrp, ret) == (None, None): + return None + return ret + + +def from_taproot(tweaked_pubkey): + tweaked_pubkey = [int(tweaked_pubkey[i:i+2], 16) for i in range(len(tweaked_pubkey), 2)] + return encode(tweaked_pubkey)