Class RpcChannel
- All Implemented Interfaces:
AutoCloseable
Архитектура / Architecture
caller (virtual or platform thread)
│
│ call(req, codec, respCodec)
▼
┌────────────────────────────────────────────┐
│ 1. acquire PendingCall from pool │
│ 2. register correlationId │
│ 3. ThreadLocal staging buffer │
│ 4. encode envelope + payload │
│ 5. publication.tryClaim + commit │
│ (ConcurrentPublication — concurrent) │
│ 6. SyncWaiter.await() │
└────────────────────────────────────────────┘
│ UDP
▼
single rx thread per channel
└── subscription.poll() loop with IdleStrategy
│
├── response → pendingRegistry.remove → PendingCall.completeOk → unpark caller
│
└── request → OFFLOAD to executor (virtual threads by default)
OR direct-execute in rx thread (if DIRECT_EXECUTOR configured)
Ключевые свойства / Key properties
- Нет sender-треда и MPSC-очереди. Caller пишет прямо в
ConcurrentPublication.tryClaim(int, BufferClaim). На один hop меньше. - Публикация concurrent (не exclusive) — несколько caller-ов могут писать одновременно, Aeron справляется через CAS на position.
- Handler-ы всегда OFFLOAD (или DIRECT_EXECUTOR). Нет INLINE.
- Удалён wire-batching (он имел смысл только с sender-тредом).
Bidirectional RPC channel over Aeron. Client calls are correlated with responses by transport correlation identifiers, while server-side handlers can be executed either in offload executor threads or directly in the receive thread when explicitly configured.
A channel is still the unit of logical isolation: it owns its publication, subscription, pending-call registry, correlation flow, and handler registry. Recent changes only affect how receive polling is driven: either by a dedicated RX thread or by a node-level shared receive poller.
-
Constructor Summary
ConstructorsConstructorDescriptionRpcChannel(ChannelConfig config, io.aeron.Aeron aeron, ExecutorService nodeDefaultExecutor) Создаёт RPC-канал поверх указанного Aeron-клиента. -
Method Summary
Modifier and TypeMethodDescriptionbooleanawaitDrained(long timeout, TimeUnit unit) Waits until the channel has no in-flight client calls and no in-flight server handlers.voidEnables drain mode for this channel.<Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec) Выполняет синхронный RPC-вызов с таймаутом и backpressure-политикой из конфигурации канала.<Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeout, TimeUnit unit) Выполняет синхронный RPC-вызов с явным таймаутом.<Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeoutNs, BackpressurePolicy policy) Выполняет синхронный RPC-вызов с явным таймаутом в наносекундах и политикой backpressure.voidclose()Закрывает канал, завершает ожидающие вызовы и освобождает Aeron-ресурсы.booleancloseGracefully(long timeout, TimeUnit unit) Starts drain mode, waits for in-flight work to finish, and closes the channel.booleanПроверяет, считается ли канал подключенным по состоянию publication и heartbeat.booleanReturns whether the channel is currently in drain mode.booleanReturns whether the optional protocol handshake has already completed.voidonRaw(int requestMessageTypeId, int responseMessageTypeId, RawRequestHandler handler) Регистрирует raw-обработчик для указанного типа запроса.<Req,Resp> void onRequest(int requestMessageTypeId, int responseMessageTypeId, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, RequestHandler<Req, Resp> handler) Регистрирует high-level обработчик с кодеками запроса и ответа.booleanreconnectNow(long timeout, TimeUnit unit) Explicitly triggers reconnect handling according to the configuredReconnectStrategy.longReturns the negotiated remote capability bitmask, or0when the optional handshake has not completed yet.intReturns the negotiated remote protocol version, or0when the optional handshake has not completed yet.voidstart()Запускает receive path канала и heartbeat.
-
Constructor Details
-
RpcChannel
Создаёт RPC-канал поверх указанного Aeron-клиента.Creates an RPC channel over the provided Aeron client.
- Parameters:
config- конфигурация канала / channel configurationaeron- Aeron-клиент / Aeron clientnodeDefaultExecutor- executor узла по умолчанию для offload-обработчиков / node default executor for offloaded handlers
-
-
Method Details
-
onRaw
Регистрирует raw-обработчик для указанного типа запроса.Registers a raw handler for the specified request message type.
- Parameters:
requestMessageTypeId- тип входящего запроса / incoming request message typeresponseMessageTypeId- тип исходящего ответа / outgoing response message typehandler- raw-обработчик запроса / raw request handler
-
onRequest
public <Req,Resp> void onRequest(int requestMessageTypeId, int responseMessageTypeId, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, RequestHandler<Req, Resp> handler) Регистрирует high-level обработчик с кодеками запроса и ответа.Registers a high-level handler with request and response codecs.
- Type Parameters:
Req- тип объекта запроса / request object typeResp- тип объекта ответа / response object type- Parameters:
requestMessageTypeId- тип входящего запроса / incoming request message typeresponseMessageTypeId- тип исходящего ответа / outgoing response message typereqCodec- кодек запроса / request codecrespCodec- кодек ответа / response codechandler- обработчик запроса / request handler
-
start
public void start()Запускает receive path канала и heartbeat.Starts the channel receive thread and heartbeat manager.
-
close
public void close()Закрывает канал, завершает ожидающие вызовы и освобождает Aeron-ресурсы.Closes the channel, completes pending calls, and releases Aeron resources.
- Specified by:
closein interfaceAutoCloseable
-
beginDrain
public void beginDrain()Enables drain mode for this channel.While draining, new outgoing calls are rejected locally and new incoming user requests are rejected with a structured remote
RpcStatus.SERVICE_UNAVAILABLEerror. In-flight work is allowed to finish. -
isDraining
public boolean isDraining()Returns whether the channel is currently in drain mode.- Returns:
trueif drain mode is active
-
awaitDrained
Waits until the channel has no in-flight client calls and no in-flight server handlers.- Parameters:
timeout- timeout valueunit- timeout unit- Returns:
trueif the channel drained in time
-
closeGracefully
Starts drain mode, waits for in-flight work to finish, and closes the channel.- Parameters:
timeout- timeout valueunit- timeout unit- Returns:
trueif the channel drained before close
-
isConnected
public boolean isConnected()Проверяет, считается ли канал подключенным по состоянию publication и heartbeat.Checks whether the channel is connected according to both Aeron publication state and heartbeat state.
- Returns:
true, если канал считается подключенным /trueif the channel is considered connected
-
isProtocolHandshakeComplete
public boolean isProtocolHandshakeComplete()Returns whether the optional protocol handshake has already completed.- Returns:
trueif protocol compatibility is already established
-
remoteProtocolVersion
public int remoteProtocolVersion()Returns the negotiated remote protocol version, or0when the optional handshake has not completed yet.- Returns:
- remote protocol version
-
remoteProtocolCapabilities
public long remoteProtocolCapabilities()Returns the negotiated remote capability bitmask, or0when the optional handshake has not completed yet.- Returns:
- remote protocol capabilities
-
reconnectNow
Explicitly triggers reconnect handling according to the configuredReconnectStrategy.For
ReconnectStrategy.WAIT_FOR_CONNECTIONthis waits for the current path to reconnect. ForReconnectStrategy.RECREATE_ON_DISCONNECTthis rebuilds the local transport resources first and then waits for the channel to come back.- Parameters:
timeout- timeout valueunit- timeout unit- Returns:
trueif the channel is connected after the reconnect attempt
-
call
public <Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec) Выполняет синхронный RPC-вызов с таймаутом и backpressure-политикой из конфигурации канала.Performs a synchronous RPC call using timeout and backpressure policy from the channel configuration.
- Type Parameters:
Req- тип объекта запроса / request object typeResp- тип объекта ответа / response object type- Parameters:
requestMessageTypeId- тип исходящего запроса / outgoing request message typeexpectedResponseTypeId- ожидаемый тип ответа / expected response message typerequest- объект запроса / request objectreqCodec- кодек запроса / request codecrespCodec- кодек ответа / response codec- Returns:
- декодированный объект ответа / decoded response object
-
call
public <Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeout, TimeUnit unit) Выполняет синхронный RPC-вызов с явным таймаутом.Performs a synchronous RPC call with an explicit timeout.
- Type Parameters:
Req- тип объекта запроса / request object typeResp- тип объекта ответа / response object type- Parameters:
requestMessageTypeId- тип исходящего запроса / outgoing request message typeexpectedResponseTypeId- ожидаемый тип ответа / expected response message typerequest- объект запроса / request objectreqCodec- кодек запроса / request codecrespCodec- кодек ответа / response codectimeout- значение таймаута / timeout valueunit- единица измерения таймаута / timeout unit- Returns:
- декодированный объект ответа / decoded response object
-
call
public <Req,Resp> Resp call(int requestMessageTypeId, int expectedResponseTypeId, Req request, MessageCodec<Req> reqCodec, MessageCodec<Resp> respCodec, long timeoutNs, BackpressurePolicy policy) Выполняет синхронный RPC-вызов с явным таймаутом в наносекундах и политикой backpressure.Performs a synchronous RPC call with an explicit timeout in nanoseconds and an explicit backpressure policy.
- Type Parameters:
Req- тип объекта запроса / request object typeResp- тип объекта ответа / response object type- Parameters:
requestMessageTypeId- тип исходящего запроса / outgoing request message typeexpectedResponseTypeId- ожидаемый тип ответа / expected response message typerequest- объект запроса / request objectreqCodec- кодек запроса / request codecrespCodec- кодек ответа / response codectimeoutNs- таймаут ожидания ответа в наносекундах / response timeout in nanosecondspolicy- политика обработки backpressure / backpressure policy- Returns:
- декодированный объект ответа / decoded response object
-