class ParallelMessageDispatcher extends MessageDispatcher with NamedLogging with Spanning
Dispatches the incoming messages of the com.digitalasset.canton.sequencing.client.SequencerClient to the different processors. It also informs the com.digitalasset.canton.participant.protocol.conflictdetection.RequestTracker about the passing of time for messages that are not processed by the com.digitalasset.canton.participant.protocol.ProtocolProcessor.
Linear Supertypes
Ordering
- Alphabetic
- By Inheritance
Inherited
- ParallelMessageDispatcher
- Spanning
- NamedLogging
- MessageDispatcher
- AnyRef
- Any
- Hide All
- Show All
Visibility
- Public
- Protected
Instance Constructors
- new ParallelMessageDispatcher(protocolVersion: ProtocolVersion, synchronizerId: SynchronizerId, participantId: ParticipantId, requestTracker: RequestTracker, requestProcessors: RequestProcessors, topologyProcessor: ParticipantTopologyProcessor, trafficProcessor: TrafficControlProcessor, acsCommitmentProcessor: ProcessorType, requestCounterAllocator: RequestCounterAllocator, recordOrderPublisher: RecordOrderPublisher, badRootHashMessagesRequestProcessor: BadRootHashMessagesRequestProcessor, inFlightSubmissionSynchronizerTracker: InFlightSubmissionSynchronizerTracker, processAsyncronously: (ViewType) => Boolean, loggerFactory: NamedLoggerFactory, metrics: ConnectedSynchronizerMetrics)(implicit ec: ExecutionContext, tracer: Tracer)
Type Members
- type ProcessingAsyncResult = AsyncResult[TickDecision]
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- type ProcessingResult = FutureUnlessShutdown[ProcessingAsyncResult]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- val acsCommitmentProcessor: ProcessorType
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- def alarm(sc: SequencerCounter, ts: CantonTimestamp, msg: String)(implicit traceContext: TraceContext): Unit
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- val badRootHashMessagesRequestProcessor: BadRootHashMessagesRequestProcessor
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- def checkSingleRootHashMessage(rootHashMessages: Seq[OpenEnvelope[RootHashMessage[SerializedRootHashMessagePayload]]], hasEncryptedViews: Boolean): Checked[FailedRootHashMessageCheck, String, OpenEnvelope[RootHashMessage[SerializedRootHashMessagePayload]]]
- Definition Classes
- MessageDispatcher
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
- def doProcess(kind: MessageKind): ProcessingResult
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- implicit val ec: ExecutionContext
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- implicit def errorLoggingContext(implicit traceContext: TraceContext): ErrorLoggingContext
- Attributes
- protected
- Definition Classes
- NamedLogging
- def filterBatchForSynchronizerId(batch: Batch[DefaultOpenEnvelope], sc: SequencerCounter, ts: CantonTimestamp)(implicit traceContext: TraceContext): Seq[DefaultOpenEnvelope]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def handleAll(tracedEvents: Traced[Seq[WithOpeningErrors[PossiblyIgnoredProtocolEvent]]]): HandlerResult
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- val inFlightSubmissionSynchronizerTracker: InFlightSubmissionSynchronizerTracker
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def logDeliveryError(sc: SequencerCounter, ts: CantonTimestamp, msgId: MessageId, status: Status)(implicit traceContext: TraceContext): Unit
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def logEvent(sc: SequencerCounter, ts: CantonTimestamp, msgId: Option[MessageId], evt: SignedContent[SequencedEvent[DefaultOpenEnvelope]])(implicit traceContext: TraceContext): Unit
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def logFaultyEvent(sc: SequencerCounter, ts: CantonTimestamp, msgId: Option[MessageId], err: WithOpeningErrors[SequencedEvent[DefaultOpenEnvelope]])(implicit traceContext: TraceContext): Unit
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def logTimeProof(sc: SequencerCounter, ts: CantonTimestamp)(implicit traceContext: TraceContext): Unit
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def logger: TracedLogger
- Attributes
- protected
- Definition Classes
- NamedLogging
- val loggerFactory: NamedLoggerFactory
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → NamedLogging
- val metrics: ConnectedSynchronizerMetrics
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- implicit def namedLoggingContext(implicit traceContext: TraceContext): NamedLoggingContext
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def noTracingLogger: Logger
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- def observeDeliverError(error: DeliverError)(implicit traceContext: TraceContext): ProcessingResult
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def observeSequencing(events: Seq[RawProtocolEvent])(implicit traceContext: TraceContext): ProcessingResult
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- val participantId: ParticipantId
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- def processBatch(sequencerCounter: SequencerCounter, eventE: WithOpeningErrors[SignedContent[Deliver[DefaultOpenEnvelope]]])(implicit traceContext: TraceContext): ProcessingResult
Rules for processing batches of envelopes:
Rules for processing batches of envelopes:
- Identity transactions can be included in any batch of envelopes. They must be processed first. The identity processor ignores replayed or invalid transactions and merely logs an error.
- Acs commitments can be included in any batch of envelopes. They must be processed before
the requests and results to meet the precondition of
com.digitalasset.canton.participant.pruning.AcsCommitmentProcessor's
processBatch
method. - A com.digitalasset.canton.protocol.messages.ConfirmationResultMessage message should be sent only by the trusted mediator of the synchronizer. The mediator should never include further messages with a com.digitalasset.canton.protocol.messages.ConfirmationResultMessage. So a participant accepts a com.digitalasset.canton.protocol.messages.ConfirmationResultMessage only if there are no other messages (except topology transactions and ACS commitments) in the batch. Otherwise, the participant ignores the com.digitalasset.canton.protocol.messages.ConfirmationResultMessage and raises an alarm.
- Request messages originate from untrusted participants. If the batch contains exactly one com.digitalasset.canton.protocol.messages.RootHashMessage that is sent to the participant and the mediator only, the participant processes only request messages with the same root hash. If there are no such root hash message or multiple thereof, the participant does not process the request at all because the mediator will reject the request as a whole.
- We do not know the submitting member of a particular submission because such a submission may be sequenced through an untrusted individual sequencer node (e.g., on a BFT synchronizer). Such a sequencer node could lie about the actual submitting member. These lies work even with signed submission requests when an earlier submission request is replayed. So we cannot rely on honest synchronizer nodes sending their messages only once and instead must deduplicate replays on the recipient side.
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def processTopologyTransactions(sc: SequencerCounter, ts: SequencedTime, topologyTimestampO: Option[CantonTimestamp], envelopes: Seq[DefaultOpenEnvelope])(implicit traceContext: TraceContext): ProcessingResult
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def processTraffic(ts: CantonTimestamp, timestampOfSigningKeyO: Option[CantonTimestamp], envelopes: Seq[DefaultOpenEnvelope])(implicit traceContext: TraceContext): ProcessingResult
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- implicit val processingAsyncResultMonoid: Monoid[ProcessingAsyncResult]
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- val protocolVersion: ProtocolVersion
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- def pureProcessingResult: ProcessingResult
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- val recordOrderPublisher: RecordOrderPublisher
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- val requestCounterAllocator: RequestCounterAllocator
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- val requestProcessors: RequestProcessors
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- val requestTracker: RequestTracker
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- val synchronizerId: SynchronizerId
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- def toString(): String
- Definition Classes
- AnyRef → Any
- val topologyProcessor: ParticipantTopologyProcessor
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- val trafficProcessor: TrafficControlProcessor
- Attributes
- protected
- Definition Classes
- ParallelMessageDispatcher → MessageDispatcher
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- def withNewTrace[A](description: String)(f: (TraceContext) => (SpanWrapper) => A)(implicit tracer: Tracer): A
- Attributes
- protected
- Definition Classes
- Spanning
- def withSpan[A](description: String)(f: (TraceContext) => (SpanWrapper) => A)(implicit traceContext: TraceContext, tracer: Tracer): A
- Attributes
- protected
- Definition Classes
- Spanning
- def withSpanFromGrpcContext[A](description: String)(f: (TraceContext) => (SpanWrapper) => A)(implicit tracer: Tracer): A
- Attributes
- protected
- Definition Classes
- Spanning