Manage client-server communication with LSP4J
- When two actors (clients and server) compliant to the LSP requirements need to communicate together they need a launcher
- a Launcher is basically the entry point for applications that will be compliant with the LSP standard.
- LSP4J provides different kind of Launcher that implement the communication through the stdio channels.
- But is also possible use the stdio channels provided by a socket.
// wait for clients to connect on port 1044
try(ServerSocket serverSocket = new ServerSocket(port)) {
Socket socket = serverSocket.accept();
Launcher<LanguageClient> launcher =
LSPLauncher.createServerLauncher(
myServer, socket.getInputStream(), socket.getOutputStream());
// add clients to the server
Runnable addClient = myServer.setRemoteProxy(launcher.getRemoteProxy());
launcher.startListening();
CompletableFuture.runAsync(addClient);
}
In order to be able to define a multithread LS is necessary wrap the previous code snippet into a callable object. Each thread (that could be defined using different implementations of ExecutorService interface) will execute the callable task.
ExecutorService threadPool = Executors.newCachedThreadPool();
Integer port = 1044;
Callable<Void> callableTask =
new Callable<Void>() {
@Override
public Void call() throws Exception {
while (true) {
try(ServerSocket serverSocket = new ServerSocket(port)) {
Socket socket = serverSocket.accept();
Launcher<LanguageClient> launcher =
LSPLauncher.createServerLauncher(
myServer, socket.getInputStream(), socket.getOutputStream());
// add clients to the server
Runnable addClient = myServer.setRemoteProxy(launcher.getRemoteProxy());
launcher.startListening();
CompletableFuture.runAsync(addClient);
}
}
}
};
threadPool.submit(callableTask);
The server needs to understand who is the client that asked to communicate with it. For this reason inside the callable activity there is an async task that execute a Runnable in order to add the LanguageClient retrieved from the launcher into the server instance that will use this class to access to the methods that it needs to use to send notification messages and other things to the connected client.
The Launcher
The launcher object provides all the underwood configuration in order to connect the remote endpoint via input and output stream. In particular the LSP launcher need some parameters: - local service = the concrete server implementation - remote interface = the LanguageClient class - input stream - output stream
Into the launcher is defined a builder that provides wires up all components necessary for the JSON-RPC communication.
/* @param localServices - the objects that receive method calls from the remote services
* @param remoteInterfaces - interfaces on which RPC methods are looked up
* @param classLoader - a class loader that is able to resolve all given interfaces
* @param in - input stream to listen for incoming messages
* @param out - output stream to send outgoing messages
* @param executorService - the executor service used to start threads
* @param wrapper - a function for plugging in additional message consumers
* @param configureGson - a function for Gson configuration
*/
static Launcher<Object> createIoLauncher(Collection<Object> localServices, Collection<Class<?>> remoteInterfaces, ClassLoader classLoader,
InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper,
Consumer<GsonBuilder> configureGson) {
return new Builder<Object>()
.setLocalServices(localServices)
.setRemoteInterfaces(remoteInterfaces)
.setClassLoader(classLoader)
.setInput(in)
.setOutput(out)
.setExecutorService(executorService)
.wrapMessages(wrapper)
.configureGson(configureGson)
.create();
}
Builder - create()
The create()
method is entitled to define all the internal structures:
public Launcher<T> create() {
if (input == null)
throw new IllegalStateException("Input stream must be configured.");
if (output == null)
throw new IllegalStateException("Output stream must be configured.");
if (localServices == null)
throw new IllegalStateException("Local service must be configured.");
if (remoteInterfaces == null)
throw new IllegalStateException("Remote interface must be configured.");
MessageJsonHandler jsonHandler = createJsonHandler();
RemoteEndpoint remoteEndpoint = createRemoteEndpoint(jsonHandler);
T remoteProxy;
if (localServices.size() == 1 && remoteInterfaces.size() == 1) {
remoteProxy = ServiceEndpoints.toServiceObject(remoteEndpoint, remoteInterfaces.iterator().next());
} else {
remoteProxy = (T) ServiceEndpoints.toServiceObject(remoteEndpoint, (Collection<Class<?>>) (Object) remoteInterfaces, classLoader);
}
StreamMessageProducer reader = new StreamMessageProducer(input, jsonHandler, remoteEndpoint);
MessageConsumer messageConsumer = wrapMessageConsumer(remoteEndpoint);
ExecutorService execService = executorService != null ? executorService : Executors.newCachedThreadPool();
return new Launcher<T> () {
@Override
public Future<Void> startListening() {
return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, execService);
}
@Override
public T getRemoteProxy() {
return remoteProxy;
}
@Override
public RemoteEndpoint getRemoteEndpoint() {
return remoteEndpoint;
}
};
}
Builder - createJsonHandler()
/**
* Create the JSON handler for messages between the local and remote services.
*/
protected MessageJsonHandler createJsonHandler() {
Map<String, JsonRpcMethod> supportedMethods = getSupportedMethods();
if (configureGson != null)
return new MessageJsonHandler(supportedMethods, configureGson);
else
return new MessageJsonHandler(supportedMethods);
}
Builder - getSupportedMethods()
/**
* Gather all JSON-RPC methods from the local and remote services.
*/
protected Map<String, JsonRpcMethod> getSupportedMethods() {
Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<>();
// Gather the supported methods of remote interfaces
for (Class<?> interface_ : remoteInterfaces) {
supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(interface_));
}
// Gather the supported methods of local services
for (Object localService : localServices) {
if (localService instanceof JsonRpcMethodProvider) {
JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService;
supportedMethods.putAll(rpcMethodProvider.supportedMethods());
} else {
supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass()));
}
}
return supportedMethods;
}
Builder - createRemoteEndpoint()
/**
* Create the remote endpoint that communicates with the local services.
*/
protected RemoteEndpoint createRemoteEndpoint(MessageJsonHandler jsonHandler) {
MessageConsumer outgoingMessageStream = new StreamMessageConsumer(output, jsonHandler);
outgoingMessageStream = wrapMessageConsumer(outgoingMessageStream);
RemoteEndpoint remoteEndpoint = new RemoteEndpoint(outgoingMessageStream, ServiceEndpoints.toEndpoint(localServices));
jsonHandler.setMethodProvider(remoteEndpoint);
return remoteEndpoint;
}
Extra - RemoteEndpoint class
/**
* An endpoint that can be used to send messages to a given {@link MessageConsumer} by calling
* {@link #request(String, Object)} or {@link #notify(String, Object)}. When connected to a {@link MessageProducer},
* this class forwards received messages to the local {@link Endpoint} given in the constructor.
*/
public class RemoteEndpoint implements Endpoint, MessageConsumer, MessageIssueHandler, MethodProvider {
private static final Logger LOG = Logger.getLogger(RemoteEndpoint.class.getName());
public static final Function<Throwable, ResponseError> DEFAULT_EXCEPTION_HANDLER = (throwable) -> {
if (throwable instanceof ResponseErrorException) {
return ((ResponseErrorException) throwable).getResponseError();
} else if ((throwable instanceof CompletionException || throwable instanceof InvocationTargetException)
&& throwable.getCause() instanceof ResponseErrorException) {
return ((ResponseErrorException) throwable.getCause()).getResponseError();
} else {
return fallbackResponseError("Internal error", throwable);
}
};
private static ResponseError fallbackResponseError(String header, Throwable throwable) {
LOG.log(Level.SEVERE, header + ": " + throwable.getMessage(), throwable);
ResponseError error = new ResponseError();
error.setMessage(header + ".");
error.setCode(ResponseErrorCode.InternalError);
ByteArrayOutputStream stackTrace = new ByteArrayOutputStream();
PrintWriter stackTraceWriter = new PrintWriter(stackTrace);
throwable.printStackTrace(stackTraceWriter);
stackTraceWriter.flush();
error.setData(stackTrace.toString());
return error;
}
private final MessageConsumer out;
private final Endpoint localEndpoint;
private final Function<Throwable, ResponseError> exceptionHandler;
private final AtomicInteger nextRequestId = new AtomicInteger();
private final Map<String, PendingRequestInfo> sentRequestMap = new LinkedHashMap<>();
private final Map<String, CompletableFuture<?>> receivedRequestMap = new LinkedHashMap<>();
//...
//constructor area
/**
* @param out - a consumer that transmits messages to the remote service
* @param localEndpoint - the local service implementation
* @param exceptionHandler - an exception handler that should never return null.
*/
public RemoteEndpoint(MessageConsumer out, Endpoint localEndpoint, Function<Throwable, ResponseError> exceptionHandler) {
if (out == null)
throw new NullPointerException("out");
if (localEndpoint == null)
throw new NullPointerException("localEndpoint");
if (exceptionHandler == null)
throw new NullPointerException("exceptionHandler");
this.out = out;
this.localEndpoint = localEndpoint;
this.exceptionHandler = exceptionHandler;
}
/**
* @param out - a consumer that transmits messages to the remote service
* @param localEndpoint - the local service implementation
*/
public RemoteEndpoint(MessageConsumer out, Endpoint localEndpoint) {
this(out, localEndpoint, DEFAULT_EXCEPTION_HANDLER);
}
// ... other methods ...
}
Builder operations workflow
-
Create an instance of
MessageJsonHandler
(using internal methodcreateJsonHandler()
): MessageJsonHandler works as a wrapper around Gson that includes configuration necessary for the RPC communication. Using its routinegetSupportedMethods()
theMessageJsonHandler
will be populated with the list of all RPC supported methods both for client and server. -
With the defined
MessageJsonHandler
object, the builder invoke thecreateRemoteEndpoint()
method. Here a new Endpoint is created with a definedMessageConsumer
andEndpoint
defined.
Note
ServiceEndpoints.toEndpoint(localServices)
is a static method defined by theServiceEndpoint
class and it wraps a given object with service annotations behind an Endpoint interface.
/**
* Wraps a collection of objects with service annotations behind an {@link Endpoint} interface.
*
* @return the wrapped service endpoint
*/
public static Endpoint toEndpoint(Collection<Object> serviceObjects) {
return new GenericEndpoint(serviceObjects);
}
-
Next is necessary define the
remoteProxy
object that will be used by the LS to set the LanguageClient class (that will be used to inspect methods available that the server will use to send messages to the server) -
In order to define the RemoteProxy a static method provided by
ServiceEndpoints
will be used. This routine wraps a given Endpoint in the given service interface.
java
/**
* Wraps a given {@link Endpoint} in the given service interface.
*
* @return the wrapped service object
*/
@SuppressWarnings("unchecked")
public static <T> T toServiceObject(Endpoint endpoint, Class<T> interface_) {
Class<?>[] interfArray = new Class[]{interface_, Endpoint.class};
EndpointProxy invocationHandler = new EndpointProxy(endpoint, interface_);
return (T) Proxy.newProxyInstance(interface_.getClassLoader(), interfArray, invocationHandler);
}
4. After that is necessary define the MessageConsumer
that reads from an input stream and parses messages from JSON
5. Finally is necessary define a new ExecutorService or use one already defined and sent to the createLauncher()
method
6. After all these steps, a new anonymous class for Launcher<T>
is created and are also defined methods that the new class need to override from the Launcher
interface:
@SuppressWarnings("unchecked")
public Launcher<T> create() {
//...
return new Launcher<T> () {
@Override
public Future<Void> startListening() {
return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, execService);
}
@Override
public T getRemoteProxy() {
return remoteProxy;
}
@Override
public RemoteEndpoint getRemoteEndpoint() {
return remoteEndpoint;
}
};
}
startListening() & startProcessing() method
The startListening()
method is defined by each class that implements the Launcher interface, and also anonymous classes.
This method invoke the static startProcessing()
method defined by the class ConcurrentMessageProcessor
, defined into the package org.eclipse.lsp4j.jsonrpc.json
.
The class ConcurrentMessageProcessor
implements the Runnable
interface and its static method startProcessing return a Future<Void>
that will be resolved when the started thread is terminated:
/**
* Start a thread that listens for messages in the message producer and forwards them to the message consumer.
*
* @param messageProducer - produces messages, e.g. by reading from an input channel
* @param messageConsumer - processes messages and potentially forwards them to other consumers
* @param executorService - the thread is started using this service
* @return a future that is resolved when the started thread is terminated, e.g. by closing a stream
*/
public static Future<Void> startProcessing(MessageProducer messageProducer, MessageConsumer messageConsumer,
ExecutorService executorService) {
ConcurrentMessageProcessor reader = new ConcurrentMessageProcessor(messageProducer, messageConsumer);
final Future<?> result = executorService.submit(reader);
return new Future<Void>() {
@Override
public Void get() throws InterruptedException, ExecutionException {
return (Void) result.get();
}
@Override
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return (Void) result.get(timeout, unit);
}
@Override
public boolean isDone() {
return result.isDone();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (mayInterruptIfRunning && messageProducer instanceof Closeable) {
try {
((Closeable) messageProducer).close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return result.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return result.isCancelled();
}
};