Onboard External Party¶
This tutorial demonstrates how to onboard an external party. External parties can authorize Daml transactions without the need to trust any node of the network by signing transactions using a key they control. Before proceeding, it is recommended to review the external signing overview to understand the concept of external signing. Additionally, the topology tutorial provides a detailed explanation of the topology concepts used in this tutorial.
The tutorial illustrates the onboarding of a party named Alice. The process can be repeated any number of times to onboard new parties.
Important
This tutorial is for demo purposes. The code snippets should not be used directly in a production environment.
Prerequisites¶
For simplicity, this tutorial assumes a minimal Canton setup consisting of one participant node connected to one synchronizer (which includes both a sequencer node and a mediator node).
Tip
If you already have such an instance running, proceed to the Setup section.
This configuration is not strictly necessary to onboard external parties, but it will be needed when submitting externally signed transactions.
Start Canton¶
To obtain a Canton artifact, refer to the getting started section. From the artifact directory, start Canton using the command:
./bin/canton -c examples/08-interactive-submission/interactive-submission.conf --bootstrap examples/08-interactive-submission/bootstrap.canton
Once the “Welcome to Canton” message appears, you are ready to proceed.
Setup¶
Navigate to the interactive submission example folder located at examples/08-interactive-submission in the Canton release artifact.
To proceed, gather the following information by running the commands below in the Canton console:
Participant Id
Admin API endpoint
@ participant1.id.filterString
res1: String = "participant1::12201ff69b1d24edbf0ee2028a304ea702ee8536790dab1a31e7136e6d90ff6d473c"
@ participant1.config.adminApi.address
res2: String = "127.0.0.1"
@ participant1.config.adminApi.port.unwrap
res3: Int = 30043
In the rest of the tutorial we’ll use the following values, but make sure to replace them with your own:
Participant Id:
participant1::122083aecbe5b3ca3c95c7584d2e0202891f8051d39754802a156521cd1677c8e759
Admin API endpoint:
localhost:4002
API¶
This tutorial interacts with the TopologyManagerWriteService, a gRPC service available on the Admin API of the participant node. See the External Signing Topology Transaction Tutorial for its definition.
The tutorial uses Python to demonstrate the onboarding of an external party.
It is recommended to use a dedicated Python environment to avoid conflicting dependencies; consider using venv.
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Then run the setup script to generate the necessary python files to interact with Canton’s gRPC interface:
./setup.sh
Important
The tutorial builds on the externally signed topology transactions tutorial by re-using some of its code and concepts. For convenience, here are the topology utility functions used in the tutorial:
# Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# [Imports start]
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from grpc import Channel
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_write_service_pb2_grpc,
topology_manager_read_service_pb2_grpc,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_write_service_pb2,
topology_manager_read_service_pb2,
common_pb2,
)
from com.digitalasset.canton.protocol.v30 import topology_pb2
from com.digitalasset.canton.version.v1 import untyped_versioned_message_pb2
from com.digitalasset.canton.crypto.v30 import crypto_pb2
from google.rpc import status_pb2, error_details_pb2
from google.protobuf import empty_pb2
from google.protobuf.json_format import MessageToJson
import hashlib
import grpc
# [Imports end]
def handle_grpc_error(func):
"""
Decorator to handle gRPC errors and print detailed error information.
Args:
func (function): The gRPC function to be wrapped.
Returns:
function: Wrapped function with error handling.
"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except grpc.RpcError as e:
print("gRPC error occurred:")
grpc_metadata: grpc.aio.Metadata = grpc.aio.Metadata.from_tuple(
e.trailing_metadata()
)
metadata = grpc_metadata.get("grpc-status-details-bin")
if metadata is None:
raise
status: status_pb2.Status = status_pb2.Status.FromString(metadata)
for detail in status.details:
if detail.type_url == "type.googleapis.com/google.rpc.ErrorInfo":
error: error_details_pb2.ErrorInfo = (
error_details_pb2.ErrorInfo.FromString(detail.value)
)
print(MessageToJson(error))
else:
print(MessageToJson(detail))
raise
return wrapper
# Computes a canton compatible hash using sha256
# purpose: Canton prefixes content with a hash purpose
# https://github.com/digital-asset/canton/blob/main/community/base/src/main/scala/com/digitalasset/canton/crypto/HashPurpose.scala
# content: payload to be hashed
def compute_sha256_canton_hash(purpose: int, content: bytes):
hash_purpose = purpose.to_bytes(4, byteorder="big")
# Hashed content
hashed_content = hashlib.sha256(hash_purpose + content).digest()
# Multi-hash encoding
# Canton uses an implementation of multihash (https://github.com/multiformats/multihash)
# Since we use sha256 always here, we can just hardcode the prefixes
# This may be improved and simplified in subsequent versions
sha256_algorithm_prefix = bytes([0x12])
sha256_length_prefix = bytes([0x20])
return sha256_algorithm_prefix + sha256_length_prefix + hashed_content
# Computes the fingerprint of a public key by hashing it and adding some Canton specific data
def compute_fingerprint(public_key_bytes: bytes) -> str:
"""
Computes the fingerprint of a public signing key.
Args:
public_key_bytes (bytes): The serialized transaction data.
Returns:
str: The computed fingerprint in hexadecimal format.
"""
# 12 is the hash purpose for public key fingerprints
# https://github.com/digital-asset/canton/blob/main/community/base/src/main/scala/com/digitalasset/canton/crypto/HashPurpose.scala
return compute_sha256_canton_hash(12, public_key_bytes).hex()
def compute_topology_transaction_hash(serialized_versioned_transaction: bytes) -> bytes:
"""
Computes the hash of a serialized topology transaction.
Args:
serialized_versioned_transaction (bytes): The serialized transaction data.
Returns:
bytes: The computed hash.
"""
# 11 is the hash purpose for topology transaction signatures
# https://github.com/digital-asset/canton/blob/main/community/base/src/main/scala/com/digitalasset/canton/crypto/HashPurpose.scala
return compute_sha256_canton_hash(11, serialized_versioned_transaction)
def compute_multi_transaction_hash(hashes: [bytes]) -> bytes:
"""
Computes a combined hash for multiple topology transactions.
This function sorts the given hashes, concatenates them with length encoding,
and computes a Canton-specific SHA-256 hash with a predefined purpose.
Args:
hashes (list[bytes]): A list of hashes representing individual topology transactions.
Returns:
bytes: The computed multi-transaction hash.
"""
# Sort the hashes by their hex representation
sorted_hashes = sorted(hashes, key=lambda h: h.hex())
# Start with the number of hashes encoded as a 4 bytes integer in big endian
combined_hashes = len(sorted_hashes).to_bytes(4, byteorder="big")
# Concatenate each hash, prefixing them with their size as a 4 bytes integer in big endian
for h in sorted_hashes:
combined_hashes += len(h).to_bytes(4, byteorder="big") + h
# 55 is the hash purpose for multi topology transaction hashes
return compute_sha256_canton_hash(55, combined_hashes)
def sign_hash(
private_key: EllipticCurvePrivateKey,
data: bytes,
):
"""
Signs the given data using an elliptic curve private key.
Args:
private_key (EllipticCurvePrivateKey): The private key used for signing.
data (bytes): The data to be signed.
Returns:
bytes: The generated signature.
"""
return private_key.sign(
data=data,
signature_algorithm=ec.ECDSA(hashes.SHA256()),
)
def build_add_transaction_request(
signed_transactions: [topology_pb2.SignedTopologyTransaction],
synchronizer_id: str,
):
"""
Builds an AddTransactionsRequest for the topology API.
Args:
signed_transactions (list[topology_pb2.SignedTopologyTransaction]): List of signed transactions.
synchronizer_id (str): The synchronizer ID for the transaction.
Returns:
topology_manager_write_service_pb2.AddTransactionsRequest: The request object.
"""
return topology_manager_write_service_pb2.AddTransactionsRequest(
transactions=signed_transactions,
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(
id=synchronizer_id,
)
),
)
def build_canton_signature(
signature: bytes,
signed_by: str,
format: crypto_pb2.SignatureFormat,
spec: crypto_pb2.SigningAlgorithmSpec,
):
"""
Builds a Canton-compatible digital signature.
Args:
signature (bytes): The cryptographic signature bytes.
signed_by (str): The identifier of the entity that signed the data.
format (crypto_pb2.SignatureFormat): The format of the signature.
spec (crypto_pb2.SigningAlgorithmSpec): The signing algorithm specification.
Returns:
crypto_pb2.Signature: A protocol buffer representation of the Canton signature.
"""
return crypto_pb2.Signature(
format=format,
signature=signature,
signed_by=signed_by,
signing_algorithm_spec=spec,
)
def build_signed_transaction(
serialized_versioned_transaction: bytes,
signatures: [crypto_pb2.Signature],
):
"""
Builds a signed topology transaction.
Args:
serialized_versioned_transaction (bytes): Serialized topology transaction.
signatures (list[crypto_pb2.Signature]): List of cryptographic signatures.
Returns:
topology_pb2.SignedTopologyTransaction: The signed transaction.
"""
return topology_pb2.SignedTopologyTransaction(
transaction=serialized_versioned_transaction,
signatures=signatures,
)
def build_namespace_mapping(
public_key_fingerprint: str,
public_key_bytes: bytes,
key_format: crypto_pb2.CryptoKeyFormat,
key_scheme: crypto_pb2.SigningKeyScheme,
):
"""
Constructs a topology mapping for namespace delegation.
Args:
public_key_fingerprint (str): The fingerprint of the public key.
public_key_bytes (bytes): The raw bytes of the public key.
key_format (crypto_pb2.CryptoKeyFormat): The format of the public key.
key_scheme (crypto_pb2.SigningKeyScheme): The signing scheme of the key.
Returns:
topology_pb2.TopologyMapping: A topology mapping for namespace delegation.
"""
return topology_pb2.TopologyMapping(
namespace_delegation=topology_pb2.NamespaceDelegation(
namespace=public_key_fingerprint,
target_key=crypto_pb2.SigningPublicKey(
# Must match the format to which the key was exported
format=key_format,
public_key=public_key_bytes,
# Must match the scheme of the key
scheme=key_scheme,
# Keys in NamespaceDelegation are used only for namespace operations
usage=[
crypto_pb2.SigningKeyUsage.SIGNING_KEY_USAGE_NAMESPACE,
],
),
is_root_delegation=True,
)
)
def build_topology_transaction(
mapping: topology_pb2.TopologyMapping,
serial: int = 1,
):
"""
Builds a topology transaction.
Args:
mapping (topology_pb2.TopologyMapping): The topology mapping to include in the transaction.
serial (int): The serial of the topology transaction. Defaults to 1.
Returns:
topology_pb2.TopologyTransaction: The topology transaction object.
"""
return topology_pb2.TopologyTransaction(
mapping=mapping,
operation=topology_pb2.Enums.TopologyChangeOp.TOPOLOGY_CHANGE_OP_ADD_REPLACE,
serial=serial,
)
def build_versioned_transaction(
data: bytes,
):
"""
Builds a versioned transaction wrapper for the given data.
Args:
data (bytes): Serialized transaction data.
Returns:
untyped_versioned_message_pb2.UntypedVersionedMessage: The versioned transaction object.
"""
return untyped_versioned_message_pb2.UntypedVersionedMessage(
data=data,
version=30,
)
def serialize_topology_transaction(
mapping: topology_pb2.TopologyMapping,
serial: int = 1,
):
"""
Serializes a topology transaction.
Args:
mapping (topology_pb2.TopologyMapping): The topology mapping to serialize.
serial (int): The serial of the topology transaction. Defaults to 1.
Returns:
bytes: The serialized topology transaction.
"""
topology_transaction = build_topology_transaction(mapping, serial)
versioned_topology_transaction = build_versioned_transaction(
topology_transaction.SerializeToString()
)
return versioned_topology_transaction.SerializeToString()
@handle_grpc_error
def submit_signed_transactions(
channel: Channel,
signed_transactions: [topology_pb2.SignedTopologyTransaction],
synchronizer_id: str,
):
"""
Submits signed topology transactions to the Canton topology API.
Args:
channel (Channel): The gRPC channel used to communicate with the topology service.
signed_transactions (list[topology_pb2.SignedTopologyTransaction]):
A list of signed topology transactions to be submitted.
synchronizer_id (str): The identifier of the synchronizer to target.
Raises:
grpc.RpcError: If there is an issue communicating with the topology API.
"""
add_transactions_request = build_add_transaction_request(
signed_transactions,
synchronizer_id,
)
topology_write_client = (
topology_manager_write_service_pb2_grpc.TopologyManagerWriteServiceStub(channel)
)
topology_write_client.AddTransactions(add_transactions_request)
@handle_grpc_error
def list_namespace_delegation(
channel: Channel,
synchronizer_id: str,
fingerprint: str,
):
"""
Retrieves namespace delegations from the topology API.
Args:
channel (Channel): The gRPC channel used to communicate with the topology service.
synchronizer_id (str): The identifier of the synchronizer managing the namespace.
fingerprint (str): The fingerprint of the public key associated with the namespace.
Returns:
topology_manager_read_service_pb2.ListNamespaceDelegationResponse:
The response containing the list of namespace delegations.
Raises:
grpc.RpcError: If there is an issue communicating with the topology API.
"""
list_namespace_delegation_request = (
topology_manager_read_service_pb2.ListNamespaceDelegationRequest(
base_query=topology_manager_read_service_pb2.BaseQuery(
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(id=synchronizer_id)
),
head_state=empty_pb2.Empty(),
),
filter_namespace=fingerprint,
)
)
topology_read_client = (
topology_manager_read_service_pb2_grpc.TopologyManagerReadServiceStub(channel)
)
return topology_read_client.ListNamespaceDelegation(
list_namespace_delegation_request
)
Additionally, the following imports and variables are required for the rest of the tutorial:
Imports¶
import time
import grpc
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from grpc import Channel
import google.protobuf.empty_pb2
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_write_service_pb2_grpc,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_write_service_pb2,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_read_service_pb2_grpc,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_read_service_pb2,
common_pb2,
)
from com.digitalasset.canton.protocol.v30 import topology_pb2
from com.digitalasset.canton.crypto.v30 import crypto_pb2
from google.protobuf import empty_pb2
from interactive_topology_util import (
compute_fingerprint,
compute_sha256_canton_hash,
serialize_topology_transaction,
compute_multi_transaction_hash,
sign_hash,
compute_topology_transaction_hash,
)
Tutorial variables¶
admin_port="4002"
gRPC channel¶
admin_channel = grpc.insecure_channel(f"localhost:{admin_port}")
# Service stubs on the Admin API channel, used by the snippets below
topology_write_client = topology_manager_write_service_pb2_grpc.TopologyManagerWriteServiceStub(admin_channel)
topology_read_client = topology_manager_read_service_pb2_grpc.TopologyManagerReadServiceStub(admin_channel)
1. Topology Mappings¶
Onboarding an external party requires three topology mappings:
- NamespaceDelegation: Defines a root namespace for the party and registers the namespace signing key, which is used to authorize topology changes involving the party’s identity.
- PartyToKeyMapping: Registers:
  - The protocol signing key responsible for authenticating the submission of Daml transactions to the ledger on behalf of the party.
  - A threshold (number) of keys, at most equal to the number of keys registered. At least threshold-many signatures must be obtained for a transaction submission to be authorized.
- PartyToParticipantMapping: Registers:
  - The association of the party with one or more participant nodes, granting them confirmation rights. These rights allow participant nodes to validate Daml transactions involving the party and authorize their commitment to the ledger on behalf of the party.
  - A threshold (number) of participant nodes, at most equal to the number of hosting participants. At least threshold-many confirmations must be obtained from the hosting participants for a valid transaction to be authorized and committed to the ledger.
Note
Hosting a party on more than one participant node for confirmation reduces the trust the party must place in any single node and increases its overall availability on the network (e.g., if one confirming node becomes unavailable). See the Trust model for more details.
2. Signing Keys¶
Canton uses digital signatures for authentication.
As shown in the previous section, two of the three required topology mappings, NamespaceDelegation and PartyToKeyMapping, register public keys; the party authorizes actions by signing with the corresponding private keys.
Best practices suggest using separate signing keys for different purposes, and it is strongly recommended to use distinct key pairs for these two mappings. However, for simplicity, this tutorial will use a single key pair.
Generate a signing key pair¶
# For the sake of simplicity in the demo, we use a single signing key pair for the party namespace (used to manage the party itself on the network),
# and for the signing of transactions via the interactive submission service. However, we recommend using different keys in real-world deployments for better security.
private_key = ec.generate_private_key(curve=ec.SECP256R1())
public_key = private_key.public_key()
# Extract the public key in the DER format
public_key_bytes: bytes = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
# Wrap the public key in a Canton protobuf message
signing_public_key = crypto_pb2.SigningPublicKey(
    # Must match the format to which the key was exported above
format=crypto_pb2.CryptoKeyFormat.CRYPTO_KEY_FORMAT_DER,
public_key=public_key_bytes,
# Must match the scheme of the key
scheme=crypto_pb2.SigningKeyScheme.SIGNING_KEY_SCHEME_EC_DSA_P256,
# Because we have only one key, we specify both NAMESPACE and PROTOCOL usage for it
# When using different keys, ensure to use only the correct usage for each
usage=[
crypto_pb2.SigningKeyUsage.SIGNING_KEY_USAGE_NAMESPACE,
crypto_pb2.SigningKeyUsage.SIGNING_KEY_USAGE_PROTOCOL,
],
# This field is deprecated in favor of scheme but python requires us to set it
key_spec=crypto_pb2.SIGNING_KEY_SPEC_EC_P256,
)
3. Fingerprint¶
Canton uses fingerprints to efficiently identify and reference signing keys. Refer to the Fingerprint section of the topology tutorial for more information.
Compute the fingerprint¶
public_key_fingerprint = compute_fingerprint(public_key_bytes)
4. Party ID¶
A Party ID is composed of two parts:
- A human readable name, in this case: alice
- The fingerprint of the namespace signing key, also simply called the namespace
Construct the party ID¶
# The party id is constructed with party_name :: fingerprint
# This must be the fingerprint of the _namespace signing key_
party_name = "alice"  # the human readable part used in this tutorial
party_id = party_name + "::" + public_key_fingerprint
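The reverse operation, recovering the name and namespace from a party ID, is a simple split on the :: separator. A minimal sketch (split_party_id is a hypothetical helper, not part of the Canton tooling):

```python
def split_party_id(party_id: str) -> tuple[str, str]:
    # A party ID has the shape "<human-readable name>::<namespace fingerprint>"
    name, sep, namespace = party_id.partition("::")
    if not sep or not name or not namespace:
        raise ValueError(f"not a valid party id: {party_id!r}")
    return name, namespace

name, namespace = split_party_id("alice::1220abcd")
print(name)       # alice
print(namespace)  # 1220abcd
```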
5. External Party Onboarding Transactions¶
Generate the three topology transactions necessary for the onboarding of Alice.
Build and hash transaction function¶
def build_serialized_transaction_and_hash(
mapping: topology_pb2.TopologyMapping,
) -> (bytes, bytes):
"""
Generates a serialized topology transaction and its corresponding hash.
Args:
mapping (topology_pb2.TopologyMapping): The topology mapping to be serialized.
Returns:
tuple: A tuple containing:
- bytes: The serialized transaction.
- bytes: The SHA-256 hash of the serialized transaction.
"""
transaction = serialize_topology_transaction(mapping)
transaction_hash = compute_sha256_canton_hash(11, transaction)
return transaction, transaction_hash
Build a party to key transaction¶
def build_party_to_key_transaction(
channel: grpc.Channel,
party_id: str,
new_signing_key: crypto_pb2.SigningPublicKey,
synchronizer_id: str,
) -> bytes:
"""
Constructs a topology transaction that updates the party-to-key mapping.
Args:
channel (grpc.Channel): gRPC channel for communication with the topology manager.
party_id (str): Identifier of the party whose key mapping is being updated.
new_signing_key (crypto_pb2.SigningPublicKey): The new signing key to be added.
synchronizer_id (str): ID of the synchronizer to query the topology state.
Returns:
bytes: Serialized topology transaction containing the updated mapping.
"""
# Retrieve the current party to key mapping
list_party_to_key_request = (
topology_manager_read_service_pb2.ListPartyToKeyMappingRequest(
base_query=topology_manager_read_service_pb2.BaseQuery(
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(id=synchronizer_id)
),
head_state=empty_pb2.Empty(),
),
filter_party=party_id,
)
)
topology_read_client = (
topology_manager_read_service_pb2_grpc.TopologyManagerReadServiceStub(channel)
)
party_to_key_response: (
topology_manager_read_service_pb2.ListPartyToKeyMappingResponse
) = topology_read_client.ListPartyToKeyMapping(list_party_to_key_request)
if len(party_to_key_response.results) == 0:
current_serial = 1
current_keys_list = []
else:
# Sort the results by serial in descending order and take the first one
sorted_results = sorted(
party_to_key_response.results,
key=lambda result: result.context.serial,
reverse=True,
)
        # Get the mapping with the highest serial and its list of signing keys
current_serial = sorted_results[0].context.serial
current_keys_list: [crypto_pb2.SigningPublicKey] = sorted_results[
0
].item.signing_keys
    # Create a new mapping adding the new key to the list and incrementing the serial
updated_mapping = topology_pb2.TopologyMapping(
party_to_key_mapping=topology_pb2.PartyToKeyMapping(
party=party_id,
threshold=1,
signing_keys=current_keys_list + [new_signing_key],
)
)
# Build the serialized transaction
return serialize_topology_transaction(updated_mapping, serial=current_serial + 1)
Note
The build_party_to_key_transaction function is an example of how to safely build a topology transaction:
first obtain the highest serial for its unique mapping, update the mapping’s content, and increment the serial by 1.
This ensures concurrent updates are rejected. During onboarding of external parties, however, no existing mappings
are expected, and the serial is therefore set to 1.
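The serial selection logic described in the note can be sketched in isolation (next_serial is a hypothetical helper operating on plain integers rather than topology query results):

```python
def next_serial(existing_serials: list[int]) -> int:
    # A mapping with no previous version starts at serial 1; otherwise take
    # the highest known serial and increment it by one, so that a concurrent
    # writer basing its update on a stale serial is rejected.
    if not existing_serials:
        return 1
    return max(existing_serials) + 1

print(next_serial([]))      # 1: first onboarding transaction
print(next_serial([1, 2]))  # 3: update on top of the latest mapping
```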
Build and hash onboarding transactions¶
# Namespace delegation: registers a root namespace with the public key of the party to the network
# effectively creating the party.
namespace_delegation_mapping = topology_pb2.TopologyMapping(
namespace_delegation=topology_pb2.NamespaceDelegation(
namespace=public_key_fingerprint,
target_key=signing_public_key,
is_root_delegation=True,
)
)
(namespace_delegation_transaction, namespace_transaction_hash) = (
build_serialized_transaction_and_hash(namespace_delegation_mapping)
)
# Party to key: registers the public key as the one that will be used to sign and authorize Daml transactions submitted
# to the ledger via the interactive submission service
party_to_key_transaction = build_party_to_key_transaction(
channel, party_id, signing_public_key, synchronizer_id
)
party_to_key_transaction_hash = compute_topology_transaction_hash(
party_to_key_transaction
)
# Party to participant: records the fact that the party wants to be hosted on the participants with confirmation rights
# This means those participants are not allowed to submit transactions on behalf of this party but will validate transactions
# on behalf of the party by confirming or rejecting them according to the ledger model. They also record transactions for that party on the ledger.
confirming_participants_hosting = []
for confirming_participant_id in confirming_participant_ids:
confirming_participants_hosting.append(
topology_pb2.PartyToParticipant.HostingParticipant(
participant_uid=confirming_participant_id,
permission=topology_pb2.Enums.ParticipantPermission.PARTICIPANT_PERMISSION_CONFIRMATION,
)
)
party_to_participant_mapping = topology_pb2.TopologyMapping(
party_to_participant=topology_pb2.PartyToParticipant(
party=party_id,
threshold=confirming_threshold,
participants=confirming_participants_hosting,
)
)
(party_to_participant_transaction, party_to_participant_transaction_hash) = (
build_serialized_transaction_and_hash(party_to_participant_mapping)
)
This tutorial uses a single signing key, therefore all transactions are signed exclusively with that key (with the exception of the PartyToParticipant transaction, which also needs to be signed by the hosting participant).
However, in a production environment where multiple keys are used, each transaction must be signed with the appropriate keys:
- Namespace signing key: All transactions must be signed by this key, as it authorizes any topology state changes involving the party.
- PartyToKeyMapping transaction: In addition to the namespace signing key, this transaction must be signed by all protocol signing keys it registers. This ensures the network can verify that the party has control over those keys.
- PartyToParticipantMapping transaction: Along with the namespace signing key, this transaction must be signed by all hosting participants it registers. Participants provide an RPC to sign transactions using their own signing key, as demonstrated in the next section.
Note
Any change to these topology transactions requires a signature from the namespace key. No node can alter the topology state of the external party without an explicit signature from its namespace key.
6. Multi Transaction Hash¶
In order to reduce the number of signing operations required, compute a multi-transaction hash of all three transactions combined. Signing this hash allows authenticating all three transactions at once. A function to that effect is already available in the utility functions provided at the beginning of the tutorial.
Compute multi hash¶
# Combine the hashes of all three transactions, so we can perform a single signature
multi_hash = compute_multi_transaction_hash(
[
namespace_transaction_hash,
party_to_key_transaction_hash,
party_to_participant_transaction_hash,
]
)
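The combination step can be exercised with the standard library alone. The sketch below restates compute_multi_transaction_hash (the multi_hash and canton_hash names are made up for illustration) and shows that the signed hash is independent of the order in which the per-transaction hashes are supplied:

```python
import hashlib

def canton_hash(purpose: int, content: bytes) -> bytes:
    # 4-byte big-endian purpose prefix, SHA-256, then multihash framing
    return bytes([0x12, 0x20]) + hashlib.sha256(purpose.to_bytes(4, byteorder="big") + content).digest()

def multi_hash(hashes: list[bytes]) -> bytes:
    # Sorting by hex representation makes the combined hash order-independent
    ordered = sorted(hashes, key=lambda h: h.hex())
    # The count, then each hash, all length-prefixed as 4-byte big-endian integers
    combined = len(ordered).to_bytes(4, byteorder="big")
    for h in ordered:
        combined += len(h).to_bytes(4, byteorder="big") + h
    return canton_hash(55, combined)  # 55 = multi topology transaction purpose

a = canton_hash(11, b"tx-a")
b = canton_hash(11, b"tx-b")
assert multi_hash([a, b]) == multi_hash([b, a])  # order does not matter
```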
7. Signing¶
First, sign the multi hash with the namespace key:
Sign multi hash¶
signature = sign_hash(private_key, multi_hash)
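The network later verifies this signature against the public key registered in the PartyToKeyMapping. Using the same `cryptography` primitives as the tutorial, a minimal sketch of that verification step (demo_key and demo_hash are stand-ins for illustration, not the tutorial's actual values):

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec

# Stand-ins for the tutorial's signing key and multi-transaction hash
demo_key = ec.generate_private_key(curve=ec.SECP256R1())
demo_hash = bytes([0x12, 0x20]) + bytes(32)

# Same call as sign_hash above: ECDSA over SHA-256
demo_signature = demo_key.sign(demo_hash, ec.ECDSA(hashes.SHA256()))

# A verifier checks the signature against the registered public key;
# verify() raises InvalidSignature if the payload or signature was tampered with
demo_key.public_key().verify(demo_signature, demo_hash, ec.ECDSA(hashes.SHA256()))
print("signature verified")
```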
Then, build the SignedTopologyTransaction messages expected by the Topology API:
Build signed topology transaction function¶
def build_signed_topology_transaction(
transaction: bytes,
hashes: [bytes],
signature: bytes,
signed_by: str,
proposal: bool = False,
):
"""
Builds a signed topology transaction, optionally including multi-transaction signatures.
Args:
transaction (bytes): The raw bytes representing the transaction to be signed.
hashes (list[bytes]): A list of transaction hashes for the multi-transaction signature.
signature (bytes): The signature for the transaction.
signed_by (str): The identifier of the entity signing the transaction.
proposal (bool, optional): A flag indicating if this transaction is part of a proposal. Defaults to False.
Returns:
topology_pb2.SignedTopologyTransaction
"""
return topology_pb2.SignedTopologyTransaction(
transaction=transaction,
# Not set because we use the multi transactions signature
signatures=[],
multi_transaction_signatures=[
topology_pb2.MultiTransactionSignatures(
transaction_hashes=hashes,
signatures=[
crypto_pb2.Signature(
format=crypto_pb2.SignatureFormat.SIGNATURE_FORMAT_RAW,
signature=signature,
signed_by=signed_by,
signing_algorithm_spec=crypto_pb2.SigningAlgorithmSpec.SIGNING_ALGORITHM_SPEC_EC_DSA_SHA_256,
)
],
)
],
proposal=proposal,
)
hash_list = [
namespace_transaction_hash,
party_to_key_transaction_hash,
party_to_participant_transaction_hash,
]
signed_namespace_transaction = build_signed_topology_transaction(
namespace_delegation_transaction, hash_list, signature, public_key_fingerprint
)
signed_party_to_key_transaction = build_signed_topology_transaction(
party_to_key_transaction, hash_list, signature, public_key_fingerprint
)
signed_party_to_participant_transaction = build_signed_topology_transaction(
party_to_participant_transaction,
hash_list,
signature,
public_key_fingerprint,
True,
)
8. Submit¶
Submit the transactions signed with the external party’s key:
Load the signed transactions onto the participant¶
add_transactions_request = (
topology_manager_write_service_pb2.AddTransactionsRequest(
transactions=[
signed_namespace_transaction,
signed_party_to_key_transaction,
signed_party_to_participant_transaction,
],
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(
id=synchronizer_id,
)
),
)
)
topology_write_client.AddTransactions(add_transactions_request)
9. Observe Onboarded Party¶
Finally, wait to observe the party in the topology, confirming it was created successfully:
Observe PartyToParticipant transaction function¶
def wait_to_observe_party_to_participant(
topology_read_client: topology_manager_read_service_pb2_grpc,
synchronizer_id: str,
party_id,
):
party_in_topology = False
while not party_in_topology:
party_to_participant_response: (
topology_manager_read_service_pb2.ListPartyToParticipantResponse
) = topology_read_client.ListPartyToParticipant(
topology_manager_read_service_pb2.ListPartyToParticipantRequest(
base_query=topology_manager_read_service_pb2.BaseQuery(
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(
id=synchronizer_id,
)
),
head_state=google.protobuf.empty_pb2.Empty(),
),
filter_party=party_id,
)
)
if len(party_to_participant_response.results) > 0:
break
else:
time.sleep(0.5)
continue
Wait for party to appear in topology¶
# If there's only one confirming participant, onboarding should be complete already
if len(confirming_participant_ids) == 1:
wait_to_observe_party_to_participant(
topology_read_client, synchronizer_id, party_id
)
Alice is now successfully onboarded and ready to interact with the ledger. Move on to the next tutorial to learn how to submit externally signed transactions.
Tooling¶
The scripts mentioned in this tutorial can be used as tools for testing and development purposes.
Onboard external party¶
Create an external party on the ledger and write its private and public keys to local DER files. By default, the synchronizer ID and participant ID are picked up from the files written by the Canton bootstrap script in this directory. They can be overridden with `--synchronizer-id <synchronizer_id>` and `--participant-id <participant_id>`.
./setup.sh
python interactive_submission.py create-party --name alice
Output:
Onboarding alice
Waiting for alice to appear in topology
Party ID: alice::122076f2a757c1ea944f52fc1fa854aa78077672efa32d7903e97cbf92646331876d
Written private key to: alice::122076f2a757c1ea944f52fc1fa854aa78077672efa32d7903e97cbf92646331876d-private-key.der
Written public key to: alice::122076f2a757c1ea944f52fc1fa854aa78077672efa32d7903e97cbf92646331876d-public-key.der
Advanced Onboarding Topics¶
Multi Hosted Party¶
This tutorial uses a simplified setup with a single participant. However, as explained in the external signing overview and throughout the tutorial, an external party can be hosted on multiple confirming participants. As mentioned before, this has security and availability advantages.
Party multi-hosting differs from the tutorial above in only two ways:
The PartyToParticipant topology mapping is updated to:
- List all hosting participants (along with their permission) instead of just one
- Adjust the threshold to strike the desired tradeoff between security and availability
All hosting participants must approve the PartyToParticipant transaction. To that end, the party ID generated during the onboarding process can be sent out of band to all other hosting participants. They can then use the party ID to look up the pending PartyToParticipant transaction and approve it.
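The availability/security tradeoff of the confirmation threshold can be made concrete with a small illustrative helper (this is not part of the Canton API, just arithmetic on the hosting parameters): with N confirming participants and threshold T, up to N − T participants may be offline while transactions still confirm, and at least T participants must confirm any transaction on the party's behalf.

```python
def threshold_tradeoff(num_confirming: int, threshold: int) -> dict:
    """Illustrative only: availability vs. security for a multi-hosted party.

    A transaction needs `threshold` confirmations out of `num_confirming`
    hosting participants, so:
      - availability: up to (num_confirming - threshold) participants may
        be offline and transactions still confirm
      - security: at least `threshold` participants must cooperate to
        confirm a transaction
    """
    if not 1 <= threshold <= num_confirming:
        raise ValueError("threshold must be between 1 and num_confirming")
    return {
        "tolerated_offline": num_confirming - threshold,
        "participants_needed_to_confirm": threshold,
    }

# A party hosted on 3 participants with threshold 2 tolerates 1 offline
# participant and requires 2 participants to confirm a transaction
print(threshold_tradeoff(3, 2))
```

Raising the threshold strengthens security at the cost of availability; a threshold equal to the number of hosting participants tolerates no outages at all.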
The following Python function illustrates this process:
Function to lookup a pending PartyToParticipant transaction and authorize it¶
def authorize_external_party_hosting(
party_id: str,
synchronizer_id: str,
channel: Channel,
auto_accept: bool,
):
"""
Authorizes the hosting of a multi-hosted external party on the current node.
Expects the PartyToParticipant proposal to have already been published to the synchronizer.
Args:
party_id (str): ID of the party.
synchronizer_id (str): ID of the synchronizer on which the party will be registered.
channel (grpc.Channel): gRPC channel to the confirming participant Admin API.
auto_accept (bool): Will not ask for confirmation when true.
"""
print(f"Authorizing hosting of {party_id}")
topology_write_client = (
topology_manager_write_service_pb2_grpc.TopologyManagerWriteServiceStub(channel)
)
topology_read_client = (
topology_manager_read_service_pb2_grpc.TopologyManagerReadServiceStub(channel)
)
    # Retrieve the pending proposal, polling until it appears in the store
    while True:
        party_to_participant_proposals: (
            topology_manager_read_service_pb2.ListPartyToParticipantResponse
        ) = topology_read_client.ListPartyToParticipant(
            topology_manager_read_service_pb2.ListPartyToParticipantRequest(
                base_query=topology_manager_read_service_pb2.BaseQuery(
                    store=common_pb2.StoreId(
                        synchronizer=common_pb2.StoreId.Synchronizer(
                            id=synchronizer_id,
                        ),
                    ),
                    proposals=True,
                    head_state=empty_pb2.Empty(),
                ),
                filter_party=party_id,
            )
        )
        if len(party_to_participant_proposals.results) > 0:
            break
        time.sleep(0.5)
# Expecting a single pending proposal for the party
party_to_participant_proposal: (
topology_manager_read_service_pb2.ListPartyToParticipantResponse.Result
) = party_to_participant_proposals.results[0]
if not auto_accept:
print(MessageToJson(party_to_participant_proposal))
user_input = input("Authorize party hosting? (y/n): ")
if user_input.lower() != "y":
print("Transaction rejected.")
sys.exit(0)
# Authorize the hosting
topology_write_client.Authorize(
topology_manager_write_service_pb2.AuthorizeRequest(
transaction_hash=party_to_participant_proposal.context.transaction_hash.hex(),
must_fully_authorize=False,
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(
id=synchronizer_id,
),
),
)
)
For a complete example demonstrating party multi-hosting, check out this file:
Multi-Hosted External party example¶
# Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import time
import json
import argparse
import sys
from grpc import Channel
from google.protobuf.json_format import MessageToJson
import grpc
from google.protobuf import empty_pb2
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_write_service_pb2_grpc,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_write_service_pb2,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_read_service_pb2_grpc,
)
from com.digitalasset.canton.topology.admin.v30 import (
topology_manager_read_service_pb2,
common_pb2,
)
from com.digitalasset.canton.admin.participant.v30 import (
participant_status_service_pb2,
participant_status_service_pb2_grpc,
)
from external_party_onboarding import (
onboard_external_party,
wait_to_observe_party_to_participant,
)
# Authorize an external party hosting on a participant node
def authorize_external_party_hosting(
party_id: str,
synchronizer_id: str,
channel: Channel,
auto_accept: bool,
):
"""
Authorizes the hosting of a multi-hosted external party on the current node.
Expects the PartyToParticipant proposal to have already been published to the synchronizer.
Args:
party_id (str): ID of the party.
synchronizer_id (str): ID of the synchronizer on which the party will be registered.
channel (grpc.Channel): gRPC channel to the confirming participant Admin API.
auto_accept (bool): Will not ask for confirmation when true.
"""
print(f"Authorizing hosting of {party_id}")
topology_write_client = (
topology_manager_write_service_pb2_grpc.TopologyManagerWriteServiceStub(channel)
)
topology_read_client = (
topology_manager_read_service_pb2_grpc.TopologyManagerReadServiceStub(channel)
)
    # Retrieve the pending proposal, polling until it appears in the store
    while True:
        party_to_participant_proposals: (
            topology_manager_read_service_pb2.ListPartyToParticipantResponse
        ) = topology_read_client.ListPartyToParticipant(
            topology_manager_read_service_pb2.ListPartyToParticipantRequest(
                base_query=topology_manager_read_service_pb2.BaseQuery(
                    store=common_pb2.StoreId(
                        synchronizer=common_pb2.StoreId.Synchronizer(
                            id=synchronizer_id,
                        ),
                    ),
                    proposals=True,
                    head_state=empty_pb2.Empty(),
                ),
                filter_party=party_id,
            )
        )
        if len(party_to_participant_proposals.results) > 0:
            break
        time.sleep(0.5)
# Expecting a single pending proposal for the party
party_to_participant_proposal: (
topology_manager_read_service_pb2.ListPartyToParticipantResponse.Result
) = party_to_participant_proposals.results[0]
if not auto_accept:
print(MessageToJson(party_to_participant_proposal))
user_input = input("Authorize party hosting? (y/n): ")
if user_input.lower() != "y":
print("Transaction rejected.")
sys.exit(0)
# Authorize the hosting
topology_write_client.Authorize(
topology_manager_write_service_pb2.AuthorizeRequest(
transaction_hash=party_to_participant_proposal.context.transaction_hash.hex(),
must_fully_authorize=False,
store=common_pb2.StoreId(
synchronizer=common_pb2.StoreId.Synchronizer(
id=synchronizer_id,
),
),
)
)
def get_participant_id(channel: grpc.Channel) -> str:
status_service_client = (
participant_status_service_pb2_grpc.ParticipantStatusServiceStub(channel)
)
status_response: participant_status_service_pb2.ParticipantStatusResponse = (
status_service_client.ParticipantStatus(
participant_status_service_pb2.ParticipantStatusRequest()
)
)
print(f"Participant ID = {status_response.status.common_status.uid}")
return status_response.status.common_status.uid
def multi_host_party(
party_name: str,
synchronizer_id: str,
confirming_threshold: int,
participant_data: object,
auto_accept: bool,
):
"""
    Onboard a multi-hosted party.
Args:
party_name (str): Name of the party.
synchronizer_id (str): ID of the synchronizer on which the party will be registered.
confirming_threshold (int): Minimum number of confirmations that must be received from the confirming participants to authorize a transaction.
participant_data (object): Mapping of participant ID to endpoint of their admin API.
auto_accept (bool): Will not ask for confirmation when true.
"""
print(f"Authorizing hosting of {party_name}")
participant_names = list(participant_data.keys())
if not participant_names:
raise ValueError("No participants provided in the participant data.")
channels = {}
participant_ids = []
for participant_name in participant_names:
channels[participant_name] = grpc.insecure_channel(
participant_data[participant_name]
)
# Get the participant id from each participant
participant_ids = participant_ids + [
get_participant_id(channels[participant_name])
]
(_, party_namespace) = onboard_external_party(
party_name,
participant_ids,
confirming_threshold,
synchronizer_id,
# Pick one of the participants to do the initial external party onboarding
channels[participant_names[0]],
)
party_id = party_name + "::" + party_namespace
# Authorize hosting for each additional confirming participant
    # In reality this would not be done from a central place like here;
    # each hosting participant would run this on their own node
for additional_participant_name in participant_names[1:]:
authorize_external_party_hosting(
party_id,
synchronizer_id,
channels[additional_participant_name],
auto_accept,
)
# Wait for the party to appear in topology for all participants
for _, channel in channels.items():
with channel:
topology_read_client = (
topology_manager_read_service_pb2_grpc.TopologyManagerReadServiceStub(
channel
)
)
wait_to_observe_party_to_participant(
topology_read_client, synchronizer_id, party_id
)
print(f"Multi-Hosted party {party_id} fully onboarded")
def read_id_from_file(file_path):
try:
with open(file_path, "r") as file:
return file.read().strip()
except FileNotFoundError:
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Multi-Hosted external party")
parser.add_argument(
"--participant-endpoints",
type=str,
help="Path to JSON file containing participant IDs and their endpoints (address + port)",
)
parser.add_argument(
"--synchronizer-id",
type=str,
help="Synchronizer ID",
default=read_id_from_file("synchronizer_id"),
)
parser.add_argument(
"--party-name",
type=str,
help="Party name",
)
parser.add_argument(
"--threshold",
type=int,
help="Confirmation threshold",
)
parser.add_argument(
"--auto-accept",
"-a",
help="Authorize party hosting without explicit confirmation",
action="store_true",
)
args = parser.parse_args()
if args.participant_endpoints:
try:
with open(args.participant_endpoints, "r") as f:
participant_data_raw = json.load(f)
# Extract only the adminApi port and hardcode the address to localhost
participant_data = {
# In this demo we assume all hosting participants are running on localhost
participant_id: f"localhost:{details['adminApi']}"
for participant_id, details in participant_data_raw.items()
if details.get("adminApi") is not None
}
multi_host_party(
args.party_name,
args.synchronizer_id,
args.threshold,
participant_data,
args.auto_accept,
)
except FileNotFoundError:
print(f"File {args.participant_endpoints} not found.")
except json.JSONDecodeError:
print(f"Failed to decode JSON file {args.participant_endpoints}.")
else:
parser.print_help()
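For reference, the file passed to `--participant-endpoints` is expected to map participant names to objects carrying an `adminApi` port, as read by the `__main__` block above. The sketch below parses a minimal example (hypothetical names and ports) using the same extraction logic:

```python
import json

# Hypothetical contents of a participant-endpoints file:
# participant name -> details including the admin API port
participant_data_raw = json.loads("""
{
  "participant1": { "adminApi": 5012 },
  "participant2": { "adminApi": 5022 }
}
""")

# Same extraction as in the script above: keep only entries with an
# adminApi port and hardcode the address to localhost
participant_data = {
    name: f"localhost:{details['adminApi']}"
    for name, details in participant_data_raw.items()
    if details.get("adminApi") is not None
}
print(participant_data)
# {'participant1': 'localhost:5012', 'participant2': 'localhost:5022'}
```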