Module admin_multisig

Expand source code
import smartpy as sp

"""
    A Multisig Contract used to administrate other contracts.
    THIS CONTRACT IS FOR ILLUSTRATIVE PURPOSE.
    IT HAS NOT BEEN AUDITED.
"""

class MS_TYPES:
    #############################################
    #           == Multisig Types ==            #
    #                                           #
    # The types in this file are shared with    #
    # contracts interacting with the multisig   #
    # contract.                                 #
    #############################################

    # Internal administration action type specification
    InternalAdminAction = sp.TVariant(
        changeSigners   = sp.TVariant(
                            removed = sp.TSet(sp.TAddress),
                            added   = sp.TList(
                                        sp.TRecord(
                                            address     = sp.TAddress,
                                            publicKey   = sp.TKey
                                        ).right_comb()
                                    )
                        ).right_comb(),
        changeQuorum    = sp.TNat,
        changeMetadata  = sp.TPair(sp.TString, sp.TOption(sp.TBytes)),
    ).right_comb()

    # External administration action type specification
    ExternalAdminAction = sp.TRecord(
        target  = sp.TAddress,
        actions = sp.TBytes
    ).right_comb()

    # Proposal action type specification
    ProposalAction = sp.TVariant(
        internal = sp.TList(InternalAdminAction),
        external = sp.TList(ExternalAdminAction)
    ).right_comb()

    # Proposal type specification
    Proposal = sp.TRecord(
        startedAt       = sp.TTimestamp,
        initiator       = sp.TAddress,
        endorsements    = sp.TSet(sp.TAddress),
        actions         = ProposalAction
    ).right_comb()

    AggregatedProposalParams = sp.TRecord(
        signatures      = sp.TList(
                            sp.TRecord(
                                signerAddress   = sp.TAddress,
                                signature       = sp.TSignature
                            ).right_comb()
                        ),
        proposalId      = sp.TNat,
        actions         = ProposalAction
    ).right_comb()

    AggregatedEndorsementParams = sp.TList(
        sp.TRecord(
            signatures      = sp.TList(
                                sp.TRecord(
                                    signerAddress   = sp.TAddress,
                                    signature       = sp.TSignature
                                ).right_comb()
                            ),
            proposalId      = sp.TNat
        ).right_comb()
    )

#####################################
# + Metadata                        #
#####################################
METADATA = {
    "name"          : "Generic Multisig Administrator",
    "version"       : "1",
    "description"   : "Generic Multisig Administrator",
    "source"        : {
        "tools": [ "SmartPy" ]
    },
    "interfaces"    : [ "TZIP-016" ],
}

#####################################
# + Error Messages                  #
#####################################
class ERR:
    def make(s): return ("MULTISIG_" + s)

    Badsig                 = make("Badsig")
    ProposalUnknown        = make("ProposalUnknown")
    NotInitiator           = make("NotInitiator")
    SignerUnknown          = make("SignerUnknown")
    InvalidTarget          = make("InvalidTarget")
    MoreQuorumThanSigners  = make("MoreQuorumThanSigners")
    InvalidProposalId      = make("InvalidProposalId")

#####################################
# + Helpers                         #
#####################################
class MultisigHelpers:
    def failIfNotSigner(self, address):
        sp.verify(self.data.signers.contains(address), message = ERR.SignerUnknown)

    def failIfProposalNotActive(self, proposalId):
        sp.verify(self.data.activeProposals.contains(proposalId), message = ERR.ProposalUnknown)

#####################################
# + Contract                        #
#####################################
class MultisigAdmin(sp.Contract, MultisigHelpers):
    def __init__(
            self,
            quorum,
            signers,
            metadataURL
        ):

        # Metadata helper
        self.init_metadata("metadata", METADATA)

        self.init_type(
            sp.TRecord(
                quorum          = sp.TNat,
                lastProposalId  = sp.TNat,
                signers         = sp.TMap(
                                    sp.TAddress,
                                    sp.TRecord(
                                        publicKey       = sp.TKey,
                                        lastProposalId  = sp.TOption(sp.TNat)
                                    ).right_comb()
                                ),
                proposals       = sp.TBigMap(sp.TNat, MS_TYPES.Proposal),
                activeProposals = sp.TSet(sp.TNat),
                metadata        = sp.TBigMap(sp.TString, sp.TBytes),
            ).right_comb()
        )
        self.init(
            quorum              = quorum,
            lastProposalId      = 0,
            signers             = signers,
            proposals           = sp.big_map(),
            activeProposals     = sp.set(),
            metadata            = sp.utils.metadata_of_url(metadataURL)
        )

    #####################################
    # ++ ENTRYPOINTS                    #
    #####################################

    @sp.entrypoint
    def proposal(self, actions):
        """
            Each user can have at most one proposal active at a time.
            Submitting a new proposal overrides the previous one.
        """
        # Proposals can only be submitted by registered signers
        self.failIfNotSigner(sp.sender)

        # If the proposal initiator has an active proposal,
        # then replace that proposal with the new one
        signerLastProposalId = self.data.signers[sp.sender].lastProposalId
        with sp.if_(signerLastProposalId != sp.none):
            self.data.activeProposals.remove(signerLastProposalId.open_some())

        # Increment proposal counter
        self.data.lastProposalId += 1
        proposalId = self.data.lastProposalId
        # Store new proposal
        self.data.activeProposals.add(proposalId)
        self.data.proposals[proposalId] = sp.record(
            startedAt       = sp.now,
            initiator       = sp.sender,
            endorsements    = sp.set([sp.sender]),
            actions         = actions
        )
        # Update signer's last proposal
        self.data.signers[sp.sender].lastProposalId = sp.some(proposalId)

        # Approve the proposal if quorum only requires 1 vote
        with sp.if_(self.data.quorum < 2):
            self.onApproved(
                sp.record(
                    proposalId  = proposalId,
                    actions     = actions,
                )
            )

    @sp.entrypoint
    def endorsement(self, endorsements):
        """
            Entrypoint used to submit endorsements to single/multiple proposals.
        """
        # Endorsements can only be submitted by registered signers
        self.failIfNotSigner(sp.sender)

        # Iterate over every endorsement
        with sp.for_("pId", endorsements) as pId:
            self.registerEndorsement(
                sp.record(
                    proposalId      = pId,
                    signerAddress   = sp.sender
                )
            )

            # Approve the proposal if quorum was reached
            proposal = self.data.proposals[pId]
            with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
                self.onApproved(
                    sp.record(
                        proposalId  = pId,
                        actions     = proposal.actions,
                    )
                )

    @sp.entrypoint
    def aggregated_proposal(self, params):
        """
            Users can send aggregated proposal, which are signed offchain and validated onchain.
        """
        sp.set_type(params, MS_TYPES.AggregatedProposalParams)
        self.failIfNotSigner(sp.sender)

        self.data.lastProposalId += 1
        sp.verify(self.data.lastProposalId == params.proposalId, message = ERR.InvalidProposalId)

        proposal        = sp.compute(
                            sp.record(
                                startedAt       = sp.now,
                                initiator       = sp.sender,
                                endorsements    = sp.set([sp.sender]),
                                actions         = params.actions
                            )
                        )

        # If the proposal initiator has an active proposal,
        # then replace that proposal with the new one
        proposerLastProposalId = self.data.signers[sp.sender].lastProposalId
        with sp.if_(proposerLastProposalId != sp.none):
            self.data.activeProposals.remove(proposerLastProposalId.open_some())
        self.data.signers[sp.sender].lastProposalId = sp.some(params.proposalId)

        self.data.activeProposals.add(params.proposalId)
        self.data.proposals[params.proposalId] = proposal

        preSignature    = sp.compute(
                            sp.pack(
                                sp.record(
                                    actions         = params.actions,
                                    # (contractAddress + proposalId) protect against replay attacks
                                    proposalId      = params.proposalId,
                                    contractAddress = sp.self_address,
                                )
                            )
                        )

        # Validate and apply endorsements
        with sp.for_("signature", params.signatures) as signature:
            self.failIfNotSigner(signature.signerAddress)

            publicKey = self.data.signers[signature.signerAddress].publicKey
            sp.verify(sp.check_signature(publicKey, signature.signature, preSignature), message = ERR.Badsig)

            proposal.endorsements.add(signature.signerAddress)

        # Check quorum
        with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
            self.onApproved(
                sp.record(
                    proposalId  = params.proposalId,
                    actions     = proposal.actions,
                )
            )

    @sp.entrypoint
    def aggregated_endorsement(self, endorsements):
        """
            Users can send aggregated votes, which are signed offchain and validated onchain.
        """
        sp.set_type(endorsements, MS_TYPES.AggregatedEndorsementParams)

        with sp.for_("endorsment", endorsements) as endorsement:
            with sp.for_("signature", endorsement.signatures) as signature:
                self.failIfNotSigner(signature.signerAddress)
                preSignature = sp.pack(
                    sp.record(
                        # (contractAddress + proposalId) protect against replay attacks
                        contractAddress = sp.self_address,
                        proposalId      = endorsement.proposalId
                    )
                )
                publicKey = self.data.signers[signature.signerAddress].publicKey
                sp.verify(
                    sp.check_signature(publicKey, signature.signature, preSignature),
                    message = ERR.Badsig
                )
                self.registerEndorsement(
                    sp.record(
                        proposalId      = endorsement.proposalId,
                        signerAddress   = signature.signerAddress
                    )
                )
            proposal = sp.compute(self.data.proposals[endorsement.proposalId])
            with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
                self.onApproved(
                    sp.record(
                        proposalId  = endorsement.proposalId,
                        actions     = proposal.actions,
                    )
                )

    @sp.entrypoint
    def cancel_proposal(self, proposalId):
        self.failIfNotSigner(sp.sender)

        # Signers can only cancel their own proposals
        sp.verify(self.data.proposals[proposalId].initiator == sp.sender, message = ERR.NotInitiator)
        self.data.activeProposals.remove(proposalId)

    #####################################
    # ++ GLOBAL LAMBDAS                 #
    #####################################

    @sp.private_lambda(with_storage="read-write", wrap_call=True)
    def registerEndorsement(self, params):
        self.failIfProposalNotActive(params.proposalId)
        # Add endorsement to proposal
        self.data.proposals[params.proposalId].endorsements.add(params.signerAddress)

    @sp.private_lambda(with_storage="read-write", with_operations=True, wrap_call=True)
    def onApproved(self, params):
        with (params.actions).match_cases() as arg:
            # Internal actions are applied to the multisig contract
            with arg.match("internal") as internalActions:
                with sp.for_("action", internalActions) as action:
                    self.selfAdmin(action)
                # Removes all active proposals after an administrative change.
                self.removeActiveProposals()
            # External actions are applied to other contracts
            with arg.match("external") as externalActions:
                with sp.for_("action", externalActions) as action:
                    target = sp.contract(sp.TBytes, action.target).open_some(ERR.InvalidTarget)
                    sp.transfer(action.actions, sp.tez(0), target)

        self.data.activeProposals.remove(params.proposalId)

    def selfAdmin(self, action):
        """
            Apply administrative actions to the multisig contract
        """
        with (action).match_cases() as arg:
            with arg.match('changeQuorum') as quorum:
                self.data.quorum = quorum
            with arg.match('changeMetadata') as metadata:
                k, v = sp.match_pair(metadata)
                with sp.if_(v.is_some()):
                    self.data.metadata[k] = v.open_some()
                with sp.else_():
                    del self.data.metadata[k]

            with arg.match('changeSigners') as changeSigners:
                with (changeSigners).match_cases() as cvAction:
                    with cvAction.match('removed') as removeSet:
                        with sp.for_("address", removeSet.elements()) as address:
                            with sp.if_(self.data.signers.contains(address)):
                                # Remove signer
                                del self.data.signers[address]
                                # We don't remove signer[address].lastProposalId
                                # because we remove all activeProposals after it.
                    with cvAction.match('added') as addList:
                        with sp.for_("signer", addList) as signer:
                            self.data.signers[signer.address]   = sp.record(
                                                                    publicKey       = signer.publicKey,
                                                                    lastProposalId  = sp.none
                                                                )
        # Ensure that the contract never requires more quorum than the total of signers.
        sp.verify(self.data.quorum <= sp.len(self.data.signers), message = ERR.MoreQuorumThanSigners)

    def removeActiveProposals(self):
        """
            This method removes all active proposals when either the quorum or the signers get updated.
        """
        self.data.activeProposals = sp.set([])


sp.add_compilation_target(
    "multisig_admin",
    MultisigAdmin(
        quorum              = 1,
        signers             = sp.map(
                                {
                                    sp.address("KT1_SIGNER1_ADDRESS") : sp.record(
                                                                            publicKey       = sp.key("KT1_SIGNER1_KEY"),
                                                                            lastProposalId  = sp.none
                                                                        )
                                },
                                tkey    = sp.TAddress,
                                tvalue  = sp.TRecord(
                                            publicKey       = sp.TKey,
                                            lastProposalId  = sp.TOption(sp.TNat)
                                        ).right_comb()
                            ),
        metadataURL         = "ipfs://"
    )
)

if "templates" not in __name__:


    #########
    # Helpers

    class InternalHelper():
        def variant(content):
            return sp.variant("internal", content)

        def changeQuorum(quorum):
            return sp.variant("changeQuorum", quorum)

        def removeSigners(l):
            return sp.variant("changeSigners",
                sp.variant("removed", sp.set(l))
            )

        def addSigners(l):
            added_list = []
            for added_info in l:
                addr, publicKey = added_info
                added_list.append(
                    sp.record(
                        address = addr,
                        publicKey = publicKey)
                    )
            return sp.variant("changeSigners",
                sp.variant("added", sp.list(added_list))
            )

    class ExternalHelper():
        def variant(content):
            return sp.variant("external", content)

        def changeActive(active):
            return sp.variant("changeActive", active)

        def changeAdmin(address):
            return sp.variant("changeAdmin", address)

    def sign(account, contract):
        message = sp.pack(
            sp.record(
                contractAddress = contract.address,
                proposalId      = contract.data.lastProposalId
            )
        )
        signature = sp.make_signature(account.secret_key, message, message_format = 'Raw')
        vote = sp.record(
            signerAddress   = account.address,
            signature       = signature
        )
        return vote

    ################
    # Test contract

    class Administrated(sp.Contract):
        """
            This contract is a sample
            It shows how a contract can be administrated
            through the multisig administration contract
        """
        def __init__(self, admin, active):
            self.init(
                admin = admin,
                active = active
            )

        AdministrationType = sp.TVariant(
            changeAdmin    = sp.TAddress,
            changeActive   = sp.TBool
        )

        @sp.entrypoint
        def administrate(self, actionsBytes):
            sp.verify(sp.sender == self.data.admin, message = "NOT ADMIN")

            # actionsBytes is packed and must be unpacked
            actions = sp.unpack(actionsBytes, sp.TList(Administrated.AdministrationType)).open_some(message = "Actions are invalid")

            with sp.for_("action", actions) as action:
                with (action).match_cases() as arg:
                    with arg.match('changeActive') as active:
                        self.data.active = active
                    with arg.match('changeAdmin') as admin:
                        self.data.admin = admin

        @sp.entrypoint
        def verifyActive(self):
            sp.verify(self.data.active, message = "NOT ACTIVE")

        # Helpers

        def packActions(self, actions):
            actions = sp.set_type_expr(actions, sp.TList(Administrated.AdministrationType))
            return sp.pack(actions)


    def add_test(internal_tests, is_default = True):
        name = "Internal Administration tests" if internal_tests else "External Administration tests"
        @sp.add_test(name = name, is_default = is_default)
        def test():
            sc = sp.test_scenario()
            sc.h1(name)
            sc.table_of_contents()

            admin = sp.test_account("admin")
            signer1 = sp.test_account("signer1")
            signer2 = sp.test_account("signer2")
            signer3 = sp.test_account("signer3")
            signer4 = sp.test_account("signer4")

            if internal_tests:

                sc.h3("Originate Multisig Admin")
                multisigAdmin = MultisigAdmin(
                    quorum              = 1,
                    signers             = sp.map(
                                            {
                                                signer1.address : sp.record(
                                                                    publicKey       = signer1.public_key,
                                                                    lastProposalId  = sp.none
                                                                ),
                                                signer2.address : sp.record(
                                                                    publicKey       = signer2.public_key,
                                                                    lastProposalId  = sp.none
                                                                )
                                            }
                                        ),
                    metadataURL         = "ipfs://"
                )
                sc += multisigAdmin

                ##########################
                # Auto-accepted proposal #
                ##########################
                sc.h2("Auto-accepted proposal when quorum is 1")
                sc.h3("signer1 propose to change quorum to 2")
                sc.verify(multisigAdmin.data.quorum == 1)
                changeQuorum = InternalHelper.changeQuorum(2)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
                sc.verify(multisigAdmin.data.quorum == 2)

                ####################
                # Add a 3rd signer #
                ####################
                sc.h2("Adding a 3rd signer")
                sc.h3("signer2 new proposal to include signer3")
                sc.verify(sp.len(multisigAdmin.data.signers) == 2)
                sc.verify(~multisigAdmin.data.signers.contains(signer3.address))
                changeSigners = InternalHelper.addSigners([(signer3.address, signer3.public_key)])
                sc += multisigAdmin.proposal(InternalHelper.variant([changeSigners])).run(sender = signer2)
                sc.h3("signer1 votes the proposal")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer1)
                sc.verify(multisigAdmin.data.signers.contains(signer3.address))
                sc.verify(sp.len(multisigAdmin.data.signers) == 3)

                ############################################
                # New proposal (change Quorum from 2 to 3) #
                ############################################
                sc.h2("New proposal (change Quorum from 2 to 3)")
                sc.h3("signer1 new proposal to change quorum to 3")
                changeQuorum = InternalHelper.changeQuorum(3)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
                # Proposal has not been validated yet
                sc.verify(multisigAdmin.data.quorum == 2)
                sc.h3("signer2 votes the proposal (2/2)")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
                sc.verify(multisigAdmin.data.quorum == 3)

                ###########################################
                # Newly included signer starts a proposal #
                ###########################################
                sc.h2("Newly included signer starts a proposal")
                sc.h3("New proposal by signer 3 to decrease quorum to 2")
                changeQuorum = InternalHelper.changeQuorum(2)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer3)
                sc.h3("signer1 votes the proposal")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer1)
                sc.verify(multisigAdmin.data.quorum == 3)
                sc.h3("signer2 votes the proposal")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
                sc.verify(multisigAdmin.data.quorum == 2)

                ##########
                # Cancel #
                ##########
                sc.h2("Proposal cancellation")
                sc.h3("New proposal by signer 1")
                changeTimeout = InternalHelper.changeQuorum(3)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeTimeout])).run(sender = signer1)
                sc.verify(sp.len(multisigAdmin.data.activeProposals) == 1)
                sc.h3("Signer 2 tries to cancel the proposal (must fail, only the initiator can cancel)")
                sc += multisigAdmin.cancel_proposal(multisigAdmin.data.lastProposalId).run(sender = signer2, valid = False)
                sc.h3("Signer 1 cancels the proposal")
                sc += multisigAdmin.cancel_proposal(multisigAdmin.data.lastProposalId).run(sender = signer1)
                sc.verify(sp.len(multisigAdmin.data.activeProposals) == 0)
                sc.h3("Signer 2 tries to vote the canceled proposal")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2, valid = False)
                sc.verify(multisigAdmin.data.quorum != 3)

                ######################
                # 2 actions proposal #
                ######################
                sc.h2("2 actions proposal")
                sc.h3("Signer 1 new proposal: change quorum to 2 and add signer 4")
                sc.verify(~multisigAdmin.data.signers.contains(signer4.address))
                changeQuorum = InternalHelper.changeQuorum(3)
                changeSigners = InternalHelper.addSigners([(signer4.address, signer4.public_key)])
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum, changeSigners])).run(sender = signer1)
                sc.h3("Signer 2 votes the proposal")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
                sc.verify(multisigAdmin.data.quorum == 3)
                sc.verify(multisigAdmin.data.signers.contains(signer4.address))

                #########################################
                # 2 Internal proposals at the same time #
                #########################################
                sc.h3("Signer 1 new proposal: change quorum to 2 and remove signer 4")
                changeQuorum = InternalHelper.changeQuorum(2)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
                changeSigners = InternalHelper.removeSigners([signer4.address])
                sc += multisigAdmin.proposal(InternalHelper.variant([changeSigners])).run(sender = signer2)
                sc.verify(sp.len(multisigAdmin.data.activeProposals) == 2)
                sc.h3("Signer 3 votes on quorum proposal")
                sc += multisigAdmin.endorsement([sp.as_nat(multisigAdmin.data.lastProposalId - 1)]).run(sender = signer3)
                sc.h3("Signer 4 votes on signers proposal")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer4)
                sc.h3("Confirm that nothing has changed")
                sc.verify(multisigAdmin.data.quorum == 3)
                sc.verify(multisigAdmin.data.signers.contains(signer4.address))
                sc.h3("Signer 4 votes on quorum proposal")
                sc += multisigAdmin.endorsement([sp.as_nat(multisigAdmin.data.lastProposalId - 1)]).run(sender = signer4)
                sc.h3("Confirm that quorum was updated and signers proposal was canceled")
                sc.verify(sp.len(multisigAdmin.data.activeProposals) == 0)
                sc.verify(multisigAdmin.data.quorum == 2)
                sc.verify(multisigAdmin.data.signers.contains(signer4.address))

                #########################
                # Multisig endorsements #
                #########################
                sc.h2("Multi vote in one call")
                sc.h3("Signer 1 new proposal")
                changeQuorum = InternalHelper.changeQuorum(3)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
                sc.h3("Signer 2 and Signer 3 votes are pushed by Signer 1")
                signer2_endorsement = sign(signer2, contract = multisigAdmin)
                signer3_endorsement = sign(signer3, contract = multisigAdmin)
                proposalEndorsements = sp.record(
                    proposalId = multisigAdmin.data.lastProposalId,
                    signatures = [signer2_endorsement, signer3_endorsement]
                )
                sc += multisigAdmin.aggregated_endorsement([proposalEndorsements]).run(sender = signer1)
                sc.verify(multisigAdmin.data.quorum == 3)

                #####################
                # Multisig proposal #
                #####################
                sc.h2("Multi vote in one call")
                sc.h3("Signer 1 new proposal")
                changeQuorum = InternalHelper.changeQuorum(3)
                sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
                sc.h3("Signer 2 and Signer 3 votes are pushed by Signer 1")
                signer2_vote = sign(signer2, contract = multisigAdmin)
                signer3_vote = sign(signer3, contract = multisigAdmin)
                proposalVotes = sp.record(
                    proposalId = multisigAdmin.data.lastProposalId,
                    signatures = [signer2_vote, signer3_vote]
                )
                sc += multisigAdmin.aggregated_endorsement([proposalVotes]).run(sender = signer1)
                sc.verify(multisigAdmin.data.quorum == 3)

            ##########################################

            else:

                sc.h3("Originate Multisig Admin")
                multisigAdmin = MultisigAdmin(
                    quorum              = 3,
                    signers             = sp.map(
                                            {
                                                signer1.address : sp.record(
                                                                    publicKey       = signer1.public_key,
                                                                    lastProposalId  = sp.none
                                                                ),
                                                signer2.address : sp.record(
                                                                    publicKey       = signer2.public_key,
                                                                    lastProposalId  = sp.none
                                                                ),
                                                signer3.address : sp.record(
                                                                    publicKey       = signer3.public_key,
                                                                    lastProposalId  = sp.none
                                                                )
                                            }
                                        ),
                    metadataURL         = "ipfs://"
                )
                sc += multisigAdmin

                sc.h3("Originate administrated contract")
                administrated = Administrated(admin.address, False)
                sc += administrated
                administrated_entrypoint = administrated.typed.administrate

                sc.h2("Set multisig as admin of administrated contract")
                sc.verify(administrated.data.active == False)
                sc.verify(administrated.data.admin == admin.address)
                actions = administrated.packActions([ExternalHelper.changeAdmin(multisigAdmin.address)])
                sc += administrated.administrate(actions).run(sender = admin)
                sc.verify(administrated.data.active == False)
                sc.verify(administrated.data.admin == multisigAdmin.address)

                sc.h2("Activate the administrated contract")
                sc.h3("Signer 1 new proposal: changeActive")
                actions = administrated.packActions([ExternalHelper.changeActive(True)])
                sc += multisigAdmin.proposal(ExternalHelper.variant(
                    [sp.record(
                        target  = sp.to_address(administrated_entrypoint),
                        actions = actions
                    )]
                )).run(sender = signer1)
                sc.verify(administrated.data.active == False)
                sc.h3("Signer 2 votes")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
                sc.verify(administrated.data.active == False)
                sc.h3("Signer 3 votes")
                sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer3)
                sc.verify(administrated.data.active == True)

                sc.h2("Use Multisig vote to deactivate the administrated contract")
                sc.h3("Signer 1 new proposal: changeActive")
                actions = administrated.packActions([ExternalHelper.changeActive(False)])
                sc += multisigAdmin.proposal(ExternalHelper.variant(
                    [sp.record(
                        target  = sp.to_address(administrated_entrypoint),
                        actions = actions
                    )]
                )).run(sender = signer1)
                sc.verify(administrated.data.active == True)
                sc.h3("Signer 2 and Signer 3 votes are pushed by Signer 1")
                signer2_vote = sign(signer2, contract = multisigAdmin)
                signer3_vote = sign(signer3, contract = multisigAdmin)
                proposalVotes = sp.record(
                    proposalId = multisigAdmin.data.lastProposalId,
                    signatures = [signer2_vote, signer3_vote]
                )
                sc += multisigAdmin.aggregated_endorsement([proposalVotes]).run(sender = signer1)
                sc.verify(administrated.data.active == False)

    add_test(internal_tests = True)
    add_test(internal_tests = False)

Functions

def sign(account, contract)
Expand source code
def sign(account, contract):
    message = sp.pack(
        sp.record(
            contractAddress = contract.address,
            proposalId      = contract.data.lastProposalId
        )
    )
    signature = sp.make_signature(account.secret_key, message, message_format = 'Raw')
    vote = sp.record(
        signerAddress   = account.address,
        signature       = signature
    )
    return vote
def add_test(internal_tests, is_default=True)
Expand source code
def add_test(internal_tests, is_default = True):
    name = "Internal Administration tests" if internal_tests else "External Administration tests"
    @sp.add_test(name = name, is_default = is_default)
    def test():
        sc = sp.test_scenario()
        sc.h1(name)
        sc.table_of_contents()

        admin = sp.test_account("admin")
        signer1 = sp.test_account("signer1")
        signer2 = sp.test_account("signer2")
        signer3 = sp.test_account("signer3")
        signer4 = sp.test_account("signer4")

        if internal_tests:

            sc.h3("Originate Multisig Admin")
            multisigAdmin = MultisigAdmin(
                quorum              = 1,
                signers             = sp.map(
                                        {
                                            signer1.address : sp.record(
                                                                publicKey       = signer1.public_key,
                                                                lastProposalId  = sp.none
                                                            ),
                                            signer2.address : sp.record(
                                                                publicKey       = signer2.public_key,
                                                                lastProposalId  = sp.none
                                                            )
                                        }
                                    ),
                metadataURL         = "ipfs://"
            )
            sc += multisigAdmin

            ##########################
            # Auto-accepted proposal #
            ##########################
            sc.h2("Auto-accepted proposal when quorum is 1")
            sc.h3("signer1 propose to change quorum to 2")
            sc.verify(multisigAdmin.data.quorum == 1)
            changeQuorum = InternalHelper.changeQuorum(2)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
            sc.verify(multisigAdmin.data.quorum == 2)

            ####################
            # Add a 3rd signer #
            ####################
            sc.h2("Adding a 3rd signer")
            sc.h3("signer2 new proposal to include signer3")
            sc.verify(sp.len(multisigAdmin.data.signers) == 2)
            sc.verify(~multisigAdmin.data.signers.contains(signer3.address))
            changeSigners = InternalHelper.addSigners([(signer3.address, signer3.public_key)])
            sc += multisigAdmin.proposal(InternalHelper.variant([changeSigners])).run(sender = signer2)
            sc.h3("signer1 votes the proposal")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer1)
            sc.verify(multisigAdmin.data.signers.contains(signer3.address))
            sc.verify(sp.len(multisigAdmin.data.signers) == 3)

            ############################################
            # New proposal (change Quorum from 2 to 3) #
            ############################################
            sc.h2("New proposal (change Quorum from 2 to 3)")
            sc.h3("signer1 new proposal to change quorum to 3")
            changeQuorum = InternalHelper.changeQuorum(3)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
            # Proposal has not been validated yet
            sc.verify(multisigAdmin.data.quorum == 2)
            sc.h3("signer2 votes the proposal (2/2)")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
            sc.verify(multisigAdmin.data.quorum == 3)

            ###########################################
            # Newly included signer starts a proposal #
            ###########################################
            sc.h2("Newly included signer starts a proposal")
            sc.h3("New proposal by signer 3 to decrease quorum to 2")
            changeQuorum = InternalHelper.changeQuorum(2)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer3)
            sc.h3("signer1 votes the proposal")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer1)
            sc.verify(multisigAdmin.data.quorum == 3)
            sc.h3("signer2 votes the proposal")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
            sc.verify(multisigAdmin.data.quorum == 2)

            ##########
            # Cancel #
            ##########
            sc.h2("Proposal cancellation")
            sc.h3("New proposal by signer 1")
            changeTimeout = InternalHelper.changeQuorum(3)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeTimeout])).run(sender = signer1)
            sc.verify(sp.len(multisigAdmin.data.activeProposals) == 1)
            sc.h3("Signer 2 tries to cancel the proposal (must fail, only the initiator can cancel)")
            sc += multisigAdmin.cancel_proposal(multisigAdmin.data.lastProposalId).run(sender = signer2, valid = False)
            sc.h3("Signer 1 cancels the proposal")
            sc += multisigAdmin.cancel_proposal(multisigAdmin.data.lastProposalId).run(sender = signer1)
            sc.verify(sp.len(multisigAdmin.data.activeProposals) == 0)
            sc.h3("Signer 2 tries to vote the canceled proposal")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2, valid = False)
            sc.verify(multisigAdmin.data.quorum != 3)

            ######################
            # 2 actions proposal #
            ######################
            sc.h2("2 actions proposal")
            sc.h3("Signer 1 new proposal: change quorum to 2 and add signer 4")
            sc.verify(~multisigAdmin.data.signers.contains(signer4.address))
            changeQuorum = InternalHelper.changeQuorum(3)
            changeSigners = InternalHelper.addSigners([(signer4.address, signer4.public_key)])
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum, changeSigners])).run(sender = signer1)
            sc.h3("Signer 2 votes the proposal")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
            sc.verify(multisigAdmin.data.quorum == 3)
            sc.verify(multisigAdmin.data.signers.contains(signer4.address))

            #########################################
            # 2 Internal proposals at the same time #
            #########################################
            sc.h3("Signer 1 new proposal: change quorum to 2 and remove signer 4")
            changeQuorum = InternalHelper.changeQuorum(2)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
            changeSigners = InternalHelper.removeSigners([signer4.address])
            sc += multisigAdmin.proposal(InternalHelper.variant([changeSigners])).run(sender = signer2)
            sc.verify(sp.len(multisigAdmin.data.activeProposals) == 2)
            sc.h3("Signer 3 votes on quorum proposal")
            sc += multisigAdmin.endorsement([sp.as_nat(multisigAdmin.data.lastProposalId - 1)]).run(sender = signer3)
            sc.h3("Signer 4 votes on signers proposal")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer4)
            sc.h3("Confirm that nothing has changed")
            sc.verify(multisigAdmin.data.quorum == 3)
            sc.verify(multisigAdmin.data.signers.contains(signer4.address))
            sc.h3("Signer 4 votes on quorum proposal")
            sc += multisigAdmin.endorsement([sp.as_nat(multisigAdmin.data.lastProposalId - 1)]).run(sender = signer4)
            sc.h3("Confirm that quorum was updated and signers proposal was canceled")
            sc.verify(sp.len(multisigAdmin.data.activeProposals) == 0)
            sc.verify(multisigAdmin.data.quorum == 2)
            sc.verify(multisigAdmin.data.signers.contains(signer4.address))

            #########################
            # Multisig endorsements #
            #########################
            sc.h2("Multi vote in one call")
            sc.h3("Signer 1 new proposal")
            changeQuorum = InternalHelper.changeQuorum(3)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
            sc.h3("Signer 2 and Signer 3 votes are pushed by Signer 1")
            signer2_endorsement = sign(signer2, contract = multisigAdmin)
            signer3_endorsement = sign(signer3, contract = multisigAdmin)
            proposalEndorsements = sp.record(
                proposalId = multisigAdmin.data.lastProposalId,
                signatures = [signer2_endorsement, signer3_endorsement]
            )
            sc += multisigAdmin.aggregated_endorsement([proposalEndorsements]).run(sender = signer1)
            sc.verify(multisigAdmin.data.quorum == 3)

            #####################
            # Multisig proposal #
            #####################
            sc.h2("Multi vote in one call")
            sc.h3("Signer 1 new proposal")
            changeQuorum = InternalHelper.changeQuorum(3)
            sc += multisigAdmin.proposal(InternalHelper.variant([changeQuorum])).run(sender = signer1)
            sc.h3("Signer 2 and Signer 3 votes are pushed by Signer 1")
            signer2_vote = sign(signer2, contract = multisigAdmin)
            signer3_vote = sign(signer3, contract = multisigAdmin)
            proposalVotes = sp.record(
                proposalId = multisigAdmin.data.lastProposalId,
                signatures = [signer2_vote, signer3_vote]
            )
            sc += multisigAdmin.aggregated_endorsement([proposalVotes]).run(sender = signer1)
            sc.verify(multisigAdmin.data.quorum == 3)

        ##########################################

        else:

            sc.h3("Originate Multisig Admin")
            multisigAdmin = MultisigAdmin(
                quorum              = 3,
                signers             = sp.map(
                                        {
                                            signer1.address : sp.record(
                                                                publicKey       = signer1.public_key,
                                                                lastProposalId  = sp.none
                                                            ),
                                            signer2.address : sp.record(
                                                                publicKey       = signer2.public_key,
                                                                lastProposalId  = sp.none
                                                            ),
                                            signer3.address : sp.record(
                                                                publicKey       = signer3.public_key,
                                                                lastProposalId  = sp.none
                                                            )
                                        }
                                    ),
                metadataURL         = "ipfs://"
            )
            sc += multisigAdmin

            sc.h3("Originate administrated contract")
            administrated = Administrated(admin.address, False)
            sc += administrated
            administrated_entrypoint = administrated.typed.administrate

            sc.h2("Set multisig as admin of administrated contract")
            sc.verify(administrated.data.active == False)
            sc.verify(administrated.data.admin == admin.address)
            actions = administrated.packActions([ExternalHelper.changeAdmin(multisigAdmin.address)])
            sc += administrated.administrate(actions).run(sender = admin)
            sc.verify(administrated.data.active == False)
            sc.verify(administrated.data.admin == multisigAdmin.address)

            sc.h2("Activate the administrated contract")
            sc.h3("Signer 1 new proposal: changeActive")
            actions = administrated.packActions([ExternalHelper.changeActive(True)])
            sc += multisigAdmin.proposal(ExternalHelper.variant(
                [sp.record(
                    target  = sp.to_address(administrated_entrypoint),
                    actions = actions
                )]
            )).run(sender = signer1)
            sc.verify(administrated.data.active == False)
            sc.h3("Signer 2 votes")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer2)
            sc.verify(administrated.data.active == False)
            sc.h3("Signer 3 votes")
            sc += multisigAdmin.endorsement([multisigAdmin.data.lastProposalId]).run(sender = signer3)
            sc.verify(administrated.data.active == True)

            sc.h2("Use Multisig vote to deactivate the administrated contract")
            sc.h3("Signer 1 new proposal: changeActive")
            actions = administrated.packActions([ExternalHelper.changeActive(False)])
            sc += multisigAdmin.proposal(ExternalHelper.variant(
                [sp.record(
                    target  = sp.to_address(administrated_entrypoint),
                    actions = actions
                )]
            )).run(sender = signer1)
            sc.verify(administrated.data.active == True)
            sc.h3("Signer 2 and Signer 3 votes are pushed by Signer 1")
            signer2_vote = sign(signer2, contract = multisigAdmin)
            signer3_vote = sign(signer3, contract = multisigAdmin)
            proposalVotes = sp.record(
                proposalId = multisigAdmin.data.lastProposalId,
                signatures = [signer2_vote, signer3_vote]
            )
            sc += multisigAdmin.aggregated_endorsement([proposalVotes]).run(sender = signer1)
            sc.verify(administrated.data.active == False)

Classes

class MS_TYPES
Expand source code
class MS_TYPES:
    #############################################
    #           == Multisig Types ==            #
    #                                           #
    # The types in this file are shared with    #
    # contracts interacting with the multisig   #
    # contract.                                 #
    #############################################

    # Internal administration action type specification
    InternalAdminAction = sp.TVariant(
        changeSigners   = sp.TVariant(
                            removed = sp.TSet(sp.TAddress),
                            added   = sp.TList(
                                        sp.TRecord(
                                            address     = sp.TAddress,
                                            publicKey   = sp.TKey
                                        ).right_comb()
                                    )
                        ).right_comb(),
        changeQuorum    = sp.TNat,
        changeMetadata  = sp.TPair(sp.TString, sp.TOption(sp.TBytes)),
    ).right_comb()

    # External administration action type specification
    ExternalAdminAction = sp.TRecord(
        target  = sp.TAddress,
        actions = sp.TBytes
    ).right_comb()

    # Proposal action type specification
    ProposalAction = sp.TVariant(
        internal = sp.TList(InternalAdminAction),
        external = sp.TList(ExternalAdminAction)
    ).right_comb()

    # Proposal type specification
    Proposal = sp.TRecord(
        startedAt       = sp.TTimestamp,
        initiator       = sp.TAddress,
        endorsements    = sp.TSet(sp.TAddress),
        actions         = ProposalAction
    ).right_comb()

    AggregatedProposalParams = sp.TRecord(
        signatures      = sp.TList(
                            sp.TRecord(
                                signerAddress   = sp.TAddress,
                                signature       = sp.TSignature
                            ).right_comb()
                        ),
        proposalId      = sp.TNat,
        actions         = ProposalAction
    ).right_comb()

    AggregatedEndorsementParams = sp.TList(
        sp.TRecord(
            signatures      = sp.TList(
                                sp.TRecord(
                                    signerAddress   = sp.TAddress,
                                    signature       = sp.TSignature
                                ).right_comb()
                            ),
            proposalId      = sp.TNat
        ).right_comb()
    )

Class variables

var InternalAdminAction
var ExternalAdminAction
var ProposalAction
var Proposal
var AggregatedProposalParams
var AggregatedEndorsementParams
class ERR
Expand source code
class ERR:
    def make(s): return ("MULTISIG_" + s)

    Badsig                 = make("Badsig")
    ProposalUnknown        = make("ProposalUnknown")
    NotInitiator           = make("NotInitiator")
    SignerUnknown          = make("SignerUnknown")
    InvalidTarget          = make("InvalidTarget")
    MoreQuorumThanSigners  = make("MoreQuorumThanSigners")
    InvalidProposalId      = make("InvalidProposalId")

Class variables

var Badsig
var ProposalUnknown
var NotInitiator
var SignerUnknown
var InvalidTarget
var MoreQuorumThanSigners
var InvalidProposalId

Methods

def make(s)
Expand source code
def make(s): return ("MULTISIG_" + s)
class MultisigHelpers
Expand source code
class MultisigHelpers:
    def failIfNotSigner(self, address):
        sp.verify(self.data.signers.contains(address), message = ERR.SignerUnknown)

    def failIfProposalNotActive(self, proposalId):
        sp.verify(self.data.activeProposals.contains(proposalId), message = ERR.ProposalUnknown)

Subclasses

Methods

def failIfNotSigner(self, address)
Expand source code
def failIfNotSigner(self, address):
    sp.verify(self.data.signers.contains(address), message = ERR.SignerUnknown)
def failIfProposalNotActive(self, proposalId)
Expand source code
def failIfProposalNotActive(self, proposalId):
    sp.verify(self.data.activeProposals.contains(proposalId), message = ERR.ProposalUnknown)
class MultisigAdmin (quorum, signers, metadataURL)
Expand source code
class MultisigAdmin(sp.Contract, MultisigHelpers):
    def __init__(
            self,
            quorum,
            signers,
            metadataURL
        ):

        # Metadata helper
        self.init_metadata("metadata", METADATA)

        self.init_type(
            sp.TRecord(
                quorum          = sp.TNat,
                lastProposalId  = sp.TNat,
                signers         = sp.TMap(
                                    sp.TAddress,
                                    sp.TRecord(
                                        publicKey       = sp.TKey,
                                        lastProposalId  = sp.TOption(sp.TNat)
                                    ).right_comb()
                                ),
                proposals       = sp.TBigMap(sp.TNat, MS_TYPES.Proposal),
                activeProposals = sp.TSet(sp.TNat),
                metadata        = sp.TBigMap(sp.TString, sp.TBytes),
            ).right_comb()
        )
        self.init(
            quorum              = quorum,
            lastProposalId      = 0,
            signers             = signers,
            proposals           = sp.big_map(),
            activeProposals     = sp.set(),
            metadata            = sp.utils.metadata_of_url(metadataURL)
        )

    #####################################
    # ++ ENTRYPOINTS                    #
    #####################################

    @sp.entrypoint
    def proposal(self, actions):
        """
            Each user can have at most one proposal active at a time.
            Submitting a new proposal overrides the previous one.
        """
        # Proposals can only be submitted by registered signers
        self.failIfNotSigner(sp.sender)

        # If the proposal initiator has an active proposal,
        # then replace that proposal with the new one
        signerLastProposalId = self.data.signers[sp.sender].lastProposalId
        with sp.if_(signerLastProposalId != sp.none):
            self.data.activeProposals.remove(signerLastProposalId.open_some())

        # Increment proposal counter
        self.data.lastProposalId += 1
        proposalId = self.data.lastProposalId
        # Store new proposal
        self.data.activeProposals.add(proposalId)
        self.data.proposals[proposalId] = sp.record(
            startedAt       = sp.now,
            initiator       = sp.sender,
            endorsements    = sp.set([sp.sender]),
            actions         = actions
        )
        # Update signer's last proposal
        self.data.signers[sp.sender].lastProposalId = sp.some(proposalId)

        # Approve the proposal if quorum only requires 1 vote
        with sp.if_(self.data.quorum < 2):
            self.onApproved(
                sp.record(
                    proposalId  = proposalId,
                    actions     = actions,
                )
            )

    @sp.entrypoint
    def endorsement(self, endorsements):
        """
            Entrypoint used to submit endorsements to single/multiple proposals.
        """
        # Endorsements can only be submitted by registered signers
        self.failIfNotSigner(sp.sender)

        # Iterate over every endorsement
        with sp.for_("pId", endorsements) as pId:
            self.registerEndorsement(
                sp.record(
                    proposalId      = pId,
                    signerAddress   = sp.sender
                )
            )

            # Approve the proposal if quorum was reached
            proposal = self.data.proposals[pId]
            with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
                self.onApproved(
                    sp.record(
                        proposalId  = pId,
                        actions     = proposal.actions,
                    )
                )

    @sp.entrypoint
    def aggregated_proposal(self, params):
        """
            Users can send aggregated proposal, which are signed offchain and validated onchain.
        """
        sp.set_type(params, MS_TYPES.AggregatedProposalParams)
        self.failIfNotSigner(sp.sender)

        self.data.lastProposalId += 1
        sp.verify(self.data.lastProposalId == params.proposalId, message = ERR.InvalidProposalId)

        proposal        = sp.compute(
                            sp.record(
                                startedAt       = sp.now,
                                initiator       = sp.sender,
                                endorsements    = sp.set([sp.sender]),
                                actions         = params.actions
                            )
                        )

        # If the proposal initiator has an active proposal,
        # then replace that proposal with the new one
        proposerLastProposalId = self.data.signers[sp.sender].lastProposalId
        with sp.if_(proposerLastProposalId != sp.none):
            self.data.activeProposals.remove(proposerLastProposalId.open_some())
        self.data.signers[sp.sender].lastProposalId = sp.some(params.proposalId)

        self.data.activeProposals.add(params.proposalId)
        self.data.proposals[params.proposalId] = proposal

        preSignature    = sp.compute(
                            sp.pack(
                                sp.record(
                                    actions         = params.actions,
                                    # (contractAddress + proposalId) protect against replay attacks
                                    proposalId      = params.proposalId,
                                    contractAddress = sp.self_address,
                                )
                            )
                        )

        # Validate and apply endorsements
        with sp.for_("signature", params.signatures) as signature:
            self.failIfNotSigner(signature.signerAddress)

            publicKey = self.data.signers[signature.signerAddress].publicKey
            sp.verify(sp.check_signature(publicKey, signature.signature, preSignature), message = ERR.Badsig)

            proposal.endorsements.add(signature.signerAddress)

        # Check quorum
        with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
            self.onApproved(
                sp.record(
                    proposalId  = params.proposalId,
                    actions     = proposal.actions,
                )
            )

    @sp.entrypoint
    def aggregated_endorsement(self, endorsements):
        """
            Users can send aggregated votes, which are signed offchain and validated onchain.
        """
        sp.set_type(endorsements, MS_TYPES.AggregatedEndorsementParams)

        with sp.for_("endorsment", endorsements) as endorsement:
            with sp.for_("signature", endorsement.signatures) as signature:
                self.failIfNotSigner(signature.signerAddress)
                preSignature = sp.pack(
                    sp.record(
                        # (contractAddress + proposalId) protect against replay attacks
                        contractAddress = sp.self_address,
                        proposalId      = endorsement.proposalId
                    )
                )
                publicKey = self.data.signers[signature.signerAddress].publicKey
                sp.verify(
                    sp.check_signature(publicKey, signature.signature, preSignature),
                    message = ERR.Badsig
                )
                self.registerEndorsement(
                    sp.record(
                        proposalId      = endorsement.proposalId,
                        signerAddress   = signature.signerAddress
                    )
                )
            proposal = sp.compute(self.data.proposals[endorsement.proposalId])
            with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
                self.onApproved(
                    sp.record(
                        proposalId  = endorsement.proposalId,
                        actions     = proposal.actions,
                    )
                )

    @sp.entrypoint
    def cancel_proposal(self, proposalId):
        self.failIfNotSigner(sp.sender)

        # Signers can only cancel their own proposals
        sp.verify(self.data.proposals[proposalId].initiator == sp.sender, message = ERR.NotInitiator)
        self.data.activeProposals.remove(proposalId)

    #####################################
    # ++ GLOBAL LAMBDAS                 #
    #####################################

    @sp.private_lambda(with_storage="read-write", wrap_call=True)
    def registerEndorsement(self, params):
        self.failIfProposalNotActive(params.proposalId)
        # Add endorsement to proposal
        self.data.proposals[params.proposalId].endorsements.add(params.signerAddress)

    @sp.private_lambda(with_storage="read-write", with_operations=True, wrap_call=True)
    def onApproved(self, params):
        with (params.actions).match_cases() as arg:
            # Internal actions are applied to the multisig contract
            with arg.match("internal") as internalActions:
                with sp.for_("action", internalActions) as action:
                    self.selfAdmin(action)
                # Removes all active proposals after an administrative change.
                self.removeActiveProposals()
            # External actions are applied to other contracts
            with arg.match("external") as externalActions:
                with sp.for_("action", externalActions) as action:
                    target = sp.contract(sp.TBytes, action.target).open_some(ERR.InvalidTarget)
                    sp.transfer(action.actions, sp.tez(0), target)

        self.data.activeProposals.remove(params.proposalId)

    def selfAdmin(self, action):
        """
            Apply administrative actions to the multisig contract
        """
        with (action).match_cases() as arg:
            with arg.match('changeQuorum') as quorum:
                self.data.quorum = quorum
            with arg.match('changeMetadata') as metadata:
                k, v = sp.match_pair(metadata)
                with sp.if_(v.is_some()):
                    self.data.metadata[k] = v.open_some()
                with sp.else_():
                    del self.data.metadata[k]

            with arg.match('changeSigners') as changeSigners:
                with (changeSigners).match_cases() as cvAction:
                    with cvAction.match('removed') as removeSet:
                        with sp.for_("address", removeSet.elements()) as address:
                            with sp.if_(self.data.signers.contains(address)):
                                # Remove signer
                                del self.data.signers[address]
                                # We don't remove signer[address].lastProposalId
                                # because we remove all activeProposals after it.
                    with cvAction.match('added') as addList:
                        with sp.for_("signer", addList) as signer:
                            self.data.signers[signer.address]   = sp.record(
                                                                    publicKey       = signer.publicKey,
                                                                    lastProposalId  = sp.none
                                                                )
        # Ensure that the contract never requires more quorum than the total of signers.
        sp.verify(self.data.quorum <= sp.len(self.data.signers), message = ERR.MoreQuorumThanSigners)

    def removeActiveProposals(self):
        """
            This method removes all active proposals when either the quorum or the signers get updated.
        """
        self.data.activeProposals = sp.set([])

Ancestors

Class variables

var registerEndorsement
var onApproved

Methods

def proposal(self, actions)

Entrypoint. Each user can have at most one proposal active at a time. Submitting a new proposal overrides the previous one.

Expand source code
@sp.entrypoint
def proposal(self, actions):
    """
        Each user can have at most one proposal active at a time.
        Submitting a new proposal overrides the previous one.
    """
    # Proposals can only be submitted by registered signers
    self.failIfNotSigner(sp.sender)

    # If the proposal initiator has an active proposal,
    # then replace that proposal with the new one
    signerLastProposalId = self.data.signers[sp.sender].lastProposalId
    with sp.if_(signerLastProposalId != sp.none):
        self.data.activeProposals.remove(signerLastProposalId.open_some())

    # Increment proposal counter
    self.data.lastProposalId += 1
    proposalId = self.data.lastProposalId
    # Store new proposal
    self.data.activeProposals.add(proposalId)
    self.data.proposals[proposalId] = sp.record(
        startedAt       = sp.now,
        initiator       = sp.sender,
        endorsements    = sp.set([sp.sender]),
        actions         = actions
    )
    # Update signer's last proposal
    self.data.signers[sp.sender].lastProposalId = sp.some(proposalId)

    # Approve the proposal if quorum only requires 1 vote
    with sp.if_(self.data.quorum < 2):
        self.onApproved(
            sp.record(
                proposalId  = proposalId,
                actions     = actions,
            )
        )
def endorsement(self, endorsements)

Entrypoint. Entrypoint used to submit endorsements to single/multiple proposals.

Expand source code
@sp.entrypoint
def endorsement(self, endorsements):
    """
        Entrypoint used to submit endorsements to single/multiple proposals.
    """
    # Endorsements can only be submitted by registered signers
    self.failIfNotSigner(sp.sender)

    # Iterate over every endorsement
    with sp.for_("pId", endorsements) as pId:
        self.registerEndorsement(
            sp.record(
                proposalId      = pId,
                signerAddress   = sp.sender
            )
        )

        # Approve the proposal if quorum was reached
        proposal = self.data.proposals[pId]
        with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
            self.onApproved(
                sp.record(
                    proposalId  = pId,
                    actions     = proposal.actions,
                )
            )
def aggregated_proposal(self, params)

Entrypoint. Users can send aggregated proposal, which are signed offchain and validated onchain.

Expand source code
@sp.entrypoint
def aggregated_proposal(self, params):
    """
        Users can send aggregated proposal, which are signed offchain and validated onchain.
    """
    sp.set_type(params, MS_TYPES.AggregatedProposalParams)
    self.failIfNotSigner(sp.sender)

    self.data.lastProposalId += 1
    sp.verify(self.data.lastProposalId == params.proposalId, message = ERR.InvalidProposalId)

    proposal        = sp.compute(
                        sp.record(
                            startedAt       = sp.now,
                            initiator       = sp.sender,
                            endorsements    = sp.set([sp.sender]),
                            actions         = params.actions
                        )
                    )

    # If the proposal initiator has an active proposal,
    # then replace that proposal with the new one
    proposerLastProposalId = self.data.signers[sp.sender].lastProposalId
    with sp.if_(proposerLastProposalId != sp.none):
        self.data.activeProposals.remove(proposerLastProposalId.open_some())
    self.data.signers[sp.sender].lastProposalId = sp.some(params.proposalId)

    self.data.activeProposals.add(params.proposalId)
    self.data.proposals[params.proposalId] = proposal

    preSignature    = sp.compute(
                        sp.pack(
                            sp.record(
                                actions         = params.actions,
                                # (contractAddress + proposalId) protect against replay attacks
                                proposalId      = params.proposalId,
                                contractAddress = sp.self_address,
                            )
                        )
                    )

    # Validate and apply endorsements
    with sp.for_("signature", params.signatures) as signature:
        self.failIfNotSigner(signature.signerAddress)

        publicKey = self.data.signers[signature.signerAddress].publicKey
        sp.verify(sp.check_signature(publicKey, signature.signature, preSignature), message = ERR.Badsig)

        proposal.endorsements.add(signature.signerAddress)

    # Check quorum
    with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
        self.onApproved(
            sp.record(
                proposalId  = params.proposalId,
                actions     = proposal.actions,
            )
        )
def aggregated_endorsement(self, endorsements)

Entrypoint. Users can send aggregated votes, which are signed offchain and validated onchain.

Expand source code
@sp.entrypoint
def aggregated_endorsement(self, endorsements):
    """
        Users can send aggregated votes, which are signed offchain and validated onchain.
    """
    sp.set_type(endorsements, MS_TYPES.AggregatedEndorsementParams)

    with sp.for_("endorsment", endorsements) as endorsement:
        with sp.for_("signature", endorsement.signatures) as signature:
            self.failIfNotSigner(signature.signerAddress)
            preSignature = sp.pack(
                sp.record(
                    # (contractAddress + proposalId) protect against replay attacks
                    contractAddress = sp.self_address,
                    proposalId      = endorsement.proposalId
                )
            )
            publicKey = self.data.signers[signature.signerAddress].publicKey
            sp.verify(
                sp.check_signature(publicKey, signature.signature, preSignature),
                message = ERR.Badsig
            )
            self.registerEndorsement(
                sp.record(
                    proposalId      = endorsement.proposalId,
                    signerAddress   = signature.signerAddress
                )
            )
        proposal = sp.compute(self.data.proposals[endorsement.proposalId])
        with sp.if_(sp.len(proposal.endorsements) >= self.data.quorum):
            self.onApproved(
                sp.record(
                    proposalId  = endorsement.proposalId,
                    actions     = proposal.actions,
                )
            )
def cancel_proposal(self, proposalId)

Entrypoint.

Expand source code
@sp.entrypoint
def cancel_proposal(self, proposalId):
    self.failIfNotSigner(sp.sender)

    # Signers can only cancel their own proposals
    sp.verify(self.data.proposals[proposalId].initiator == sp.sender, message = ERR.NotInitiator)
    self.data.activeProposals.remove(proposalId)
def selfAdmin(self, action)

Apply administrative actions to the multisig contract

Expand source code
def selfAdmin(self, action):
    """
        Apply administrative actions to the multisig contract
    """
    with (action).match_cases() as arg:
        with arg.match('changeQuorum') as quorum:
            self.data.quorum = quorum
        with arg.match('changeMetadata') as metadata:
            k, v = sp.match_pair(metadata)
            with sp.if_(v.is_some()):
                self.data.metadata[k] = v.open_some()
            with sp.else_():
                del self.data.metadata[k]

        with arg.match('changeSigners') as changeSigners:
            with (changeSigners).match_cases() as cvAction:
                with cvAction.match('removed') as removeSet:
                    with sp.for_("address", removeSet.elements()) as address:
                        with sp.if_(self.data.signers.contains(address)):
                            # Remove signer
                            del self.data.signers[address]
                            # We don't remove signer[address].lastProposalId
                            # because we remove all activeProposals after it.
                with cvAction.match('added') as addList:
                    with sp.for_("signer", addList) as signer:
                        self.data.signers[signer.address]   = sp.record(
                                                                publicKey       = signer.publicKey,
                                                                lastProposalId  = sp.none
                                                            )
    # Ensure that the contract never requires more quorum than the total of signers.
    sp.verify(self.data.quorum <= sp.len(self.data.signers), message = ERR.MoreQuorumThanSigners)
def removeActiveProposals(self)

This method removes all active proposals when either the quorum or the signers get updated.

Expand source code
def removeActiveProposals(self):
    """
        This method removes all active proposals when either the quorum or the signers get updated.
    """
    self.data.activeProposals = sp.set([])
class InternalHelper
Expand source code
class InternalHelper():
    def variant(content):
        return sp.variant("internal", content)

    def changeQuorum(quorum):
        return sp.variant("changeQuorum", quorum)

    def removeSigners(l):
        return sp.variant("changeSigners",
            sp.variant("removed", sp.set(l))
        )

    def addSigners(l):
        added_list = []
        for added_info in l:
            addr, publicKey = added_info
            added_list.append(
                sp.record(
                    address = addr,
                    publicKey = publicKey)
                )
        return sp.variant("changeSigners",
            sp.variant("added", sp.list(added_list))
        )

Methods

def variant(content)
Expand source code
def variant(content):
    return sp.variant("internal", content)
def changeQuorum(quorum)
Expand source code
def changeQuorum(quorum):
    return sp.variant("changeQuorum", quorum)
def removeSigners(l)
Expand source code
def removeSigners(l):
    return sp.variant("changeSigners",
        sp.variant("removed", sp.set(l))
    )
def addSigners(l)
Expand source code
def addSigners(l):
    added_list = []
    for added_info in l:
        addr, publicKey = added_info
        added_list.append(
            sp.record(
                address = addr,
                publicKey = publicKey)
            )
    return sp.variant("changeSigners",
        sp.variant("added", sp.list(added_list))
    )
class ExternalHelper
Expand source code
class ExternalHelper():
    def variant(content):
        return sp.variant("external", content)

    def changeActive(active):
        return sp.variant("changeActive", active)

    def changeAdmin(address):
        return sp.variant("changeAdmin", address)

Methods

def variant(content)
Expand source code
def variant(content):
    return sp.variant("external", content)
def changeActive(active)
Expand source code
def changeActive(active):
    return sp.variant("changeActive", active)
def changeAdmin(address)
Expand source code
def changeAdmin(address):
    return sp.variant("changeAdmin", address)
class Administrated (admin, active)

This contract is a sample It shows how a contract can be administrated through the multisig administration contract

Expand source code
class Administrated(sp.Contract):
    """
        This contract is a sample
        It shows how a contract can be administrated
        through the multisig administration contract
    """
    def __init__(self, admin, active):
        self.init(
            admin = admin,
            active = active
        )

    AdministrationType = sp.TVariant(
        changeAdmin    = sp.TAddress,
        changeActive   = sp.TBool
    )

    @sp.entrypoint
    def administrate(self, actionsBytes):
        sp.verify(sp.sender == self.data.admin, message = "NOT ADMIN")

        # actionsBytes is packed and must be unpacked
        actions = sp.unpack(actionsBytes, sp.TList(Administrated.AdministrationType)).open_some(message = "Actions are invalid")

        with sp.for_("action", actions) as action:
            with (action).match_cases() as arg:
                with arg.match('changeActive') as active:
                    self.data.active = active
                with arg.match('changeAdmin') as admin:
                    self.data.admin = admin

    @sp.entrypoint
    def verifyActive(self):
        sp.verify(self.data.active, message = "NOT ACTIVE")

    # Helpers

    def packActions(self, actions):
        actions = sp.set_type_expr(actions, sp.TList(Administrated.AdministrationType))
        return sp.pack(actions)

Ancestors

  • smartpy.Contract

Class variables

var AdministrationType

Methods

def administrate(self, actionsBytes)

Entrypoint.

Expand source code
@sp.entrypoint
def administrate(self, actionsBytes):
    sp.verify(sp.sender == self.data.admin, message = "NOT ADMIN")

    # actionsBytes is packed and must be unpacked
    actions = sp.unpack(actionsBytes, sp.TList(Administrated.AdministrationType)).open_some(message = "Actions are invalid")

    with sp.for_("action", actions) as action:
        with (action).match_cases() as arg:
            with arg.match('changeActive') as active:
                self.data.active = active
            with arg.match('changeAdmin') as admin:
                self.data.admin = admin
def verifyActive(self)

Entrypoint.

Expand source code
@sp.entrypoint
def verifyActive(self):
    sp.verify(self.data.active, message = "NOT ACTIVE")
def packActions(self, actions)
Expand source code
def packActions(self, actions):
    actions = sp.set_type_expr(actions, sp.TList(Administrated.AdministrationType))
    return sp.pack(actions)