Skip to content

Manage client-server communication with LSP4J

  • When two actors (clients and server) compliant to the LSP requirements need to communicate together they need a launcher
  • a Launcher is basically the entry point for applications that will be compliant with the LSP standard.
  • LSP4J provides different kind of Launcher that implement the communication through the stdio channels.
  • But is also possible use the stdio channels provided by a socket.
// wait for clients to connect on port 1044
try(ServerSocket serverSocket = new ServerSocket(port)) {
  Socket socket = serverSocket.accept();

  Launcher<LanguageClient> launcher =
      LSPLauncher.createServerLauncher(
          myServer, socket.getInputStream(), socket.getOutputStream());

  // add clients to the server
  Runnable addClient = myServer.setRemoteProxy(launcher.getRemoteProxy());
  launcher.startListening();
  CompletableFuture.runAsync(addClient);
}

In order to be able to define a multithread LS is necessary wrap the previous code snippet into a callable object. Each thread (that could be defined using different implementations of ExecutorService interface) will execute the callable task.

ExecutorService threadPool = Executors.newCachedThreadPool();
Integer port = 1044;
Callable<Void> callableTask =
new Callable<Void>() {
 @Override
 public Void call() throws Exception {
   while (true) {
     try(ServerSocket serverSocket = new ServerSocket(port)) {
       Socket socket = serverSocket.accept();

       Launcher<LanguageClient> launcher =
           LSPLauncher.createServerLauncher(
               myServer, socket.getInputStream(), socket.getOutputStream());

       // add clients to the server
       Runnable addClient = myServer.setRemoteProxy(launcher.getRemoteProxy());
       launcher.startListening();
       CompletableFuture.runAsync(addClient);
     }
   }
 }
};
threadPool.submit(callableTask);

The server needs to understand who is the client that asked to communicate with it. For this reason inside the callable activity there is an async task that execute a Runnable in order to add the LanguageClient retrieved from the launcher into the server instance that will use this class to access to the methods that it needs to use to send notification messages and other things to the connected client.

The Launcher

The launcher object provides all the underwood configuration in order to connect the remote endpoint via input and output stream. In particular the LSP launcher need some parameters: - local service = the concrete server implementation - remote interface = the LanguageClient class - input stream - output stream

Into the launcher is defined a builder that provides wires up all components necessary for the JSON-RPC communication.

   /* @param localServices - the objects that receive method calls from the remote services
     * @param remoteInterfaces - interfaces on which RPC methods are looked up
     * @param classLoader - a class loader that is able to resolve all given interfaces
     * @param in - input stream to listen for incoming messages
     * @param out - output stream to send outgoing messages
     * @param executorService - the executor service used to start threads
     * @param wrapper - a function for plugging in additional message consumers
     * @param configureGson - a function for Gson configuration
     */
    static Launcher<Object> createIoLauncher(Collection<Object> localServices, Collection<Class<?>> remoteInterfaces, ClassLoader classLoader,
            InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper,
            Consumer<GsonBuilder> configureGson) {
        return new Builder<Object>()
                .setLocalServices(localServices)
                .setRemoteInterfaces(remoteInterfaces)
                .setClassLoader(classLoader)
                .setInput(in)
                .setOutput(out)
                .setExecutorService(executorService)
                .wrapMessages(wrapper)
                .configureGson(configureGson)
                .create();
    }

Builder - create()

The create() method is entitled to define all the internal structures:

public Launcher<T> create() {
    if (input == null)
        throw new IllegalStateException("Input stream must be configured.");
    if (output == null)
        throw new IllegalStateException("Output stream must be configured.");
    if (localServices == null)
        throw new IllegalStateException("Local service must be configured.");
    if (remoteInterfaces == null)
        throw new IllegalStateException("Remote interface must be configured.");

    MessageJsonHandler jsonHandler = createJsonHandler();
    RemoteEndpoint remoteEndpoint = createRemoteEndpoint(jsonHandler);
    T remoteProxy;
    if (localServices.size() == 1 && remoteInterfaces.size() == 1) {
        remoteProxy = ServiceEndpoints.toServiceObject(remoteEndpoint, remoteInterfaces.iterator().next());
    } else {
        remoteProxy = (T) ServiceEndpoints.toServiceObject(remoteEndpoint, (Collection<Class<?>>) (Object) remoteInterfaces, classLoader);
    }
    StreamMessageProducer reader = new StreamMessageProducer(input, jsonHandler, remoteEndpoint);
    MessageConsumer messageConsumer = wrapMessageConsumer(remoteEndpoint);
    ExecutorService execService = executorService != null ? executorService : Executors.newCachedThreadPool();

    return new Launcher<T> () {
        @Override
        public Future<Void> startListening() {
            return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, execService);
        }

        @Override
        public T getRemoteProxy() {
            return remoteProxy;
        }

        @Override
        public RemoteEndpoint getRemoteEndpoint() {
            return remoteEndpoint;
        }
    };
}

Builder - createJsonHandler()

/**
 * Create the JSON handler for messages between the local and remote services.
 */
protected MessageJsonHandler createJsonHandler() {
  Map<String, JsonRpcMethod> supportedMethods = getSupportedMethods();
  if (configureGson != null)
    return new MessageJsonHandler(supportedMethods, configureGson);
  else
    return new MessageJsonHandler(supportedMethods);
}

Builder - getSupportedMethods()

/**
* Gather all JSON-RPC methods from the local and remote services.
*/
protected Map<String, JsonRpcMethod> getSupportedMethods() {
  Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<>();
  // Gather the supported methods of remote interfaces
  for (Class<?> interface_ : remoteInterfaces) {
    supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(interface_));
  }

  // Gather the supported methods of local services
  for (Object localService : localServices) {
    if (localService instanceof JsonRpcMethodProvider) {
        JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService;
        supportedMethods.putAll(rpcMethodProvider.supportedMethods());
    } else {
        supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass()));
    }
  }

  return supportedMethods;
}

Builder - createRemoteEndpoint()

/**
* Create the remote endpoint that communicates with the local services.
*/
protected RemoteEndpoint createRemoteEndpoint(MessageJsonHandler jsonHandler) {
  MessageConsumer outgoingMessageStream = new StreamMessageConsumer(output, jsonHandler);
  outgoingMessageStream = wrapMessageConsumer(outgoingMessageStream);
  RemoteEndpoint remoteEndpoint = new RemoteEndpoint(outgoingMessageStream, ServiceEndpoints.toEndpoint(localServices));
  jsonHandler.setMethodProvider(remoteEndpoint);
  return remoteEndpoint;
}

Extra - RemoteEndpoint class

/**
 * An endpoint that can be used to send messages to a given {@link MessageConsumer} by calling
 * {@link #request(String, Object)} or {@link #notify(String, Object)}. When connected to a {@link MessageProducer},
 * this class forwards received messages to the local {@link Endpoint} given in the constructor.
 */
 public class RemoteEndpoint implements Endpoint, MessageConsumer, MessageIssueHandler, MethodProvider {
   private static final Logger LOG = Logger.getLogger(RemoteEndpoint.class.getName());

   public static final Function<Throwable, ResponseError> DEFAULT_EXCEPTION_HANDLER = (throwable) -> {
     if (throwable instanceof ResponseErrorException) {
       return ((ResponseErrorException) throwable).getResponseError();
     } else if ((throwable instanceof CompletionException || throwable instanceof InvocationTargetException)
         && throwable.getCause() instanceof ResponseErrorException) {
       return ((ResponseErrorException) throwable.getCause()).getResponseError();
     } else {
       return fallbackResponseError("Internal error", throwable);
     }
   };

   private static ResponseError fallbackResponseError(String header, Throwable throwable) {
     LOG.log(Level.SEVERE, header + ": " + throwable.getMessage(), throwable);
     ResponseError error = new ResponseError();
     error.setMessage(header + ".");
     error.setCode(ResponseErrorCode.InternalError);
     ByteArrayOutputStream stackTrace = new ByteArrayOutputStream();
     PrintWriter stackTraceWriter = new PrintWriter(stackTrace);
     throwable.printStackTrace(stackTraceWriter);
     stackTraceWriter.flush();
     error.setData(stackTrace.toString());
     return error;
   }

   private final MessageConsumer out;
   private final Endpoint localEndpoint;
   private final Function<Throwable, ResponseError> exceptionHandler;

   private final AtomicInteger nextRequestId = new AtomicInteger();
   private final Map<String, PendingRequestInfo> sentRequestMap = new LinkedHashMap<>();
   private final Map<String, CompletableFuture<?>> receivedRequestMap = new LinkedHashMap<>();

   //...

   //constructor area

   /**
    * @param out - a consumer that transmits messages to the remote service
    * @param localEndpoint - the local service implementation
    * @param exceptionHandler - an exception handler that should never return null.
    */
   public RemoteEndpoint(MessageConsumer out, Endpoint localEndpoint, Function<Throwable, ResponseError> exceptionHandler) {
     if (out == null)
       throw new NullPointerException("out");
     if (localEndpoint == null)
       throw new NullPointerException("localEndpoint");
     if (exceptionHandler == null)
       throw new NullPointerException("exceptionHandler");
     this.out = out;
     this.localEndpoint = localEndpoint;
     this.exceptionHandler = exceptionHandler;
   }

   /**
    * @param out - a consumer that transmits messages to the remote service
    * @param localEndpoint - the local service implementation
    */
   public RemoteEndpoint(MessageConsumer out, Endpoint localEndpoint) {
     this(out, localEndpoint, DEFAULT_EXCEPTION_HANDLER);
   }

   // ... other methods ...
 }

Builder operations workflow

  1. Create an instance of MessageJsonHandler (using internal method createJsonHandler()): MessageJsonHandler works as a wrapper around Gson that includes configuration necessary for the RPC communication. Using its routine getSupportedMethods() the MessageJsonHandler will be populated with the list of all RPC supported methods both for client and server.

  2. With the defined MessageJsonHandler object, the builder invoke the createRemoteEndpoint() method. Here a new Endpoint is created with a defined MessageConsumer and Endpoint defined.

Note

ServiceEndpoints.toEndpoint(localServices) is a static method defined by the ServiceEndpoint class and it wraps a given object with service annotations behind an Endpoint interface.

/**
 * Wraps a collection of objects with service annotations behind an {@link Endpoint} interface.
 *
 * @return the wrapped service endpoint
 */
public static Endpoint toEndpoint(Collection<Object> serviceObjects) {
  return new GenericEndpoint(serviceObjects);
}
  1. Next is necessary define the remoteProxy object that will be used by the LS to set the LanguageClient class (that will be used to inspect methods available that the server will use to send messages to the server)

  2. In order to define the RemoteProxy a static method provided by ServiceEndpoints will be used. This routine wraps a given Endpoint in the given service interface.

java /** * Wraps a given {@link Endpoint} in the given service interface. * * @return the wrapped service object */ @SuppressWarnings("unchecked") public static <T> T toServiceObject(Endpoint endpoint, Class<T> interface_) { Class<?>[] interfArray = new Class[]{interface_, Endpoint.class}; EndpointProxy invocationHandler = new EndpointProxy(endpoint, interface_); return (T) Proxy.newProxyInstance(interface_.getClassLoader(), interfArray, invocationHandler); } 4. After that is necessary define the MessageConsumer that reads from an input stream and parses messages from JSON 5. Finally is necessary define a new ExecutorService or use one already defined and sent to the createLauncher() method 6. After all these steps, a new anonymous class for Launcher<T> is created and are also defined methods that the new class need to override from the Launcher interface:

@SuppressWarnings("unchecked")
public Launcher<T> create() {
    //...

    return new Launcher<T> () {
        @Override
        public Future<Void> startListening() {
            return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, execService);
        }

        @Override
        public T getRemoteProxy() {
            return remoteProxy;
        }

        @Override
        public RemoteEndpoint getRemoteEndpoint() {
            return remoteEndpoint;
        }
    };
}

startListening() & startProcessing() method

The startListening() method is defined by each class that implements the Launcher interface, and also anonymous classes. This method invoke the static startProcessing() method defined by the class ConcurrentMessageProcessor, defined into the package org.eclipse.lsp4j.jsonrpc.json. The class ConcurrentMessageProcessor implements the Runnable interface and its static method startProcessing return a Future<Void> that will be resolved when the started thread is terminated:

/**
 * Start a thread that listens for messages in the message producer and forwards them to the message consumer.
 *
 * @param messageProducer - produces messages, e.g. by reading from an input channel
 * @param messageConsumer - processes messages and potentially forwards them to other consumers
 * @param executorService - the thread is started using this service
 * @return a future that is resolved when the started thread is terminated, e.g. by closing a stream
 */
public static Future<Void> startProcessing(MessageProducer messageProducer, MessageConsumer messageConsumer,
    ExecutorService executorService) {
  ConcurrentMessageProcessor reader = new ConcurrentMessageProcessor(messageProducer, messageConsumer);
  final Future<?> result = executorService.submit(reader);
  return new Future<Void>() {

    @Override
    public Void get() throws InterruptedException, ExecutionException {
      return (Void) result.get();
    }

    @Override
    public Void get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      return (Void) result.get(timeout, unit);
    }

    @Override
    public boolean isDone() {
      return result.isDone();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      if (mayInterruptIfRunning && messageProducer instanceof Closeable) {
        try {
          ((Closeable) messageProducer).close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return result.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
      return result.isCancelled();
    }
  };