class SynchronizerTimeTracker extends NamedLogging with FlagCloseable with HasFlushFuture
Provides a variety of methods for tracking time on the synchronizer.
- fetchTime and fetchTimeProof allows for proactively asking for a recent time or time proof.
- requestTick asks the tracker to ensure that an event is witnessed for the given time or greater (useful for timeouts).
- awaitTick will return a future to wait for the given time being reached on the target synchronizer.
We currently assume that the synchronizer and our host are roughly synchronized and typically
won't expect to see a time on a synchronizer until we have passed that point on our local clock.
We then wait for observationLatency
past the timestamp we are expecting to elapse on our local
clock as transmission of an event with that timestamp will still take some time to arrive at our
host. This avoids frequently asking for times before we've reached the timestamps we're looking
for locally.
We also take into account a patienceDuration
that will cause us to defer asking for a time if
we have recently seen events for the synchronizer. This is particularly useful if we are
significantly behind and reading many old events from the synchronizer.
If no activity is happening on the synchronizer we will try to ensure that we have observed an
event at least once during the minObservationDuration
.
- Alphabetic
- By Inheritance
- SynchronizerTimeTracker
- HasFlushFuture
- FlagCloseable
- PerformUnlessClosing
- HasSynchronizeWithReaders
- OnShutdownRunner
- HasRunOnClosing
- HasUnlessClosing
- AutoCloseable
- NamedLogging
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Instance Constructors
- new SynchronizerTimeTracker(config: SynchronizerTimeTrackerConfig, clock: Clock, timeRequestSubmitter: TimeProofRequestSubmitter, timeouts: ProcessingTimeout, loggerFactory: NamedLoggerFactory)(implicit executionContext: ExecutionContext)
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def addReader(reader: String)(implicit traceContext: TraceContext): UnlessShutdown[ReaderHandle]
TODO(#16601) Make this method private once PerformUnlessClosing doesn't need it any more
TODO(#16601) Make this method private once PerformUnlessClosing doesn't need it any more
- Attributes
- protected[this]
- Definition Classes
- HasSynchronizeWithReaders
- def addToFlushAndLogError(name: String)(future: Future[_])(implicit loggingContext: ErrorLoggingContext): Unit
Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed.Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed. Logs an error if thefuture
fails with an exception.- Attributes
- protected
- Definition Classes
- HasFlushFuture
- def addToFlushAndLogErrorUS(name: String)(future: FutureUnlessShutdown[_])(implicit loggingContext: ErrorLoggingContext): Unit
- Attributes
- protected
- Definition Classes
- HasFlushFuture
- def addToFlushWithoutLogging(name: String)(future: Future[_]): Unit
Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed.Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed. The caller is responsible for logging any exceptions thrown inside the future.- Attributes
- protected
- Definition Classes
- HasFlushFuture
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def awaitTick(ts: CantonTimestamp)(implicit traceContext: TraceContext): Option[Future[Unit]]
Waits for an event with a timestamp greater or equal to
ts
to be observed from the synchronizer.Waits for an event with a timestamp greater or equal to
ts
to be observed from the synchronizer. If we have already witnessed an event with a timestamp equal or exceeding the givents
thenNone
will be returned. - def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
- final def close(): Unit
Blocks until all earlier tasks have completed and then prevents further tasks from being run.
Blocks until all earlier tasks have completed and then prevents further tasks from being run.
- Definition Classes
- FlagCloseable → OnShutdownRunner → AutoCloseable
- def closingTimeout: FiniteDuration
- Attributes
- protected
- Definition Classes
- FlagCloseable → PerformUnlessClosing
- val config: SynchronizerTimeTrackerConfig
- def doFlush(): Future[Unit]
Returns a future that completes after all added futures have completed.
Returns a future that completes after all added futures have completed. The returned future never fails.
- Attributes
- protected
- Definition Classes
- HasFlushFuture
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- implicit def errorLoggingContext(implicit traceContext: TraceContext): ErrorLoggingContext
- Attributes
- protected
- Definition Classes
- NamedLogging
- def fetchTime(freshnessBound: NonNegativeFiniteDuration = NonNegativeFiniteDuration.Zero)(implicit traceContext: TraceContext): FutureUnlessShutdown[CantonTimestamp]
Fetches a recent synchronizer timestamp.
Fetches a recent synchronizer timestamp. If the latest received event has been received within the given
freshnessBound
(measured on the participant clock) this synchronizer timestamp will be immediately returned. If a sufficiently fresh timestamp is unavailable then a request for a time proof will be made, however the returned future will be resolved by the first event after this call (which may not necessarily be the response to our time proof request).- returns
The future completes with the synchronizer's timestamp of the event. So if the participant's local clock is ahead of the synchronizer clock, the timestamp may be earlier than now minus the freshness bound.
- def fetchTimeProof(freshnessBound: NonNegativeFiniteDuration = NonNegativeFiniteDuration.Zero)(implicit traceContext: TraceContext): FutureUnlessShutdown[TimeProof]
Similar to
fetchTime
but will only return time proof. - def flow[F[_], Env <: Envelope[_]](implicit F: Foldable[F]): Flow[F[BoxedEnvelope[OrdinaryEnvelopeBox, Env]], F[BoxedEnvelope[OrdinaryEnvelopeBox, Env]], NotUsed]
- def flush(): Future[Unit]
- Attributes
- protected[time]
- Annotations
- @VisibleForTesting()
- def flushCloseable(name: String, timeout: NonNegativeDuration): SyncCloseable
- Attributes
- protected
- Definition Classes
- HasFlushFuture
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def isClosing: Boolean
Check whether we're closing.
Check whether we're closing. Susceptible to race conditions; unless you're using this as a flag to the retry lib or you really know what you're doing, prefer
performUnlessClosing
and friends.- Definition Classes
- OnShutdownRunner → HasUnlessClosing
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def keepTrackOfReaderCallStack: Boolean
Set this to true to get detailed information about all futures that did not complete during shutdown.
Set this to true to get detailed information about all futures that did not complete during shutdown.
- Attributes
- protected[this]
- Definition Classes
- PerformUnlessClosing → HasSynchronizeWithReaders
- def latestTime: Option[CantonTimestamp]
Fetch the latest timestamp we have observed from the synchronizer.
Fetch the latest timestamp we have observed from the synchronizer. Note this isn't restored on startup so will be empty until the first event after starting is seen.
- def logger: TracedLogger
- Attributes
- protected
- Definition Classes
- NamedLogging
- val loggerFactory: NamedLoggerFactory
- Attributes
- protected
- Definition Classes
- SynchronizerTimeTracker → NamedLogging
- def maxSleepMillis: Long
How often to poll to check that all tasks have completed.
How often to poll to check that all tasks have completed.
- Attributes
- protected
- Definition Classes
- PerformUnlessClosing
- def nameInternal: String
- Attributes
- protected[this]
- Definition Classes
- PerformUnlessClosing → HasSynchronizeWithReaders
- implicit def namedLoggingContext(implicit traceContext: TraceContext): NamedLoggingContext
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def noTracingLogger: Logger
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- def onCloseFailure(e: Throwable): Unit
- Attributes
- protected
- Definition Classes
- PerformUnlessClosing
- def onClosed(): Unit
- Definition Classes
- SynchronizerTimeTracker → PerformUnlessClosing
- final def onFirstClose(): Unit
Blocks until all earlier tasks have completed and then prevents further tasks from being run.
Blocks until all earlier tasks have completed and then prevents further tasks from being run.
- Definition Classes
- PerformUnlessClosing → OnShutdownRunner
- Annotations
- @SuppressWarnings()
- def performUnlessClosing[A](name: String)(f: => A)(implicit traceContext: TraceContext): UnlessShutdown[A]
Performs the task given by
f
unless a shutdown has been initiated.Performs the task given by
f
unless a shutdown has been initiated. The shutdown will only begin afterf
completes, but other tasks may execute concurrently withf
, if started using this function, or one of the other variants (performUnlessClosingF and performUnlessClosingEitherT). The tasks are assumed to take less than closingTimeout to complete.DO NOT CALL
this.close
as part off
, because it will result in a deadlock. DO NOT PUT retries, especially indefinite ones, insidef
.- f
The task to perform
- returns
scala.None$ if a shutdown has been initiated. Otherwise the result of the task.
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingCheckedT[A, N, R](name: String, onClosing: => Checked[A, N, R])(etf: => CheckedT[Future, A, N, R])(implicit ec: ExecutionContext, traceContext: TraceContext): CheckedT[Future, A, N, R]
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingCheckedUST[A, N, R](name: String, onClosing: => Checked[A, N, R])(etf: => CheckedT[FutureUnlessShutdown, A, N, R])(implicit ec: ExecutionContext, traceContext: TraceContext): CheckedT[FutureUnlessShutdown, A, N, R]
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingEitherT[E, R](name: String, onClosing: => E)(etf: => EitherT[Future, E, R])(implicit ec: ExecutionContext, traceContext: TraceContext): EitherT[Future, E, R]
Performs the EitherT[Future] given by
etf
unless a shutdown has been initiated, in which case the provided error is returned instead.Performs the EitherT[Future] given by
etf
unless a shutdown has been initiated, in which case the provided error is returned instead. Bothetf
and the error are lazy;etf
is only evaluated if there is no shutdown, the error only if we're shutting down. The shutdown will only begin afteretf
completes, but other tasks may execute concurrently withetf
, if started using this function, or one of the other variants (performUnlessClosing and performUnlessClosingF). The tasks are assumed to take less than closingTimeout to complete.DO NOT CALL
this.close
as part ofetf
, because it will result in a deadlock. DO NOT PUT retries, especially indefinite ones, insidef
.- etf
The task to perform
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingEitherU[E, R](name: String)(etf: => EitherT[Future, E, R])(implicit ec: ExecutionContext, traceContext: TraceContext): EitherT[FutureUnlessShutdown, E, R]
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingEitherUSF[E, R](name: String)(etf: => EitherT[FutureUnlessShutdown, E, R])(implicit ec: ExecutionContext, traceContext: TraceContext): EitherT[FutureUnlessShutdown, E, R]
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingEitherUSFAsync[E, R](name: String)(etf: => EitherT[FutureUnlessShutdown, E, R])(asyncResultToWaitForF: (R) => FutureUnlessShutdown[_])(implicit ec: ExecutionContext, traceContext: TraceContext): EitherT[FutureUnlessShutdown, E, R]
Use this method if closing/shutdown of the object should wait for asynchronous computation to finish too.
Use this method if closing/shutdown of the object should wait for asynchronous computation to finish too.
- etf
closing of this object will wait for all such spawned Futures to finish
- asyncResultToWaitForF
closing of this object will wait also wait for all such asynchronous Futures to finish too
- returns
the future spawned by etf
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingF[A](name: String)(f: => Future[A])(implicit ec: ExecutionContext, traceContext: TraceContext): FutureUnlessShutdown[A]
Performs the Future given by
f
unless a shutdown has been initiated.Performs the Future given by
f
unless a shutdown has been initiated. The future is lazy and not evaluated during shutdown. The shutdown will only begin afterf
completes, but other tasks may execute concurrently withf
, if started using this function, or one of the other variants (performUnlessClosing and performUnlessClosingEitherT). The tasks are assumed to take less than closingTimeout to complete.DO NOT CALL
this.close
as part off
, because it will result in a deadlock. DO NOT PUT retries, especially indefinite ones, insidef
.- f
The task to perform
- returns
The future completes with com.digitalasset.canton.lifecycle.UnlessShutdown.AbortedDueToShutdown if a shutdown has been initiated. Otherwise the result of the task wrapped in com.digitalasset.canton.lifecycle.UnlessShutdown.Outcome.
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingOptionUSF[R](name: String)(otf: => OptionT[FutureUnlessShutdown, R])(implicit ec: ExecutionContext, traceContext: TraceContext): OptionT[FutureUnlessShutdown, R]
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingUSF[A](name: String)(f: => FutureUnlessShutdown[A])(implicit ec: ExecutionContext, traceContext: TraceContext): FutureUnlessShutdown[A]
- Definition Classes
- PerformUnlessClosing
- def performUnlessClosingUSFAsync[A](name: String)(f: => FutureUnlessShutdown[A])(asyncResultToWaitForF: (A) => FutureUnlessShutdown[_])(implicit ec: ExecutionContext, traceContext: TraceContext): FutureUnlessShutdown[A]
Use this method if closing/shutdown of the object should wait for asynchronous computation to finish too.
Use this method if closing/shutdown of the object should wait for asynchronous computation to finish too.
- f
closing of this object will wait for all such spawned Futures to finish
- asyncResultToWaitForF
closing of this object will wait also wait for all such asynchronous Futures to finish too
- returns
the future spawned by f
- Definition Classes
- PerformUnlessClosing
- def remainingReaders(): Seq[String]
- Attributes
- protected[this]
- Definition Classes
- HasSynchronizeWithReaders
- def removeReader(handle: ReaderHandle): Unit
TODO(#16601) Make this method private once PerformUnlessClosing doesn't need it any more
TODO(#16601) Make this method private once PerformUnlessClosing doesn't need it any more
- Attributes
- protected[this]
- Definition Classes
- HasSynchronizeWithReaders
- def requestTick(ts: CantonTimestamp, immediately: Boolean = false)(implicit traceContext: TraceContext): TickRequest
Register that we want to observe a synchronizer time.
Register that we want to observe a synchronizer time. The tracker will attempt to make sure that we observe a sequenced event with this timestamp or greater. For ticks below * latestTime, observation has already happened and no further action is needed. If "immediately" is configured and the clock is a com.digitalasset.canton.time.SimClock, a new time proof will be fetched.
The maximum timestamp that we support waiting for is data.CantonTimestamp.MaxValue minus the configured observation latency. If a greater value is provided a warning will be logged but no error will be thrown or returned.
- def requestTicks(timestamps: Seq[Traced[CantonTimestamp]], immediately: Boolean = false)(implicit traceContext: TraceContext): Seq[TickRequest]
Register that we want to observe synchronizer times.
Register that we want to observe synchronizer times. The tracker will attempt to make sure that we observe a sequenced event with the given timestamps or greater. For ticks below latestTime, observation has already happened and no further action is needed. If "immediately" is configured and the clock is a com.digitalasset.canton.time.SimClock, a new time proof will be fetched.
The maximum timestamp that we support waiting for is data.CantonTimestamp.MaxValue minus the configured observation latency. If a greater value is provided a warning will be logged but no error will be thrown or returned.
- def runOnClose(task: RunOnClosing): UnlessShutdown[LifeCycleRegistrationHandle]
Schedules the given task to be run upon closing.
Schedules the given task to be run upon closing.
- returns
An com.digitalasset.canton.lifecycle.UnlessShutdown.Outcome indicates that the task will have been run when the
LifeCycleManager
'scloseAsync
method completes or whenAutoCloseable
'sclose
method returns, unless the returnedLifeCycleRegistrationHandle
was used to cancel the task or the task has been done beforehand. com.digitalasset.canton.lifecycle.UnlessShutdown.AbortedDueToShutdown if the task is not run due to closing. This always happens if isClosing returns true.
- Definition Classes
- OnShutdownRunner → HasRunOnClosing
- def runOnOrAfterClose(task: RunOnClosing)(implicit traceContext: TraceContext): LifeCycleRegistrationHandle
Register a task to run when closing is initiated, or run it immediately if closing is already ongoing.
Register a task to run when closing is initiated, or run it immediately if closing is already ongoing. Unlike runOnClose, this method does not guarantee that this task will have run by the time the
LifeCycleManager
'scloseAsync
method completes orAutoCloseable
'sclose
returns. This is because the task is run immediately if the component has already been closed.- Definition Classes
- HasRunOnClosing
- final def runOnOrAfterClose_(task: RunOnClosing)(implicit traceContext: TraceContext): Unit
Variant of runOnOrAfterClose that does not return a com.digitalasset.canton.lifecycle.LifeCycleRegistrationHandle.
Variant of runOnOrAfterClose that does not return a com.digitalasset.canton.lifecycle.LifeCycleRegistrationHandle.
- Definition Classes
- HasRunOnClosing
- def runTaskUnlessDone(task: RunOnClosing)(implicit traceContext: TraceContext): Unit
- Attributes
- protected[this]
- Definition Classes
- OnShutdownRunner → HasRunOnClosing
- def snapshotIncomplete: Seq[String]
Returns the list of currently incomplete tasks.
Returns the list of currently incomplete tasks. Use only for inspection and debugging.
- Definition Classes
- HasFlushFuture
- def subscriptionResumesAfter(timestamp: CantonTimestamp)(implicit traceContext: TraceContext): Unit
Inform the synchronizer time tracker about the first message the sequencer client resubscribes to from the sequencer.
Inform the synchronizer time tracker about the first message the sequencer client resubscribes to from the sequencer. This is never considered a time proof event.
- def synchronizeWithClosingPatience: FiniteDuration
- Attributes
- protected[this]
- Definition Classes
- PerformUnlessClosing → HasSynchronizeWithReaders
- def synchronizeWithReaders()(implicit traceContext: TraceContext): Boolean
- Attributes
- protected[this]
- Definition Classes
- HasSynchronizeWithReaders
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- val timeouts: ProcessingTimeout
- Attributes
- protected
- Definition Classes
- SynchronizerTimeTracker → FlagCloseable
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def unlessClosing[F[_], A](fa: => F[A])(implicit F: CanAbortDueToShutdown[F]): F[A]
Runs the computation
fa
unless isClosing returns true.Runs the computation
fa
unless isClosing returns true.This method does not delay the closing while
fa
is running, unlike the methods inHasSynchronizeWithClosing
. Accordingly, this method is useful for intermittent checks whether the result of the computation is still relevant.- returns
The result of
fa
or com.digitalasset.canton.lifecycle.UnlessShutdown.AbortedDueToShutdown if isClosing is true
- Definition Classes
- HasUnlessClosing
- Annotations
- @inline()
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- def withReader[F[_], A](name: String)(f: => F[A])(implicit traceContext: TraceContext, F: Thereafter[F]): UnlessShutdown[F[A]]
- Attributes
- protected[this]
- Definition Classes
- HasSynchronizeWithReaders
- def wrapHandler[Env <: Envelope[_]](handler: OrdinaryApplicationHandler[Env]): OrdinaryApplicationHandler[Env]
Create a sequencing.OrdinaryApplicationHandler for updating this time tracker