slimrpc (SLIM Remote Procedure Call)

slimrpc, or SLIM Remote Procedure Call, is a mechanism designed to enable Protocol Buffers (protobuf) RPC over SLIM (Secure Low-Latency Interactive Messaging). This is analogous to gRPC, which leverages HTTP/2 as its underlying transport layer for protobuf RPC.

A key advantage of slimrpc is that it uses SLIM as the transport for inter-application message exchange without extra integration work: a protobuf file is compiled into code that communicates over SLIM. Application developers then use the generated code much as they would with standard gRPC, while benefiting from the security features and efficiency of the SLIM protocol.

This README provides a guide to understanding how slimrpc functions and how you can implement it in your applications. For detailed instructions on compiling a protobuf file to obtain the necessary slimrpc stub code, please refer to the dedicated README file of the slimrpc compiler.

SLIM naming in slimrpc

In slimrpc, each service and its individual RPC handlers are assigned a SLIM name, which is used to route and process messages. Consider the example protobuf definition, which defines a single service, Test, with four RPC methods:

syntax = "proto3";

package example_service;

service Test {
  rpc ExampleUnaryUnary(ExampleRequest) returns (ExampleResponse);
  rpc ExampleUnaryStream(ExampleRequest) returns (stream ExampleResponse);
  rpc ExampleStreamUnary(stream ExampleRequest) returns (ExampleResponse);
  rpc ExampleStreamStream(stream ExampleRequest) returns (stream ExampleResponse);
}

This example showcases the four primary communication patterns supported by gRPC: Unary-Unary, Unary-Stream, Stream-Unary, and Stream-Stream.

For slimrpc, service methods are invoked using the format:

{package-name}.{service-name}/{method-name}

Based on the example_service.Test definition, the method names would be:

example_service.Test/ExampleUnaryUnary
example_service.Test/ExampleUnaryStream
example_service.Test/ExampleStreamUnary
example_service.Test/ExampleStreamStream
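
As a small illustration of the naming format above, the following sketch builds and splits method names. The helper names slimrpc_method_name and split_method_name are hypothetical and are not part of slimrpc or slim_bindings; they only mirror the {package-name}.{service-name}/{method-name} convention.

```python
def slimrpc_method_name(package: str, service: str, method: str) -> str:
    """Build a name in the form {package-name}.{service-name}/{method-name}."""
    return f"{package}.{service}/{method}"


def split_method_name(full_name: str) -> tuple[str, str]:
    """Split a full name into ("package.Service", "Method")."""
    service_name, _, method_name = full_name.partition("/")
    if not service_name or not method_name:
        raise ValueError(f"not a valid method name: {full_name!r}")
    return service_name, method_name


# The four methods of the Test service from the example protobuf definition.
METHODS = [
    "ExampleUnaryUnary",
    "ExampleUnaryStream",
    "ExampleStreamUnary",
    "ExampleStreamStream",
]
NAMES = [slimrpc_method_name("example_service", "Test", m) for m in METHODS]

for name in NAMES:
    print(name)
```

The split form matches how the generated stub passes the service name ("example_service.Test") and the method name as two separate arguments to the channel.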

The slimrpc package manages all the underlying SLIM communication. Application developers only need to implement the specific functions that will be invoked when a message arrives for a defined RPC method.

Example

This section provides a detailed walkthrough of a basic slimrpc client-server interaction, leveraging the simple example provided in the examples/slimrpc/simple folder.

Generated Code

The foundation of this example is the example.proto file, which is a standard Protocol Buffers definition file. This file is compiled using the slimrpc compiler (refer to the slimrpc Compiler README for installation and usage instructions) to generate the necessary Python stub code. The generated code is available in two files: example_pb2.py and example_pb2_slimrpc.py. Specifically, example_pb2_slimrpc.py contains the slimrpc-specific stubs for both client and server implementations. Below are the key classes and functions generated by the compiler:

Client Stub (TestStub)

The TestStub class represents the client-side interface for interacting with the Test service. It provides async methods for each RPC defined in example.proto, allowing clients to initiate calls to the server.

class TestStub:
    """Client stub for Test."""

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A slim_bindings.Channel.
        """
        self._channel = channel

    async def ExampleUnaryUnary(self, request: pb2.ExampleRequest, timeout=None) -> pb2.ExampleResponse:
        """Call ExampleUnaryUnary method."""
        response_bytes = await self._channel.call_unary_async(
            "example_service.Test",
            "ExampleUnaryUnary",
            pb2.ExampleRequest.SerializeToString(request),
            timeout,
        )
        return pb2.ExampleResponse.FromString(response_bytes)

    async def ExampleUnaryStream(self, request: pb2.ExampleRequest, timeout=None):
        """Call ExampleUnaryStream method."""
        response_stream = await self._channel.call_unary_stream_async(
            "example_service.Test",
            "ExampleUnaryStream",
            pb2.ExampleRequest.SerializeToString(request),
            timeout,
        )
        while True:
            stream_msg = await response_stream.next_async()
            if stream_msg.is_end():
                break
            if stream_msg.is_error():
                raise stream_msg[0]
            if stream_msg.is_data():
                yield pb2.ExampleResponse.FromString(stream_msg[0])

    async def ExampleStreamUnary(self, request_iterator, timeout=None) -> pb2.ExampleResponse:
        """Call ExampleStreamUnary method."""
        request_stream = self._channel.call_stream_unary(
            "example_service.Test",
            "ExampleStreamUnary",
            timeout,
        )
        async for request in request_iterator:
            await request_stream.send_async(pb2.ExampleRequest.SerializeToString(request))
        response_bytes = await request_stream.finalize_stream_async()
        return pb2.ExampleResponse.FromString(response_bytes)

    async def ExampleStreamStream(self, request_iterator, timeout=None):
        """Call ExampleStreamStream method."""
        bidi_stream = self._channel.call_stream_stream(
            "example_service.Test",
            "ExampleStreamStream",
            timeout,
        )

        async def send_requests():
            async for request in request_iterator:
                await bidi_stream.send_async(pb2.ExampleRequest.SerializeToString(request))
            await bidi_stream.close_send_async()

        async def receive_responses():
            while True:
                stream_msg = await bidi_stream.recv_async()
                if stream_msg.is_end():
                    break
                if stream_msg.is_error():
                    raise stream_msg[0]
                if stream_msg.is_data():
                    yield pb2.ExampleResponse.FromString(stream_msg[0])

        # Start sending in background
        import asyncio
        send_task = asyncio.create_task(send_requests())

        try:
            async for response in receive_responses():
                yield response
        finally:
            await send_task

Key features of the client stub:

  • All methods are asynchronous: methods that return a single response are coroutines, and methods that return a stream are async generators
  • Unary methods serialize requests and deserialize responses
  • Streaming methods handle StreamMessage objects with proper error checking
  • Bidirectional streaming supports concurrent send/receive operations
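
The concurrent send/receive pattern used by ExampleStreamStream can be tried without a SLIM node. The sketch below is a minimal, self-contained approximation: FakeStreamMessage and FakeEchoStream are hypothetical in-memory stand-ins written for this illustration (they are not slim_bindings classes), and they only imitate the is_end(), is_error(), is_data() and [0] accessors that the generated stub relies on.

```python
import asyncio


class FakeStreamMessage:
    """Hypothetical stand-in for the stream messages handled by the stub."""

    def __init__(self, kind: str, payload=None):
        self._kind = kind  # one of "data", "error", "end"
        self._payload = payload

    def is_data(self) -> bool:
        return self._kind == "data"

    def is_error(self) -> bool:
        return self._kind == "error"

    def is_end(self) -> bool:
        return self._kind == "end"

    def __getitem__(self, index: int):
        if index != 0:
            raise IndexError(index)
        return self._payload


class FakeEchoStream:
    """Hypothetical in-memory stream that answers each request in upper case."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send_async(self, data: bytes) -> None:
        await self._queue.put(FakeStreamMessage("data", data.upper()))

    async def close_send_async(self) -> None:
        await self._queue.put(FakeStreamMessage("end"))

    async def recv_async(self) -> FakeStreamMessage:
        return await self._queue.get()


async def bidi_call(stream, requests):
    """Send requests in a background task while yielding responses."""

    async def send_requests():
        for request in requests:
            await stream.send_async(request)
        await stream.close_send_async()

    send_task = asyncio.create_task(send_requests())
    try:
        while True:
            msg = await stream.recv_async()
            if msg.is_end():
                break
            if msg.is_error():
                raise msg[0]
            if msg.is_data():
                yield msg[0]
    finally:
        # Make sure the sender has finished before the generator exits.
        await send_task


async def collect(requests):
    return [response async for response in bidi_call(FakeEchoStream(), requests)]


RESPONSES = asyncio.run(collect([b"a", b"b", b"c"]))
print(RESPONSES)
```

Running the sender as a separate task is what lets the receiver start yielding responses before all requests have been sent.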

Server Servicer (TestServicer)

The TestServicer class defines the server-side interface. Developers implement this class to provide the actual business logic for each RPC method.

class TestServicer:
    """Server servicer for Test. Implement this class to provide your service logic."""

    def ExampleUnaryUnary(self, request, msg_context, session_context):
        """Method for ExampleUnaryUnary. Implement your service logic here."""
        raise slim_bindings.SRPCResponseError(
            code=code__pb2.UNIMPLEMENTED, message="Method not implemented!"
        )

    def ExampleUnaryStream(self, request, msg_context, session_context):
        """Method for ExampleUnaryStream. Implement your service logic here."""
        raise slim_bindings.SRPCResponseError(
            code=code__pb2.UNIMPLEMENTED, message="Method not implemented!"
        )

    def ExampleStreamUnary(self, request_iterator, session_context):
        """Method for ExampleStreamUnary. Implement your service logic here."""
        raise slim_bindings.SRPCResponseError(
            code=code__pb2.UNIMPLEMENTED, message="Method not implemented!"
        )

    def ExampleStreamStream(self, request_iterator, session_context):
        """Method for ExampleStreamStream. Implement your service logic here."""
        raise slim_bindings.SRPCResponseError(
            code=code__pb2.UNIMPLEMENTED, message="Method not implemented!"
        )

Server Registration Function (add_TestServicer_to_server)

This utility function registers an implemented TestServicer instance with a slimrpc server. It creates handler classes and registers them with the server.

def add_TestServicer_to_server(servicer, server: slim_bindings.Server):
    server.register_unary_unary(
        service_name="example_service.Test",
        method_name="ExampleUnaryUnary",
        handler=_TestServicer_ExampleUnaryUnary_Handler(servicer),
    )
    server.register_unary_stream(
        service_name="example_service.Test",
        method_name="ExampleUnaryStream",
        handler=_TestServicer_ExampleUnaryStream_Handler(servicer),
    )
    server.register_stream_unary(
        service_name="example_service.Test",
        method_name="ExampleStreamUnary",
        handler=_TestServicer_ExampleStreamUnary_Handler(servicer),
    )
    server.register_stream_stream(
        service_name="example_service.Test",
        method_name="ExampleStreamStream",
        handler=_TestServicer_ExampleStreamStream_Handler(servicer),
    )

The generated handler classes (like _TestServicer_ExampleUnaryUnary_Handler) implement the slim_bindings handler protocols and manage serialization/deserialization automatically.
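
To make the role of such a handler concrete, here is a rough sketch of the idea: deserialize the request bytes, call the servicer method, and serialize the response. Everything in it is an assumption made for illustration. The UnaryUnaryHandler class and its handle method are hypothetical and do not reproduce the slim_bindings handler protocol, JSON stands in for protobuf encoding, and the dataclasses stand in for the generated message types (the ExampleResponse fields are assumed to match the request fields).

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ExampleRequest:
    """Stand-in for the generated protobuf request message."""
    example_integer: int
    example_string: str


@dataclass
class ExampleResponse:
    """Stand-in for the generated response message (fields are assumed)."""
    example_integer: int
    example_string: str


class EchoService:
    """Servicer-style class using the generated method signature."""

    def ExampleUnaryUnary(self, request, msg_context, session_context):
        return ExampleResponse(
            example_integer=request.example_integer + 1,
            example_string=request.example_string,
        )


class UnaryUnaryHandler:
    """Hypothetical wrapper: bytes in, servicer call, bytes out."""

    def __init__(self, method, request_type):
        self._method = method
        self._request_type = request_type

    def handle(self, request_bytes: bytes, msg_context=None, session_context=None) -> bytes:
        # Deserialize the incoming payload into a typed request object.
        request = self._request_type(**json.loads(request_bytes))
        # Delegate to the user-provided business logic.
        response = self._method(request, msg_context, session_context)
        # Serialize the typed response back to bytes for the transport.
        return json.dumps(asdict(response)).encode()


handler = UnaryUnaryHandler(EchoService().ExampleUnaryUnary, ExampleRequest)
payload = json.dumps({"example_integer": 1, "example_string": "hello"}).encode()
RESULT = json.loads(handler.handle(payload))
print(RESULT)
```

With this split, the servicer only ever sees typed request and response objects, and the wire format stays a concern of the generated layer.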

Server Implementation

The server-side logic is defined in server.py. Similar to standard gRPC implementations, the core service functionality is provided by the TestService class, which inherits from TestServicer.

The SLIM-specific code is handled within the amain() asynchronous function:

async def amain() -> None:
    # Initialize service
    tracing_config = slim_bindings.new_tracing_config()
    runtime_config = slim_bindings.new_runtime_config()
    service_config = slim_bindings.new_service_config()

    tracing_config.log_level = "info"

    slim_bindings.initialize_with_configs(
        tracing_config=tracing_config,
        runtime_config=runtime_config,
        service_config=[service_config],
    )

    service = slim_bindings.get_global_service()

    # Create local name
    local_name = slim_bindings.Name("agntcy", "grpc", "server")

    # Connect to SLIM
    client_config = slim_bindings.new_insecure_client_config("http://localhost:46357")
    conn_id = await service.connect_async(client_config)

    # Create app with shared secret
    local_app = service.create_app_with_secret(
        local_name, "my_shared_secret_for_testing_purposes_only"
    )

    # Subscribe to local name
    await local_app.subscribe_async(local_name, conn_id)

    # Create server
    server = slim_bindings.Server(local_app)

    # Add servicer
    add_TestServicer_to_server(TestService(), server)

    # Run server
    logger.info("Server starting...")
    await server.run()

Key steps:

  1. Initialize the SLIM service with configs
  2. Create a slim_bindings.Name for the server identity
  3. Connect to the SLIM node
  4. Create an app with authentication (shared secret in this example)
  5. Subscribe to receive messages at the local name
  6. Create a slim_bindings.Server from the app
  7. Register the service implementation
  8. Run the server

Client Implementation

The client-side implementation, found in client.py, creates a channel and uses the generated stub methods:

from datetime import timedelta

async def amain() -> None:
    # Initialize service
    tracing_config = slim_bindings.new_tracing_config()
    runtime_config = slim_bindings.new_runtime_config()
    service_config = slim_bindings.new_service_config()

    tracing_config.log_level = "info"

    slim_bindings.initialize_with_configs(
        tracing_config=tracing_config,
        runtime_config=runtime_config,
        service_config=[service_config],
    )

    service = slim_bindings.get_global_service()

    # Create local and remote names
    local_name = slim_bindings.Name("agntcy", "grpc", "client")
    remote_name = slim_bindings.Name("agntcy", "grpc", "server")

    # Connect to SLIM
    client_config = slim_bindings.new_insecure_client_config("http://localhost:46357")
    conn_id = await service.connect_async(client_config)

    # Create app with shared secret
    local_app = service.create_app_with_secret(
        local_name, "my_shared_secret_for_testing_purposes_only"
    )

    # Subscribe to local name
    await local_app.subscribe_async(local_name, conn_id)

    # Create channel
    channel = slim_bindings.Channel(local_app, remote_name)

    # Create stubs
    stubs = TestStub(channel)

    # Call methods
    request = ExampleRequest(example_integer=1, example_string="hello")
    response = await stubs.ExampleUnaryUnary(
        request, timeout=timedelta(seconds=2)
    )
    logger.info(f"Response: {response}")

    # Streaming example
    async for resp in stubs.ExampleUnaryStream(
        request, timeout=timedelta(seconds=2)
    ):
        logger.info(f"Stream Response: {resp}")

Key points:

  • The setup is the same as on the server (initialize service, create app, subscribe)
  • Create both local and remote slim_bindings.Name objects
  • Create a slim_bindings.Channel with the app and remote name
  • Use timedelta for timeout parameters
  • Stub methods that return a single response are coroutines and must be awaited
  • Stub methods that return a stream are async generators and are iterated with async for

slimrpc Under the Hood

slimrpc was introduced to simplify the integration of existing applications with SLIM. From a developer's perspective, using slimrpc is similar to gRPC, but with the benefits of SLIM's security and efficiency.

The underlying transport uses SLIM sessions with configurable reliability and timeout settings. Since sessions in SLIM can be sticky, all messages in a streaming communication will be forwarded to the same application instance.
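
The effect of sticky sessions can be pictured with a toy router. This is a conceptual sketch only, not SLIM's routing implementation: the StickyRouter class, the round-robin choice for new sessions, and the instance and session names are all assumptions made for illustration.

```python
import itertools


class StickyRouter:
    """Toy model: the first message of a session picks an instance,
    and every later message of that session reuses the same instance."""

    def __init__(self, instances):
        # Round-robin selection for new sessions is an assumption.
        self._next_instance = itertools.cycle(instances)
        self._session_to_instance = {}

    def route(self, session_id: str) -> str:
        if session_id not in self._session_to_instance:
            self._session_to_instance[session_id] = next(self._next_instance)
        return self._session_to_instance[session_id]


router = StickyRouter(["server-a", "server-b"])
# Messages from two interleaved streaming sessions.
ROUTES = [router.route(s) for s in ["s1", "s2", "s1", "s1", "s2"]]
print(ROUTES)
```

In this model every message of session s1 reaches the same instance, which is the property a streaming RPC needs so that one handler sees the whole stream.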

The slim_bindings API provides:

  • Async/await support: All operations are asynchronous using Python's asyncio
  • Type safety: Proper type hints for all methods and parameters
  • Stream handling: Explicit handling of StreamMessage types with error checking
  • Concurrent streaming: Bidirectional streams support concurrent send/receive operations
  • Handler protocols: Server handlers implement protocol interfaces for type safety