Chapter 3: In-Depth Analysis of Bitcoin

Haiyue
33min

Chapter 3: In-Depth Analysis of Bitcoin

Learning Objectives
  • Understand the background and design philosophy of Bitcoin’s creation
  • Master Bitcoin’s technical architecture
  • Understand the mining mechanism and reward system
  • Learn Bitcoin’s transaction verification process

Background of Bitcoin’s Creation

Historical Background and Motivation

Bitcoin was born during the 2008 global financial crisis. Satoshi Nakamoto published the whitepaper “Bitcoin: A Peer-to-Peer Electronic Cash System,” proposing a decentralized electronic cash system.

Design Intent

Bitcoin’s core design goals:

  • Trustless: No reliance on third-party financial institutions
  • Decentralization: No single point of control
  • Immutability: Security guaranteed by cryptography
  • Limited Supply: Maximum cap of 21 million coins
  • Global Circulation: 24/7 borderless transactions
🔄 正在渲染 Mermaid 图表...

Bitcoin Technical Architecture

UTXO Model

Bitcoin uses the UTXO (Unspent Transaction Output) model, which differs from traditional account models:

from typing import Dict, List, Optional, Tuple
import hashlib
import json
import time
from dataclasses import dataclass

@dataclass
class UTXO:
    """Unspent Transaction Output"""
    txid: str          # Transaction ID
    output_index: int  # Output index
    amount: float      # Amount
    script_pubkey: str # Locking script
    address: str       # Receiving address

class UTXOSet:
    """UTXO set management"""

    def __init__(self):
        self.utxos: Dict[str, UTXO] = {}  # key: txid:index

    def add_utxo(self, utxo: UTXO):
        """Add UTXO"""
        key = f"{utxo.txid}:{utxo.output_index}"
        self.utxos[key] = utxo

    def spend_utxo(self, txid: str, output_index: int) -> Optional[UTXO]:
        """Spend UTXO"""
        key = f"{txid}:{output_index}"
        return self.utxos.pop(key, None)

    def get_balance(self, address: str) -> float:
        """Get address balance"""
        balance = 0
        for utxo in self.utxos.values():
            if utxo.address == address:
                balance += utxo.amount
        return balance

    def get_utxos_for_address(self, address: str) -> List[UTXO]:
        """Get all UTXOs for an address"""
        return [utxo for utxo in self.utxos.values() if utxo.address == address]

class BitcoinTransaction:
    """Bitcoin transaction"""

    def __init__(self):
        self.version = 1
        self.inputs: List[Dict] = []
        self.outputs: List[Dict] = []
        self.locktime = 0
        self.txid: Optional[str] = None

    def add_input(self, prev_txid: str, output_index: int, signature: str):
        """Add transaction input"""
        tx_input = {
            "prev_txid": prev_txid,
            "output_index": output_index,
            "signature_script": signature,
            "sequence": 0xffffffff
        }
        self.inputs.append(tx_input)

    def add_output(self, amount: float, recipient_address: str):
        """Add transaction output"""
        tx_output = {
            "amount": amount,
            "script_pubkey": f"OP_DUP OP_HASH160 {recipient_address} OP_EQUALVERIFY OP_CHECKSIG"
        }
        self.outputs.append(tx_output)

    def calculate_txid(self) -> str:
        """Calculate transaction ID"""
        tx_data = {
            "version": self.version,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "locktime": self.locktime
        }
        tx_string = json.dumps(tx_data, sort_keys=True)
        self.txid = hashlib.sha256(tx_string.encode()).hexdigest()
        return self.txid

    def calculate_fee(self, utxo_set: UTXOSet) -> float:
        """Calculate transaction fee"""
        input_total = 0

        # Calculate total input
        for tx_input in self.inputs:
            key = f"{tx_input['prev_txid']}:{tx_input['output_index']}"
            if key in utxo_set.utxos:
                input_total += utxo_set.utxos[key].amount

        # Calculate total output
        output_total = sum(output["amount"] for output in self.outputs)

        return input_total - output_total

# UTXO model demonstration
print("=== UTXO Model Demonstration ===")

# Initialize UTXO set
utxo_set = UTXOSet()

# Create genesis transaction (mining reward)
genesis_utxo = UTXO(
    txid="genesis_tx",
    output_index=0,
    amount=50.0,
    script_pubkey="OP_DUP OP_HASH160 alice_pubkey OP_EQUALVERIFY OP_CHECKSIG",
    address="alice_address"
)
utxo_set.add_utxo(genesis_utxo)

print(f"Alice's initial balance: {utxo_set.get_balance('alice_address')} BTC")

# Alice transfers 25 BTC to Bob
tx1 = BitcoinTransaction()
tx1.add_input("genesis_tx", 0, "alice_signature")
tx1.add_output(25.0, "bob_address")      # To Bob
tx1.add_output(24.9, "alice_address")    # Change to Alice
tx1.calculate_txid()

print(f"\nTransaction 1 ID: {tx1.txid}")
print(f"Transaction fee: {tx1.calculate_fee(utxo_set)} BTC")

# Update UTXO set
utxo_set.spend_utxo("genesis_tx", 0)  # Spend original UTXO
utxo_set.add_utxo(UTXO(tx1.txid, 0, 25.0, "", "bob_address"))      # Bob's new UTXO
utxo_set.add_utxo(UTXO(tx1.txid, 1, 24.9, "", "alice_address"))    # Alice's change UTXO

print(f"\nBalances after transaction:")
print(f"Alice: {utxo_set.get_balance('alice_address')} BTC")
print(f"Bob: {utxo_set.get_balance('bob_address')} BTC")

Script System

Bitcoin uses a stack-based scripting language to define transaction conditions:

from typing import List, Union
import hashlib
import time

class BitcoinScript:
    """Bitcoin script interpreter"""

    def __init__(self):
        self.stack: List[Union[str, bytes, int]] = []
        self.alt_stack: List[Union[str, bytes, int]] = []

    def execute(self, script: List[str]) -> bool:
        """Execute script"""
        try:
            for operation in script:
                if operation.startswith("OP_"):
                    self._execute_opcode(operation)
                else:
                    # Push regular data onto stack
                    self.stack.append(operation)

            # Script executed successfully and stack top is True
            return len(self.stack) > 0 and self._is_true(self.stack[-1])

        except Exception as e:
            print(f"Script execution error: {e}")
            return False

    def _execute_opcode(self, opcode: str):
        """Execute opcode"""
        if opcode == "OP_DUP":
            # Duplicate stack top element
            if self.stack:
                self.stack.append(self.stack[-1])

        elif opcode == "OP_HASH160":
            # Execute RIPEMD160(SHA256(x)) on stack top
            if self.stack:
                data = str(self.stack.pop())
                sha256_hash = hashlib.sha256(data.encode()).digest()
                ripemd160_hash = hashlib.new('ripemd160', sha256_hash).hexdigest()
                self.stack.append(ripemd160_hash)

        elif opcode == "OP_EQUALVERIFY":
            # Compare if top two elements are equal, terminate if not
            if len(self.stack) >= 2:
                a = self.stack.pop()
                b = self.stack.pop()
                if a != b:
                    raise Exception("OP_EQUALVERIFY failed")
                self.stack.append(1)  # True

        elif opcode == "OP_CHECKSIG":
            # Verify digital signature (simplified implementation)
            if len(self.stack) >= 2:
                pubkey = self.stack.pop()
                signature = self.stack.pop()
                # Simplified signature verification
                is_valid = self._verify_signature(signature, pubkey)
                self.stack.append(1 if is_valid else 0)

        elif opcode == "OP_CHECKMULTISIG":
            # Multi-signature verification
            if not self.stack:
                raise Exception("OP_CHECKMULTISIG: empty stack")

            num_pubkeys = int(self.stack.pop())
            pubkeys = [self.stack.pop() for _ in range(num_pubkeys)]
            num_sigs = int(self.stack.pop())
            signatures = [self.stack.pop() for _ in range(num_sigs)]

            # Simplified verification: at least one valid signature
            valid_sigs = 0
            for sig in signatures:
                for pubkey in pubkeys:
                    if self._verify_signature(sig, pubkey):
                        valid_sigs += 1
                        break

            self.stack.append(1 if valid_sigs >= num_sigs else 0)

        elif opcode == "OP_RETURN":
            # OP_RETURN - makes transaction invalid (used for data storage)
            raise Exception("OP_RETURN encountered")

    def _verify_signature(self, signature: str, pubkey: str) -> bool:
        """Verify digital signature (simplified implementation)"""
        # Actual implementation requires ECDSA verification
        return len(signature) > 10 and len(pubkey) > 10

    def _is_true(self, value: Union[str, bytes, int]) -> bool:
        """Check if value is true"""
        if isinstance(value, int):
            return value != 0
        if isinstance(value, str):
            return value != "0" and value != ""
        return len(value) > 0

# Script example demonstration
print("\n=== Bitcoin Script Demonstration ===")

# 1. P2PKH (Pay to Public Key Hash) script
print("\n1. P2PKH script verification:")
script_interpreter = BitcoinScript()

# Unlocking script (scriptSig)
unlock_script = ["valid_signature", "alice_pubkey"]

# Locking script (scriptPubKey)
lock_script = ["OP_DUP", "OP_HASH160", "alice_pubkey_hash", "OP_EQUALVERIFY", "OP_CHECKSIG"]

# Execute complete script
full_script = unlock_script + lock_script
result = script_interpreter.execute(full_script)
print(f"P2PKH script execution result: {result}")

# 2. Multi-signature script
print("\n2. Multi-signature script (2-of-3):")
multisig_interpreter = BitcoinScript()

# 2-of-3 multisig script
multisig_script = [
    "0",  # OP_CHECKMULTISIG bug fix
    "signature1", "signature2",  # 2 signatures
    "2",  # Number of required signatures
    "pubkey1", "pubkey2", "pubkey3",  # 3 public keys
    "3",  # Total number of public keys
    "OP_CHECKMULTISIG"
]

result = multisig_interpreter.execute(multisig_script)
print(f"2-of-3 multisig script execution result: {result}")

# 3. Timelock script demonstration
print("\n3. Timelock script concept:")
timelock_script = [
    "1640995200",  # Timestamp (2022-01-01)
    "OP_CHECKLOCKTIMEVERIFY",  # Check timelock
    "OP_DROP",
    "alice_signature",
    "alice_pubkey",
    "OP_CHECKSIG"
]
print("Timelock script structure:", timelock_script)
print("Description: UTXO can only be spent after specified time")

Mining Mechanism Explained

Proof of Work Algorithm

Bitcoin uses double SHA-256 hashing as proof of work:

import struct
import time
from typing import List, Dict

class BitcoinBlock:
    """Bitcoin block structure"""

    def __init__(self, previous_hash: str, transactions: List[Dict],
                 bits: int, timestamp: float = None):
        self.version = 1
        self.previous_hash = previous_hash
        self.merkle_root = self._calculate_merkle_root(transactions)
        self.timestamp = timestamp or time.time()
        self.bits = bits  # Difficulty target
        self.nonce = 0
        self.transactions = transactions
        self.block_hash = ""

    def _calculate_merkle_root(self, transactions: List[Dict]) -> str:
        """Calculate Merkle root"""
        if not transactions:
            return "0" * 64

        # Simplified implementation
        tx_hashes = [hashlib.sha256(json.dumps(tx, sort_keys=True).encode()).hexdigest()
                    for tx in transactions]

        while len(tx_hashes) > 1:
            new_hashes = []
            for i in range(0, len(tx_hashes), 2):
                if i + 1 < len(tx_hashes):
                    combined = tx_hashes[i] + tx_hashes[i + 1]
                else:
                    combined = tx_hashes[i] + tx_hashes[i]
                new_hashes.append(hashlib.sha256(combined.encode()).hexdigest())
            tx_hashes = new_hashes

        return tx_hashes[0]

    def get_header_bytes(self) -> bytes:
        """Get block header bytes"""
        # Bitcoin block header structure (80 bytes)
        header = struct.pack('<I', self.version)  # 4-byte version
        header += bytes.fromhex(self.previous_hash)  # 32-byte previous block hash
        header += bytes.fromhex(self.merkle_root)  # 32-byte Merkle root
        header += struct.pack('<I', int(self.timestamp))  # 4-byte timestamp
        header += struct.pack('<I', self.bits)  # 4-byte difficulty target
        header += struct.pack('<I', self.nonce)  # 4-byte nonce
        return header

    def calculate_hash(self) -> str:
        """Calculate block hash (double SHA-256)"""
        header_bytes = self.get_header_bytes()
        first_hash = hashlib.sha256(header_bytes).digest()
        second_hash = hashlib.sha256(first_hash).digest()
        return second_hash[::-1].hex()  # Little-endian to big-endian

    def get_target(self) -> int:
        """Calculate target value from bits"""
        # bits format: first 8 bits are exponent, last 24 bits are mantissa
        exponent = self.bits >> 24
        mantissa = self.bits & 0x00ffffff

        if exponent <= 3:
            target = mantissa >> (8 * (3 - exponent))
        else:
            target = mantissa << (8 * (exponent - 3))

        return target

class BitcoinMiner:
    """Bitcoin miner"""

    def __init__(self, miner_address: str):
        self.miner_address = miner_address
        self.hashrate = 0  # Hash rate (H/s)

    def mine_block(self, block: BitcoinBlock, max_nonce: int = 2**32) -> bool:
        """Mining process"""
        target = block.get_target()
        start_time = time.time()
        hashes_computed = 0

        print(f"Starting mining...")
        print(f"Target value: {hex(target)}")
        print(f"Difficulty bits: {hex(block.bits)}")

        for nonce in range(max_nonce):
            block.nonce = nonce
            block_hash = block.calculate_hash()
            hash_int = int(block_hash, 16)
            hashes_computed += 1

            # Check if difficulty requirement is met
            if hash_int < target:
                end_time = time.time()
                elapsed_time = end_time - start_time
                self.hashrate = hashes_computed / elapsed_time if elapsed_time > 0 else 0

                block.block_hash = block_hash

                print(f"Mining successful!")
                print(f"Nonce: {nonce}")
                print(f"Block hash: {block_hash}")
                print(f"Time: {elapsed_time:.2f}s")
                print(f"Hashes computed: {hashes_computed:,}")
                print(f"Hash rate: {self.hashrate:,.0f} H/s")

                return True

            # Show progress periodically
            if nonce % 100000 == 0 and nonce > 0:
                elapsed = time.time() - start_time
                current_hashrate = nonce / elapsed if elapsed > 0 else 0
                print(f"Progress: {nonce:,} nonces, current hash rate: {current_hashrate:,.0f} H/s")

        print("Mining failed: no valid nonce found")
        return False

    def create_coinbase_transaction(self, block_reward: float, block_height: int) -> Dict:
        """Create coinbase transaction"""
        coinbase_tx = {
            "version": 1,
            "inputs": [{
                "prev_txid": "0" * 64,  # Coinbase has no previous transaction
                "output_index": 0xffffffff,
                "signature_script": f"block_height_{block_height}",
                "sequence": 0xffffffff
            }],
            "outputs": [{
                "amount": block_reward,
                "script_pubkey": f"OP_DUP OP_HASH160 {self.miner_address} OP_EQUALVERIFY OP_CHECKSIG"
            }],
            "locktime": 0
        }

        # Calculate transaction ID
        tx_string = json.dumps(coinbase_tx, sort_keys=True)
        coinbase_tx["txid"] = hashlib.sha256(tx_string.encode()).hexdigest()

        return coinbase_tx

# Mining demonstration
print("\n=== Bitcoin Mining Demonstration ===")

# Create miner
miner = BitcoinMiner("miner_address_123")

# Create coinbase transaction
coinbase_tx = miner.create_coinbase_transaction(6.25, 700000)  # Current block reward 6.25 BTC
print(f"Coinbase transaction: {coinbase_tx['txid']}")

# Create block
transactions = [coinbase_tx]
new_block = BitcoinBlock(
    previous_hash="00000000000000000008a89e854d57e5667df88f1cdef6fde2fbca676bd5e2ea",
    transactions=transactions,
    bits=0x1d00ffff  # Lower difficulty for demonstration
)

print(f"Block Merkle root: {new_block.merkle_root}")

# Start mining (limit max attempts to avoid running too long)
success = miner.mine_block(new_block, max_nonce=500000)

if success:
    print(f"\nBlock mining successful!")
    print(f"Final block hash: {new_block.block_hash}")
else:
    print("Demonstration mining unsuccessful (difficulty may be too high)")

Difficulty Adjustment Mechanism

Bitcoin network automatically adjusts mining difficulty every 2016 blocks (approximately 2 weeks):

class DifficultyAdjustment:
    """Difficulty adjustment algorithm"""

    TARGET_BLOCK_TIME = 600  # Target block time 10 minutes
    ADJUSTMENT_INTERVAL = 2016  # Adjustment interval 2016 blocks
    MAX_ADJUSTMENT_FACTOR = 4  # Maximum adjustment factor

    @classmethod
    def calculate_new_difficulty(cls, current_bits: int, actual_time: float) -> int:
        """Calculate new difficulty target"""
        target_time = cls.TARGET_BLOCK_TIME * cls.ADJUSTMENT_INTERVAL

        # Calculate time ratio
        time_ratio = actual_time / target_time

        # Limit adjustment range
        if time_ratio < 1.0 / cls.MAX_ADJUSTMENT_FACTOR:
            time_ratio = 1.0 / cls.MAX_ADJUSTMENT_FACTOR
        elif time_ratio > cls.MAX_ADJUSTMENT_FACTOR:
            time_ratio = cls.MAX_ADJUSTMENT_FACTOR

        # Calculate current target value
        current_target = cls._bits_to_target(current_bits)

        # Calculate new target value
        new_target = int(current_target * time_ratio)

        # Convert back to bits format
        new_bits = cls._target_to_bits(new_target)

        return new_bits

    @classmethod
    def _bits_to_target(cls, bits: int) -> int:
        """Convert bits to target value"""
        exponent = bits >> 24
        mantissa = bits & 0x00ffffff

        if exponent <= 3:
            target = mantissa >> (8 * (3 - exponent))
        else:
            target = mantissa << (8 * (exponent - 3))

        return target

    @classmethod
    def _target_to_bits(cls, target: int) -> int:
        """Convert target value to bits"""
        if target == 0:
            return 0

        # Find most significant bit
        target_bytes = target.to_bytes((target.bit_length() + 7) // 8, 'big')

        # Calculate exponent and mantissa
        exponent = len(target_bytes)

        if target_bytes[0] > 0x7f:
            exponent += 1
            mantissa = int.from_bytes(target_bytes[:3], 'big') >> 8
        else:
            mantissa = int.from_bytes((target_bytes[:3] + b'\x00')[:3], 'big')

        bits = (exponent << 24) | mantissa
        return bits

    @classmethod
    def estimate_network_hashrate(cls, difficulty: float, block_time: float) -> float:
        """Estimate network hash rate"""
        # Network hash rate = difficulty * 2^32 / block time
        return difficulty * (2**32) / block_time

# Difficulty adjustment demonstration
print("\n=== Difficulty Adjustment Demonstration ===")

# Simulate historical data
current_bits = 0x1d00ffff
actual_time = 2016 * 8 * 60  # Actual time 8 minutes per block (faster than target)

print(f"Current difficulty bits: {hex(current_bits)}")
print(f"Target time: {DifficultyAdjustment.TARGET_BLOCK_TIME * DifficultyAdjustment.ADJUSTMENT_INTERVAL}s")
print(f"Actual time: {actual_time}s")

# Calculate new difficulty
new_bits = DifficultyAdjustment.calculate_new_difficulty(current_bits, actual_time)

print(f"New difficulty bits: {hex(new_bits)}")

# Convert to human-readable difficulty values
current_target = DifficultyAdjustment._bits_to_target(current_bits)
new_target = DifficultyAdjustment._bits_to_target(new_bits)

max_target = 0x00000000FFFF0000000000000000000000000000000000000000000000000000
current_difficulty = max_target / current_target
new_difficulty = max_target / new_target

print(f"Current difficulty: {current_difficulty:,.2f}")
print(f"New difficulty: {new_difficulty:,.2f}")
print(f"Difficulty adjustment: {new_difficulty/current_difficulty:.2f}x")

# Estimate network hash rate
hashrate = DifficultyAdjustment.estimate_network_hashrate(new_difficulty, 600)
print(f"Estimated network hash rate: {hashrate/1e12:.2f} TH/s")

Bitcoin Transaction Verification

Transaction Verification Process

class BitcoinValidator:
    """Bitcoin transaction validator"""

    def __init__(self, utxo_set: UTXOSet, mempool: List[Dict]):
        self.utxo_set = utxo_set
        self.mempool = mempool  # Memory pool

    def validate_transaction(self, transaction: BitcoinTransaction) -> Tuple[bool, str]:
        """Complete transaction validation"""

        # 1. Basic format validation
        if not self._validate_format(transaction):
            return False, "Invalid transaction format"

        # 2. Input validation
        if not self._validate_inputs(transaction):
            return False, "Invalid transaction inputs"

        # 3. Output validation
        if not self._validate_outputs(transaction):
            return False, "Invalid transaction outputs"

        # 4. Balance validation
        if not self._validate_balance(transaction):
            return False, "Input-output amount mismatch"

        # 5. Script validation
        if not self._validate_scripts(transaction):
            return False, "Script verification failed"

        # 6. Double-spend detection
        if not self._check_double_spend(transaction):
            return False, "Double-spend detected"

        return True, "Transaction verified"

    def _validate_format(self, transaction: BitcoinTransaction) -> bool:
        """Validate transaction basic format"""
        # Check version number
        if transaction.version < 1:
            return False

        # Check input/output count
        if len(transaction.inputs) == 0 or len(transaction.outputs) == 0:
            return False

        # Check transaction size limit
        tx_size = self._calculate_transaction_size(transaction)
        if tx_size > 1000000:  # 1MB limit
            return False

        return True

    def _validate_inputs(self, transaction: BitcoinTransaction) -> bool:
        """Validate transaction inputs"""
        for tx_input in transaction.inputs:
            # Check if referenced UTXO exists
            utxo_key = f"{tx_input['prev_txid']}:{tx_input['output_index']}"
            if utxo_key not in self.utxo_set.utxos:
                return False

            # Check signature script length
            script_sig = tx_input.get('signature_script', '')
            if len(script_sig) > 1650:  # Signature script size limit
                return False

        return True

    def _validate_outputs(self, transaction: BitcoinTransaction) -> bool:
        """Validate transaction outputs"""
        for output in transaction.outputs:
            # Check amount validity
            if output['amount'] <= 0:
                return False

            # Check minimum output amount (prevent dust attacks)
            if output['amount'] < 0.00000546:  # 546 satoshis
                return False

            # Check script length
            script_pubkey = output.get('script_pubkey', '')
            if len(script_pubkey) > 10000:
                return False

        return True

    def _validate_balance(self, transaction: BitcoinTransaction) -> bool:
        """Validate input-output balance"""
        input_total = 0
        output_total = 0

        # Calculate total inputs
        for tx_input in transaction.inputs:
            utxo_key = f"{tx_input['prev_txid']}:{tx_input['output_index']}"
            if utxo_key in self.utxo_set.utxos:
                input_total += self.utxo_set.utxos[utxo_key].amount

        # Calculate total outputs
        for output in transaction.outputs:
            output_total += output['amount']

        # Input must be >= output (difference is fee)
        return input_total >= output_total

    def _validate_scripts(self, transaction: BitcoinTransaction) -> bool:
        """Validate script execution"""
        for i, tx_input in enumerate(transaction.inputs):
            # Get corresponding UTXO
            utxo_key = f"{tx_input['prev_txid']}:{tx_input['output_index']}"
            if utxo_key not in self.utxo_set.utxos:
                return False

            utxo = self.utxo_set.utxos[utxo_key]

            # Combine scripts: unlocking script + locking script
            script_sig = tx_input.get('signature_script', '').split()
            script_pubkey = utxo.script_pubkey.split()

            combined_script = script_sig + script_pubkey

            # Execute script verification
            script_interpreter = BitcoinScript()
            if not script_interpreter.execute(combined_script):
                return False

        return True

    def _check_double_spend(self, transaction: BitcoinTransaction) -> bool:
        """Check for double-spend attacks"""
        used_inputs = set()

        # Check for duplicate inputs within transaction
        for tx_input in transaction.inputs:
            input_key = f"{tx_input['prev_txid']}:{tx_input['output_index']}"
            if input_key in used_inputs:
                return False
            used_inputs.add(input_key)

        # Check for conflicting transactions in mempool
        for mempool_tx in self.mempool:
            for mempool_input in mempool_tx.get('inputs', []):
                mempool_key = f"{mempool_input['prev_txid']}:{mempool_input['output_index']}"
                if mempool_key in used_inputs:
                    return False

        return True

    def _calculate_transaction_size(self, transaction: BitcoinTransaction) -> int:
        """Calculate transaction size (bytes)"""
        # Simplified calculation: base size + inputs*150 + outputs*35
        base_size = 10
        input_size = len(transaction.inputs) * 150
        output_size = len(transaction.outputs) * 35
        return base_size + input_size + output_size

# Transaction validation demonstration
print("\n=== Transaction Validation Demonstration ===")

# Create validator
validator = BitcoinValidator(utxo_set, [])

# Create valid transaction
valid_tx = BitcoinTransaction()
valid_tx.add_input("genesis_tx", 0, "valid_signature")
valid_tx.add_output(10.0, "bob_address")
valid_tx.add_output(14.9, "alice_address")  # Change
valid_tx.calculate_txid()

# Validate transaction
is_valid, message = validator.validate_transaction(valid_tx)
print(f"Transaction validation result: {is_valid}")
print(f"Validation message: {message}")

# Create invalid transaction (double-spend)
invalid_tx = BitcoinTransaction()
invalid_tx.add_input("genesis_tx", 0, "another_signature")  # Same UTXO
invalid_tx.add_output(20.0, "charlie_address")
invalid_tx.calculate_txid()

# Add first transaction to mempool
validator.mempool.append(valid_tx.__dict__)

# Validate double-spend transaction
is_valid, message = validator.validate_transaction(invalid_tx)
print(f"\nDouble-spend transaction validation result: {is_valid}")
print(f"Validation message: {message}")

Network Protocol and Message Types

from enum import Enum
from dataclasses import dataclass
from typing import Any, Dict

class MessageType(Enum):
    """Bitcoin network message types"""
    VERSION = "version"
    VERACK = "verack"
    ADDR = "addr"
    INV = "inv"
    GETDATA = "getdata"
    BLOCK = "block"
    TX = "tx"
    PING = "ping"
    PONG = "pong"

@dataclass
class BitcoinMessage:
    """Bitcoin network message"""
    command: MessageType
    payload: Dict[str, Any]
    timestamp: float

class BitcoinProtocol:
    """Bitcoin network protocol"""

    PROTOCOL_VERSION = 70015
    NETWORK_MAGIC = 0xD9B4BEF9  # Main net magic bytes

    @classmethod
    def create_version_message(cls, node_info: Dict) -> BitcoinMessage:
        """Create version handshake message"""
        payload = {
            "version": cls.PROTOCOL_VERSION,
            "services": 1,  # NODE_NETWORK
            "timestamp": int(time.time()),
            "addr_recv": node_info.get("remote_addr", "127.0.0.1:8333"),
            "addr_from": node_info.get("local_addr", "127.0.0.1:8333"),
            "nonce": node_info.get("nonce", 12345),
            "user_agent": "/Bitcoin Core:0.21.0/",
            "start_height": node_info.get("block_height", 0)
        }

        return BitcoinMessage(
            command=MessageType.VERSION,
            payload=payload,
            timestamp=time.time()
        )

    @classmethod
    def create_inv_message(cls, inventory_type: str, items: List[str]) -> BitcoinMessage:
        """Create inventory announcement message"""
        inventory_items = []

        for item in items:
            inv_type = {
                "tx": 1,      # Transaction
                "block": 2,   # Block
                "filtered_block": 3  # Filtered block
            }.get(inventory_type, 0)

            inventory_items.append({
                "type": inv_type,
                "hash": item
            })

        payload = {
            "count": len(inventory_items),
            "inventory": inventory_items
        }

        return BitcoinMessage(
            command=MessageType.INV,
            payload=payload,
            timestamp=time.time()
        )

# Protocol message demonstration
print("\n=== Bitcoin Network Protocol Demonstration ===")

# Create version message
node_info = {
    "remote_addr": "192.168.1.100:8333",
    "local_addr": "192.168.1.101:8333",
    "nonce": 567890,
    "block_height": 700000
}

version_msg = BitcoinProtocol.create_version_message(node_info)
print("Version message:")
print(f"  Protocol version: {version_msg.payload['version']}")
print(f"  User agent: {version_msg.payload['user_agent']}")
print(f"  Block height: {version_msg.payload['start_height']}")

# Create inventory announcement message
new_transactions = [
    "a1b2c3d4e5f6789012345678901234567890123456789012345678901234567890",
    "b2c3d4e5f6789012345678901234567890123456789012345678901234567890a1"
]

inv_msg = BitcoinProtocol.create_inv_message("tx", new_transactions)
print(f"\nInventory announcement message:")
print(f"  Type: Transaction announcement")
print(f"  Count: {inv_msg.payload['count']}")
print(f"  First transaction hash: {inv_msg.payload['inventory'][0]['hash'][:16]}...")

Chapter Summary

This chapter provides an in-depth analysis of Bitcoin’s technical implementation:

  1. UTXO Model:

    • Differences from account model
    • Transaction input-output structure
    • Change mechanism
  2. Script System:

    • Stack-based scripting language
    • Script types like P2PKH, multi-signature
    • Timelocks and conditional payments
  3. Mining Mechanism:

    • Double SHA-256 proof of work
    • Block structure and nonce search
    • Difficulty adjustment algorithm
  4. Transaction Verification:

    • Multi-layer verification mechanism
    • Script execution verification
    • Double-spend detection
  5. Network Protocol:

    • P2P message types
    • Node discovery and communication
    • Data synchronization mechanism

As the first successful virtual currency, Bitcoin’s design philosophy and technical implementation laid the foundation for subsequent virtual currency development. In the next chapter, we will learn about the innovative features of other mainstream virtual currencies.