Anyone who has used Zeppelin will be familiar with its UI, because Zeppelin’s main usage scenarios are interactive and require manual operation. But is there an alternative to working by hand? What if you don’t want to use Zeppelin’s UI but still want its ability to submit and manage big data jobs such as Flink jobs? Or what if you write code in Zeppelin and want to schedule it or integrate it with other systems?

If so, the Zeppelin Client API (SDK) is what you need.

About Zeppelin

For those unfamiliar with it, Zeppelin can be summed up in one sentence: it is a gateway to big data engines and a foundation for interactive big data analysis platforms. Its main feature is the ability to connect to a variety of engines through a pluggable design. The following diagram shows some of the most commonly used engines, but Zeppelin supports many others.

Zeppelin has a REST API, but it exposes so many endpoints that the barrier to entry is high for people unfamiliar with Zeppelin. To make integration easier, Zeppelin provides a Client API (SDK), which is divided into two layers (each is described below):

  • Zeppelin Client API (Low Level API)
  • Session API (High Level API)

Zeppelin Client API (Low Level API)

The Zeppelin Client API operates at the granularity of Note and Paragraph. You can write code in a notebook first (for example, writing and testing code in the notebook during development) and then run it programmatically through the low-level API (for example, scheduling jobs in production). The most important class of the Zeppelin Client API is ZeppelinClient. Here are a few of its important methods (all of them are fairly straightforward, so I won’t go into details).

public String createNote(String notePath) throws Exception
public void deleteNote(String noteId) throws Exception
public NoteResult executeNote(String noteId) throws Exception
public NoteResult executeNote(String noteId, Map<String, String> parameters) throws Exception
public NoteResult queryNoteResult(String noteId) throws Exception
public NoteResult submitNote(String noteId) throws Exception
public NoteResult submitNote(String noteId, Map<String, String> parameters) throws Exception
public NoteResult waitUntilNoteFinished(String noteId) throws Exception
public String addParagraph(String noteId, String title, String text) throws Exception
public void updateParagraph(String noteId, String paragraphId, String title, String text) throws Exception
public ParagraphResult executeParagraph(String noteId, String paragraphId, String sessionId, Map<String, String> parameters) throws Exception
public ParagraphResult submitParagraph(String noteId, String paragraphId, String sessionId, Map<String, String> parameters) throws Exception
public void cancelParagraph(String noteId, String paragraphId)
public ParagraphResult queryParagraphResult(String noteId, String paragraphId)
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)
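The method names above suggest two styles of use: the execute methods run a job and return its result, while the submit methods pair with the query and wait methods. The submit-then-wait style can be pictured as a polling loop. The following is a minimal sketch of that pattern using only the JDK, not the client's actual implementation. The class SubmitThenPoll, the method pollUntilFinished, and the fake status source are hypothetical, and the status names (PENDING, RUNNING, FINISHED, ERROR, ABORT) are assumed to follow Zeppelin's job states.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class SubmitThenPoll {

    // Statuses after which a job will not change any more (assumed names).
    private static final Set<String> TERMINAL = Set.of("FINISHED", "ERROR", "ABORT");

    /**
     * Calls statusQuery repeatedly until it reports a terminal status,
     * sleeping intervalMillis between checks, and returns that status.
     * Gives up with an exception after maxAttempts checks.
     */
    static String pollUntilFinished(Supplier<String> statusQuery, long intervalMillis, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String status = statusQuery.get();
            if (TERMINAL.contains(status)) {
                return status;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the job", e);
            }
        }
        throw new IllegalStateException("Job did not finish after " + maxAttempts + " status checks");
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for repeated status queries against a server.
        Iterator<String> fakeStatuses = List.of("PENDING", "RUNNING", "RUNNING", "FINISHED").iterator();
        String finalStatus = pollUntilFinished(fakeStatuses::next, 10, 10);
        System.out.println("Final status: " + finalStatus);
    }
}
```

In a real integration the status source would be a call such as queryNoteResult or queryParagraphResult, and a scheduler could do other work between checks instead of sleeping.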

So what can you do with these APIs?

A typical use is to write code in Zeppelin, test it, and then integrate it into a third-party system. The code below, for example, takes Zeppelin’s built-in Spark Basic Features note and runs its paragraphs programmatically. You not only run the Zeppelin note, you also get a ParagraphResult back. What you do with the results is up to you (display them in your own system, visualize them, pass them on to other systems for consumption, and so on).

In addition, for paragraphs that use Dynamic Forms (dynamic controls such as text fields and drop-down boxes), you can supply the form values as parameters, such as maxAge and marital in the example below.

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);

String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version: " + zeppelinVersion);

ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);

paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);

Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);

parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
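To make the role of the parameters map more concrete, here is a minimal sketch of how a value such as maxAge could be substituted into a text-input form placeholder of the shape ${name=default}. It only illustrates the idea and is not Zeppelin's implementation. The class DynamicFormSketch, the method resolve, and the sample SQL are hypothetical, and select-style forms with option lists are not handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DynamicFormSketch {

    // Matches ${name} or ${name=default}; group 1 is the name, group 2 the optional default.
    private static final Pattern FORM = Pattern.compile("\\$\\{([^=}]+)(?:=([^}]*))?\\}");

    /** Replaces every placeholder with the supplied parameter, or with its default if none is given. */
    static String resolve(String text, Map<String, String> parameters) {
        Matcher matcher = FORM.matcher(text);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String fallback = matcher.group(2) == null ? "" : matcher.group(2);
            String value = parameters.getOrDefault(name, fallback);
            // quoteReplacement keeps '$' and '\' in the value from being treated as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical paragraph text with one text-input form.
        String sql = "select age, count(1) from bank where age < ${maxAge=30} group by age";
        System.out.println(resolve(sql, Map.of("maxAge", "40")));
        System.out.println(resolve(sql, Map.of()));
    }
}
```

The first call prints the query with 40 in place of the form, and the second falls back to the default of 30.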

The following image shows the Spark Basic Features of Zeppelin that we want to run with the Zeppelin Client API.

Session API (High Level API)

The Session API is Zeppelin’s high-level API. It has no concept of Note or Paragraph; its granularity is the code that you submit. The most important class in the Session API is ZSession, which is also its entry point. A ZSession represents a separate Zeppelin interpreter process; for Flink, that is an independent Flink session cluster. Here are some typical methods (they are fairly straightforward, so I won’t go into details).

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception

public ExecuteResult submit(String code) throws Exception

public ExecuteResult submit(String subInterpreter,
                            Map<String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                           
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception

So what can you do with this API? A typical use is to create a session (a Zeppelin interpreter process) on demand, submit code to it, and get the results back. For example, if you didn’t want to use Zeppelin’s UI and wanted to build your own Flink development and management platform, you could build your own UI, let users configure Flink jobs and type in SQL there, and send all of this information to the back end. The back end then calls ZSession to run the Flink job.

The following Java code runs two Flink SQL statements programmatically and receives the continuously updated results of each statement in MyStatementMessageHandler1 and MyStatementMessageHandler2 (how you use these results is up to you).

Note that interpreters such as Flink deliver streaming result updates over WebSocket, so the code below uses a CompositeMessageHandler together with MyStatementMessageHandler1 and MyStatementMessageHandler2. These message handlers process the streaming results sent over WebSocket. Here are the two Flink SQL statements as run in Zeppelin.

Next, we use the Zeppelin Session API to run these two Flink SQL statements, and the results arrive in MyStatementMessageHandler1 and MyStatementMessageHandler2.

ZSession session = null;
try {
    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    Map<String, String> intpProperties = new HashMap<>();

    session = ZSession.builder()
        .setClientConfig(clientConfig)
        .setInterpreter("flink")
        .setIntpProperties(intpProperties)
        .build();

    // CompositeMessageHandler allows you to add a StatementMessageHandler for each statement;
    // otherwise you have to use a single global MessageHandler.
    session.start(new CompositeMessageHandler());
    System.out.println("Flink Web UI: " + session.getWeburl());

    System.out.println("-----------------------------------------------------------------------------");
    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
    ExecuteResult result = session.execute(initCode);
    System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

    // run flink ssql
    Map<String, String> localProperties = new HashMap<>();
    localProperties.put("type", "update");
    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
                            new MyStatementMessageHandler1());
    session.waitUntilFinished(result.getStatementId());

    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
                            new MyStatementMessageHandler2());
    session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (session != null) {
        try {
            session.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler1, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler1, update output: " + output);
    }
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler2, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler2, update output: " + output);
    }
}
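The idea of one session-level handler that forwards messages to per-statement handlers can be sketched with plain JDK classes. This is only a guess at the routing idea, not how Zeppelin implements CompositeMessageHandler. StatementHandler, RoutingHandler, and RecordingHandler are hypothetical names, and the messages are simulated rather than received over WebSocket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RoutingHandlerSketch {

    /** Hypothetical per-statement callback, shaped like the handlers in the example above. */
    interface StatementHandler {
        void onAppendOutput(String statementId, int index, String output);
        void onUpdateOutput(String statementId, int index, String type, String output);
    }

    /** Hypothetical router: forwards each message to the handler registered for its statement id. */
    static class RoutingHandler {
        private final Map<String, StatementHandler> handlers = new ConcurrentHashMap<>();

        void register(String statementId, StatementHandler handler) {
            handlers.put(statementId, handler);
        }

        void dispatchAppend(String statementId, int index, String output) {
            StatementHandler handler = handlers.get(statementId);
            if (handler != null) {
                handler.onAppendOutput(statementId, index, output);
            }
        }

        void dispatchUpdate(String statementId, int index, String type, String output) {
            StatementHandler handler = handlers.get(statementId);
            if (handler != null) {
                handler.onUpdateOutput(statementId, index, type, output);
            }
        }
    }

    /** Records what it receives so the routing can be inspected. */
    static class RecordingHandler implements StatementHandler {
        final List<String> received = new ArrayList<>();

        @Override
        public void onAppendOutput(String statementId, int index, String output) {
            received.add("append:" + output);
        }

        @Override
        public void onUpdateOutput(String statementId, int index, String type, String output) {
            received.add("update:" + output);
        }
    }

    public static void main(String[] args) {
        RoutingHandler router = new RoutingHandler();
        RecordingHandler first = new RecordingHandler();
        RecordingHandler second = new RecordingHandler();
        router.register("statement-1", first);
        router.register("statement-2", second);

        // Simulated incoming messages; messages for unknown statements are ignored.
        router.dispatchUpdate("statement-1", 0, "TABLE", "url\tpv\nhome\t3");
        router.dispatchAppend("statement-2", 0, "HOME\t3");
        router.dispatchAppend("unknown", 0, "dropped");

        System.out.println("first received " + first.received.size() + " message(s)");
        System.out.println("second received " + second.received.size() + " message(s)");
    }
}
```

Keying handlers by statement id is what lets two statements running in the same session keep their result streams apart.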

Besides programmatically running Flink Jobs, what else does this Session API give us?

In Zeppelin you can configure your Flink cluster in great detail with %flink.conf, but %flink.conf is plain-text configuration, which is easy to get wrong for those unfamiliar with Flink (see figure below). If you are building your own Flink platform, you can offer a more complete UI, with drop-down lists and similar controls, so that users select configuration options instead of typing them as text.

Paragraph-level local properties such as type, template, and resumeFromLatestCheckpoint are also easy to mistype. Similarly, you can use controls in your UI to fix the available choices in advance instead of letting the user enter free text.
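One way to fix the choices in advance is to model them as an enum on the back end and build the local properties map from validated values. The sketch below assumes that type accepts single, update, and append and that resumeFromLatestCheckpoint takes true or false; check both against the Flink interpreter documentation for your Zeppelin version. The class LocalPropertiesSketch and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class LocalPropertiesSketch {

    /** Allowed values for the "type" local property (assumed: single, update, append). */
    enum ResultType { SINGLE, UPDATE, APPEND }

    /** Parses a value coming from a UI control and rejects anything that is not a known type. */
    static ResultType parseType(String raw) {
        try {
            return ResultType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown result type: " + raw);
        }
    }

    /** Builds the map that would be passed as localProperties when submitting a statement. */
    static Map<String, String> build(ResultType type, boolean resumeFromLatestCheckpoint) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("type", type.name().toLowerCase(Locale.ROOT));
        props.put("resumeFromLatestCheckpoint", String.valueOf(resumeFromLatestCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = build(parseType("update"), true);
        System.out.println(props);
    }
}
```

A map built this way could be handed to the four-argument submit call shown earlier, so a typo such as "udpate" is rejected before anything reaches the interpreter.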

I believe there is a lot more you can do with the Zeppelin Client API.

For more in-depth Zeppelin technical content and usage discussion, join the Flink on Zeppelin DingTalk group.
