
Parallel signing #1468


Open
wants to merge 11 commits into base: main
130 changes: 78 additions & 52 deletions sigstore/_cli.py
@@ -20,9 +20,10 @@
import logging
import os
import sys
from concurrent import futures
from dataclasses import dataclass
from pathlib import Path
from typing import Any, NoReturn, TextIO, Union
from typing import Any, NoReturn, Union

from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import load_pem_x509_certificate
@@ -56,7 +57,7 @@
Issuer,
detect_credential,
)
from sigstore.sign import SigningContext
from sigstore.sign import Signer, SigningContext
from sigstore.verify import (
Verifier,
policy,
@@ -636,6 +637,57 @@ def _get_identity_token(args: argparse.Namespace) -> None:
_invalid_arguments(args, "No identity token supplied or detected!")


def _sign_file_threaded(
signer: Signer,
predicate_type: str | None,
predicate: dict[str, Any] | None,
file: Path,
outputs: SigningOutputs,
) -> None:
"""sign method to be called from signing thread"""
_logger.debug(f"signing for {file.name}")
with file.open(mode="rb") as io:
# The input can be indefinitely large, so we perform a streaming
# digest and sign the prehash rather than buffering it fully.
digest = sha256_digest(io)
try:
if predicate is None:
result = signer.sign_artifact(input_=digest)
else:
subject = Subject(name=file.name, digest={"sha256": digest.digest.hex()})
statement_builder = StatementBuilder(
subjects=[subject],
predicate_type=predicate_type,
predicate=predicate,
)
result = signer.sign_dsse(statement_builder.build())
except ExpiredIdentity as exp_identity:
_logger.error("Signature failed: identity token has expired")
raise exp_identity

except ExpiredCertificate as exp_certificate:
_logger.error("Signature failed: Fulcio signing certificate has expired")
raise exp_certificate

_logger.info(
f"Transparency log entry created at index: {result.log_entry.log_index}"
)

if outputs.signature is not None:
signature = base64.b64encode(result.signature).decode()
with outputs.signature.open(mode="w") as io:
print(signature, file=io)

if outputs.certificate is not None:
cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
with outputs.certificate.open(mode="w") as io:
print(cert_pem, file=io)

if outputs.bundle is not None:
with outputs.bundle.open(mode="w") as io:
print(result.to_json(), file=io)
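
A side note on the streaming-digest comment near the top of _sign_file_threaded: the point is that sha256_digest hashes the open file incrementally instead of reading it into memory first. A minimal sketch of that pattern with plain hashlib (not the actual sigstore helper, whose internals are assumed here):

import hashlib
from typing import BinaryIO

def streaming_sha256(io: BinaryIO, chunk_size: int = 1 << 20) -> bytes:
    # Hash the stream in fixed-size chunks so an arbitrarily large input
    # never has to be buffered in memory.
    h = hashlib.sha256()
    while chunk := io.read(chunk_size):
        h.update(chunk)
    return h.digest()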


def _sign_common(
args: argparse.Namespace, output_map: OutputMap, predicate: dict[str, Any] | None
) -> None:
@@ -666,63 +718,37 @@ def _sign_common(
if not identity:
_invalid_arguments(args, "No identity token supplied or detected!")

with signing_ctx.signer(identity) as signer:
for file, outputs in output_map.items():
_logger.debug(f"signing for {file.name}")
with file.open(mode="rb") as io:
# The input can be indefinitely large, so we perform a streaming
# digest and sign the prehash rather than buffering it fully.
digest = sha256_digest(io)
try:
if predicate is None:
result = signer.sign_artifact(input_=digest)
else:
subject = Subject(
name=file.name, digest={"sha256": digest.digest.hex()}
)
predicate_type = args.predicate_type
statement_builder = StatementBuilder(
subjects=[subject],
predicate_type=predicate_type,
predicate=predicate,
)
result = signer.sign_dsse(statement_builder.build())
except ExpiredIdentity as exp_identity:
print("Signature failed: identity token has expired")
raise exp_identity

except ExpiredCertificate as exp_certificate:
print("Signature failed: Fulcio signing certificate has expired")
raise exp_certificate

print("Using ephemeral certificate:")
cert = result.signing_certificate
cert_pem = cert.public_bytes(Encoding.PEM).decode()
print(cert_pem)

print(
f"Transparency log entry created at index: {result.log_entry.log_index}"
)
# Not all commands provide --predicate-type
predicate_type = getattr(args, "predicate_type", None)

sig_output: TextIO
if outputs.signature is not None:
sig_output = outputs.signature.open("w")
else:
sig_output = sys.stdout
with signing_ctx.signer(identity) as signer:
print("Using ephemeral certificate:")
cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
print(cert_pem)

# sign in threads: this is especially relevant for Rekor v2 as otherwise we wait
# for log inclusion for each signature separately
with futures.ThreadPoolExecutor() as executor:
jobs = [
executor.submit(
_sign_file_threaded,
signer,
predicate_type,
predicate,
file,
outputs,
)
for file, outputs in output_map.items()
]
for job in futures.as_completed(jobs):
job.result()

signature = base64.b64encode(result.signature).decode()
print(signature, file=sig_output)
for file, outputs in output_map.items():
if outputs.signature is not None:
print(f"Signature written to {outputs.signature}")

if outputs.certificate is not None:
with outputs.certificate.open(mode="w") as io:
print(cert_pem, file=io)
print(f"Certificate written to {outputs.certificate}")

if outputs.bundle is not None:
with outputs.bundle.open(mode="w") as io:
print(result.to_json(), file=io)
print(f"Sigstore bundle written to {outputs.bundle}")


31 changes: 15 additions & 16 deletions sigstore/_internal/rekor/client.py
@@ -73,8 +73,20 @@ def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:


class _Endpoint(ABC):
def __init__(self, url: str, session: requests.Session) -> None:
def __init__(self, url: str, session: requests.Session | None = None) -> None:
# Note that _Endpoint may not be thread safe if the same Session is provided
# to an _Endpoint in multiple threads
self.url = url
if session is None:
session = requests.Session()
session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
)

self.session = session


@@ -210,20 +222,6 @@ def __init__(self, url: str) -> None:
Create a new `RekorClient` from the given URL.
"""
self.url = f"{url}/api/v1"
self.session = requests.Session()
self.session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
)

def __del__(self) -> None:
"""
Terminates the underlying network session.
"""
self.session.close()

@classmethod
def production(cls) -> RekorClient:
@@ -246,7 +244,8 @@ def log(self) -> RekorLog:
"""
Returns a `RekorLog` adapter for making requests to a Rekor log.
"""
return RekorLog(f"{self.url}/log", session=self.session)

return RekorLog(f"{self.url}/log")

def create_entry(self, request: EntryRequestBody) -> LogEntry:
"""
28 changes: 13 additions & 15 deletions sigstore/_internal/rekor/client_v2.py
@@ -54,20 +54,6 @@ def __init__(self, base_url: str) -> None:
Create a new `RekorV2Client` from the given URL.
"""
self.url = f"{base_url}/api/v2"
self.session = requests.Session()
self.session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
)

def __del__(self) -> None:
"""
Terminates the underlying network session.
"""
self.session.close()

def create_entry(self, payload: EntryRequestBody) -> LogEntry:
"""
@@ -78,7 +64,19 @@ def create_entry(self, payload: EntryRequestBody) -> LogEntry:
https://github.com/sigstore/rekor-tiles/blob/main/CLIENTS.md#handling-longer-requests
"""
_logger.debug(f"proposed: {json.dumps(payload)}")
resp = self.session.post(

# Use a short-lived session to avoid potential issues with multi-threading:
# Session thread-safety is ambiguous
session = requests.Session()
session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": USER_AGENT,
}
)

resp = session.post(
f"{self.url}/log/entries",
json=payload,
)
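
Since the v2 client now builds a throwaway Session for each create_entry call, nothing closes the underlying connection pool explicitly. One possible variant (a sketch under the assumption that per-call sessions stay, not a change this PR makes) is to scope the session with a context manager so it is closed as soon as the request finishes:

import requests

def post_json(url: str, payload: dict, user_agent: str = "sigstore-python") -> dict:
    # Single-use session, closed deterministically when the block exits.
    with requests.Session() as session:
        session.headers.update(
            {
                "Content-Type": "application/json",
                "Accept": "application/json",
                "User-Agent": user_agent,
            }
        )
        resp = session.post(url, json=payload, timeout=30)
        resp.raise_for_status()
        return resp.json()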
24 changes: 10 additions & 14 deletions sigstore/_internal/timestamp.py
@@ -68,19 +68,6 @@ def __init__(self, url: str) -> None:
Create a new `TimestampAuthorityClient` from the given URL.
"""
self.url = url
self.session = requests.Session()
self.session.headers.update(
{
"Content-Type": "application/timestamp-query",
"User-Agent": USER_AGENT,
}
)

def __del__(self) -> None:
"""
Terminates the underlying network session.
"""
self.session.close()

def request_timestamp(self, signature: bytes) -> TimeStampResponse:
"""
@@ -104,9 +91,18 @@ def request_timestamp(self, signature: bytes) -> TimeStampResponse:
msg = f"invalid request: {error}"
raise TimestampError(msg)

# Use a single-use session to avoid potential Session thread-safety issues
session = requests.Session()
session.headers.update(
{
"Content-Type": "application/timestamp-query",
"User-Agent": USER_AGENT,
}
)

# Send it to the TSA for signing
try:
response = self.session.post(
response = session.post(
self.url,
data=timestamp_request.as_bytes(),
timeout=CLIENT_TIMEOUT,
5 changes: 5 additions & 0 deletions test/assets/integration/b.txt
@@ -0,0 +1,5 @@
DO NOT MODIFY ME!

this is "b.txt", a sample input for sigstore-python's unit tests.

DO NOT MODIFY ME!
5 changes: 5 additions & 0 deletions test/assets/integration/c.txt
@@ -0,0 +1,5 @@
DO NOT MODIFY ME!

this is "c.txt", a sample input for sigstore-python's unit tests.

DO NOT MODIFY ME!