Class RpcChannel

java.lang.Object
ru.pathcreator.pyc.rpc.core.RpcChannel
All Implemented Interfaces:
AutoCloseable

public final class RpcChannel extends Object implements AutoCloseable
Один двунаправленный RPC-канал поверх Aeron. / One bidirectional RPC channel over Aeron.

Архитектура / Architecture

  caller (virtual or platform thread)
    │
    │ call(req, codec, respCodec)
    ▼
  ┌────────────────────────────────────────────┐
  │ 1. acquire PendingCall from pool           │
  │ 2. register correlationId                  │
  │ 3. ThreadLocal staging buffer              │
  │ 4. encode envelope + payload               │
  │ 5. publication.tryClaim + commit           │
  │    (ConcurrentPublication — concurrent)    │
  │ 6. SyncWaiter.await()                      │
  └────────────────────────────────────────────┘

                         │ UDP
                         ▼
  single rx thread per channel
  └── subscription.poll() loop with IdleStrategy
      │
      ├── response → pendingRegistry.remove → PendingCall.completeOk → unpark caller
      │
      └── request → OFFLOAD to executor (virtual threads by default)
                  OR direct-execute in rx thread (if DIRECT_EXECUTOR configured)

Ключевые свойства / Key properties

  • Нет sender-треда и MPSC-очереди. Caller пишет прямо в ConcurrentPublication.tryClaim(int, BufferClaim). На один hop меньше.
  • Публикация concurrent (не exclusive) — несколько caller-ов могут писать одновременно, Aeron справляется через CAS на position.
  • Handler-ы всегда OFFLOAD (или DIRECT_EXECUTOR). Нет INLINE.
  • Удалён wire-batching (он имел смысл только с sender-тредом).

Bidirectional RPC channel over Aeron. Client calls are correlated with responses by transport correlation identifiers, while server-side handlers can be executed either in offload executor threads or directly in the receive thread when explicitly configured.

A channel is still the unit of logical isolation: it owns its publication, subscription, pending-call registry, correlation flow, and handler registry. Recent changes only affect how receive polling is driven: either by a dedicated RX thread or by a node-level shared receive poller.

  • Constructor Summary

    Constructors
    Constructor
    Description
    RpcChannel(ChannelConfig config, io.aeron.Aeron aeron, ExecutorService nodeDefaultExecutor)
    Создаёт RPC-канал поверх указанного Aeron-клиента.
  • Method Summary

    Modifier and Type
    Method
    Description
    boolean
    awaitDrained(long timeout, TimeUnit unit)
    Waits until the channel has no in-flight client calls and no in-flight server handlers.
    void
    Enables drain mode for this channel.
    <Req,Resp> Resp
    call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec)
    Выполняет синхронный RPC-вызов с таймаутом и backpressure-политикой из конфигурации канала.
    <Req,Resp> Resp
    call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeout, TimeUnit unit)
    Выполняет синхронный RPC-вызов с явным таймаутом.
    <Req,Resp> Resp
    call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeoutNs, BackpressurePolicy policy)
    Выполняет синхронный RPC-вызов с явным таймаутом в наносекундах и политикой backpressure.
    void
    Закрывает канал, завершает ожидающие вызовы и освобождает Aeron-ресурсы.
    boolean
    closeGracefully(long timeout, TimeUnit unit)
    Starts drain mode, waits for in-flight work to finish, and closes the channel.
    boolean
    Проверяет, считается ли канал подключенным по состоянию publication и heartbeat.
    boolean
    Returns whether the channel is currently in drain mode.
    boolean
    Returns whether the optional protocol handshake has already completed.
    void
    onRaw(int requestMessageTypeId, int responseMessageTypeId, RawRequestHandler handler)
    Регистрирует raw-обработчик для указанного типа запроса.
    <Req,Resp> void
    onRequest(int requestMessageTypeId, int responseMessageTypeId, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, RequestHandler<Req,Resp> handler)
    Регистрирует high-level обработчик с кодеками запроса и ответа.
    boolean
    reconnectNow(long timeout, TimeUnit unit)
    Explicitly triggers reconnect handling according to the configured ReconnectStrategy.
    long
    Returns the negotiated remote capability bitmask, or 0 when the optional handshake has not completed yet.
    int
    Returns the negotiated remote protocol version, or 0 when the optional handshake has not completed yet.
    void
    Запускает receive path канала и heartbeat.

    Methods inherited from class Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • RpcChannel

      public RpcChannel(ChannelConfig config, io.aeron.Aeron aeron, ExecutorService nodeDefaultExecutor)
      Создаёт RPC-канал поверх указанного Aeron-клиента.

      Creates an RPC channel over the provided Aeron client.

      Parameters:
      config - конфигурация канала / channel configuration
      aeron - Aeron-клиент / Aeron client
      nodeDefaultExecutor - executor узла по умолчанию для offload-обработчиков / node default executor for offloaded handlers
  • Method Details

    • onRaw

      public void onRaw(int requestMessageTypeId, int responseMessageTypeId, RawRequestHandler handler)
      Регистрирует raw-обработчик для указанного типа запроса.

      Registers a raw handler for the specified request message type.

      Parameters:
      requestMessageTypeId - тип входящего запроса / incoming request message type
      responseMessageTypeId - тип исходящего ответа / outgoing response message type
      handler - raw-обработчик запроса / raw request handler
    • onRequest

      public <Req,Resp> void onRequest(int requestMessageTypeId, int responseMessageTypeId, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, RequestHandler<Req,Resp> handler)
      Регистрирует high-level обработчик с кодеками запроса и ответа.

      Registers a high-level handler with request and response codecs.

      Type Parameters:
      Req - тип объекта запроса / request object type
      Resp - тип объекта ответа / response object type
      Parameters:
      requestMessageTypeId - тип входящего запроса / incoming request message type
      responseMessageTypeId - тип исходящего ответа / outgoing response message type
      reqCodec - кодек запроса / request codec
      respCodec - кодек ответа / response codec
      handler - обработчик запроса / request handler
    • start

      public void start()
      Запускает receive path канала и heartbeat.

      Starts the channel receive thread and heartbeat manager.

    • close

      public void close()
      Закрывает канал, завершает ожидающие вызовы и освобождает Aeron-ресурсы.

      Closes the channel, completes pending calls, and releases Aeron resources.

      Specified by:
      close in interface AutoCloseable
    • beginDrain

      public void beginDrain()
      Enables drain mode for this channel.

      While draining, new outgoing calls are rejected locally and new incoming user requests are rejected with a structured remote RpcStatus.SERVICE_UNAVAILABLE error. In-flight work is allowed to finish.

    • isDraining

      public boolean isDraining()
      Returns whether the channel is currently in drain mode.
      Returns:
      true if drain mode is active
    • awaitDrained

      public boolean awaitDrained(long timeout, TimeUnit unit)
      Waits until the channel has no in-flight client calls and no in-flight server handlers.
      Parameters:
      timeout - timeout value
      unit - timeout unit
      Returns:
      true if the channel drained in time
    • closeGracefully

      public boolean closeGracefully(long timeout, TimeUnit unit)
      Starts drain mode, waits for in-flight work to finish, and closes the channel.
      Parameters:
      timeout - timeout value
      unit - timeout unit
      Returns:
      true if the channel drained before close
    • isConnected

      public boolean isConnected()
      Проверяет, считается ли канал подключенным по состоянию publication и heartbeat.

      Checks whether the channel is connected according to both Aeron publication state and heartbeat state.

      Returns:
      true, если канал считается подключенным / true if the channel is considered connected
    • isProtocolHandshakeComplete

      public boolean isProtocolHandshakeComplete()
      Returns whether the optional protocol handshake has already completed.
      Returns:
      true if protocol compatibility is already established
    • remoteProtocolVersion

      public int remoteProtocolVersion()
      Returns the negotiated remote protocol version, or 0 when the optional handshake has not completed yet.
      Returns:
      remote protocol version
    • remoteProtocolCapabilities

      public long remoteProtocolCapabilities()
      Returns the negotiated remote capability bitmask, or 0 when the optional handshake has not completed yet.
      Returns:
      remote protocol capabilities
    • reconnectNow

      public boolean reconnectNow(long timeout, TimeUnit unit)
      Explicitly triggers reconnect handling according to the configured ReconnectStrategy.

      For ReconnectStrategy.WAIT_FOR_CONNECTION this waits for the current path to reconnect. For ReconnectStrategy.RECREATE_ON_DISCONNECT this rebuilds the local transport resources first and then waits for the channel to come back.

      Parameters:
      timeout - timeout value
      unit - timeout unit
      Returns:
      true if the channel is connected after the reconnect attempt
    • call

      public <Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec)
      Выполняет синхронный RPC-вызов с таймаутом и backpressure-политикой из конфигурации канала.

      Performs a synchronous RPC call using timeout and backpressure policy from the channel configuration.

      Type Parameters:
      Req - тип объекта запроса / request object type
      Resp - тип объекта ответа / response object type
      Parameters:
      requestMessageTypeId - тип исходящего запроса / outgoing request message type
      expectedResponseTypeId - ожидаемый тип ответа / expected response message type
      request - объект запроса / request object
      reqCodec - кодек запроса / request codec
      respCodec - кодек ответа / response codec
      Returns:
      декодированный объект ответа / decoded response object
    • call

      public <Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeout, TimeUnit unit)
      Выполняет синхронный RPC-вызов с явным таймаутом.

      Performs a synchronous RPC call with an explicit timeout.

      Type Parameters:
      Req - тип объекта запроса / request object type
      Resp - тип объекта ответа / response object type
      Parameters:
      requestMessageTypeId - тип исходящего запроса / outgoing request message type
      expectedResponseTypeId - ожидаемый тип ответа / expected response message type
      request - объект запроса / request object
      reqCodec - кодек запроса / request codec
      respCodec - кодек ответа / response codec
      timeout - значение таймаута / timeout value
      unit - единица измерения таймаута / timeout unit
      Returns:
      декодированный объект ответа / decoded response object
    • call

      public <Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeoutNs, BackpressurePolicy policy)
      Выполняет синхронный RPC-вызов с явным таймаутом в наносекундах и политикой backpressure.

      Performs a synchronous RPC call with an explicit timeout in nanoseconds and an explicit backpressure policy.

      Type Parameters:
      Req - тип объекта запроса / request object type
      Resp - тип объекта ответа / response object type
      Parameters:
      requestMessageTypeId - тип исходящего запроса / outgoing request message type
      expectedResponseTypeId - ожидаемый тип ответа / expected response message type
      request - объект запроса / request object
      reqCodec - кодек запроса / request codec
      respCodec - кодек ответа / response codec
      timeoutNs - таймаут ожидания ответа в наносекундах / response timeout in nanoseconds
      policy - политика обработки backpressure / backpressure policy
      Returns:
      декодированный объект ответа / decoded response object