[ZEPPELIN-6163] HBase interpreter supports hbase-2.x #4908

Open: wants to merge 1 commit into master
24 changes: 1 addition & 23 deletions docs/interpreter/hbase.md
@@ -28,19 +28,7 @@ limitations under the License.
To get started with HBase, please see [HBase Quickstart](https://hbase.apache.org/book.html#quickstart).

## HBase release supported
By default, Zeppelin is built against HBase 1.0.x releases. To work with HBase 1.1.x releases, use the following build command:

```bash
# HBase 1.1.4
./mvnw clean package -DskipTests -Phadoop-2.6 -Dhadoop.version=2.6.0 -P build-distr -Dhbase.hbase.version=1.1.4 -Dhbase.hadoop.version=2.6.0
```

To work with HBase 1.2.0+, use the following build command:

```bash
# HBase 1.2.0
./mvnw clean package -DskipTests -Phadoop-2.6 -Dhadoop.version=2.6.0 -P build-distr -Dhbase.hbase.version=1.2.0 -Dhbase.hadoop.version=2.6.0
```
Zeppelin is built against HBase 2.x releases.

## Configuration

@@ -55,16 +43,6 @@ To work with HBase 1.2.0+, use the following build command:
<td>/usr/lib/hbase</td>
<td>Installation directory of HBase, defaults to HBASE_HOME in environment</td>
</tr>
<tr>
<td>hbase.ruby.sources</td>
<td>lib/ruby</td>
<td>Path to Ruby scripts relative to 'hbase.home'</td>
</tr>
<tr>
<td>zeppelin.hbase.test.mode</td>
<td>false</td>
<td>Disable checks for unit and manual tests</td>
</tr>
</table>

If you want to connect to HBase running on a cluster, you'll need to follow the next step.
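Note: the updated docs above drop the explicit build commands and simply state that Zeppelin is built against HBase 2.x. For reference, a minimal build sketch for just the HBase interpreter module is shown below. This is not part of the PR; the `-pl hbase -am` module selection and the use of the bundled Maven wrapper are assumptions based on the repository layout, not documented flags.

```bash
# Hypothetical example (not in the PR): build only the HBase interpreter module
# and its required dependencies, against the HBase 2.x versions managed in the poms.
./mvnw clean package -DskipTests -pl hbase -am
```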
10 changes: 6 additions & 4 deletions hbase/pom.xml
@@ -33,14 +33,16 @@
<properties>
<!--library versions-->
<interpreter.name>hbase</interpreter.name>
<jruby.version>1.6.8</jruby.version>
</properties>

<dependencies>
<dependency>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
<version>${jruby.version}</version>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>

199 changes: 98 additions & 101 deletions hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java
@@ -14,118 +14,120 @@

package org.apache.zeppelin.hbase;

import org.jruby.embed.LocalContextScope;
import org.jruby.embed.ScriptingContainer;
import org.apache.commons.exec.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

/**
* Support for HBase Shell. All the commands documented here
* http://hbase.apache.org/book.html#shell is supported.
*
* Requirements:
* HBase Shell should be installed on the same machine. To be more specific, the following dir.
* should be available: https://github.com/apache/hbase/tree/master/hbase-shell/src/main/ruby
* HBase Shell should be able to connect to the HBase cluster from terminal. This makes sure
* that the client is configured properly.
*
* The interpreter takes 3 config parameters:
* hbase.home: Root directory where HBase is installed. Default is /usr/lib/hbase/
* hbase.ruby.sources: Dir where shell ruby code is installed.
* Path is relative to hbase.home. Default: lib/ruby
* zeppelin.hbase.test.mode: (Testing only) Disable checks for unit and manual tests. Default: false
* HBase interpreter. It uses the hbase shell to interpret the commands.
*/
public class HbaseInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(HbaseInterpreter.class);

public static final String HBASE_HOME = "hbase.home";
public static final String HBASE_RUBY_SRC = "hbase.ruby.sources";
public static final String HBASE_TEST_MODE = "zeppelin.hbase.test.mode";

private static final Logger LOGGER = LoggerFactory.getLogger(HbaseInterpreter.class);
private ScriptingContainer scriptingContainer;
private Map<String, Executor> runningProcesses = new HashMap<>();

private static final int SIGTERM_CODE = 143;

private StringWriter writer;
private long commandTimeout = 60000;

public HbaseInterpreter(Properties property) {
super(property);
public HbaseInterpreter(Properties properties) {
super(properties);
}

@Override
public void open() throws InterpreterException {
this.scriptingContainer = new ScriptingContainer(LocalContextScope.SINGLETON);
this.writer = new StringWriter();
scriptingContainer.setOutput(this.writer);

if (!Boolean.parseBoolean(getProperty(HBASE_TEST_MODE))) {
String hbaseHome = getProperty(HBASE_HOME);
String rubySrc = getProperty(HBASE_RUBY_SRC);
Path absRubySrc = Paths.get(hbaseHome, rubySrc).toAbsolutePath();

LOGGER.info("Home:" + hbaseHome);
LOGGER.info("Ruby Src:" + rubySrc);

File f = absRubySrc.toFile();
if (!f.exists() || !f.isDirectory()) {
throw new InterpreterException("HBase ruby sources is not available at '" + absRubySrc
+ "'");
}

LOGGER.info("Absolute Ruby Source:" + absRubySrc.toString());
// hirb.rb:41 requires the following system properties to be set.
Properties sysProps = System.getProperties();
sysProps.setProperty(HBASE_RUBY_SRC, absRubySrc.toString());

Path absHirbPath = Paths.get(hbaseHome, "bin/hirb.rb");
try {
FileInputStream fis = new FileInputStream(absHirbPath.toFile());
this.scriptingContainer.runScriptlet(fis, "hirb.rb");
fis.close();
} catch (IOException e) {
throw new InterpreterException(e.getCause());
}
}
// Do nothing
}

@Override
public void close() {
if (this.scriptingContainer != null) {
this.scriptingContainer.terminate();
}
runningProcesses.clear();
runningProcesses = null;
}

@Override
public InterpreterResult interpret(String cmd, InterpreterContext interpreterContext) {
public InterpreterResult interpret(String st, InterpreterContext context) {
LOGGER.debug("Run HBase shell script: {}", st);

if (StringUtils.isEmpty(st)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
}

String paragraphId = context.getParagraphId();
// Write script in a temporary file
// The script is enriched with extensions
final File scriptFile = new File(getScriptFileName(paragraphId));
try {
LOGGER.info(cmd);
this.writer.getBuffer().setLength(0);
this.scriptingContainer.runScriptlet(cmd);
this.writer.flush();
LOGGER.debug(writer.toString());
return new InterpreterResult(InterpreterResult.Code.SUCCESS, writer.getBuffer().toString());
} catch (Throwable t) {
LOGGER.error("Can not run '" + cmd + "'", t);
return new InterpreterResult(InterpreterResult.Code.ERROR, t.getMessage());
FileUtils.write(scriptFile, st + "\nexit");
} catch (IOException e) {
LOGGER.error("Can not write script in temp file", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}

InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS);

final DefaultExecutor executor = new DefaultExecutor();
final ByteArrayOutputStream errorStream = new ByteArrayOutputStream();

executor.setStreamHandler(new PumpStreamHandler(context.out, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeout));

String hbaseCmdPath = Paths.get(getProperty(HBASE_HOME), "bin", "hbase").toString();
final CommandLine cmdLine = CommandLine.parse(hbaseCmdPath);
cmdLine.addArgument("shell", false);
cmdLine.addArgument(scriptFile.getAbsolutePath(), false);

try {
executor.execute(cmdLine);
runningProcesses.put(paragraphId, executor);
} catch (ExecuteException e) {
LOGGER.error("Can not run script in paragraph {}", paragraphId, e);

final int exitValue = e.getExitValue();
InterpreterResult.Code code = InterpreterResult.Code.ERROR;
String msg = errorStream.toString();

if (exitValue == SIGTERM_CODE) {
code = InterpreterResult.Code.INCOMPLETE;
msg = msg + "Paragraph received a SIGTERM.\n";
LOGGER.info("The paragraph {} stopped executing: {}", paragraphId, msg);
}

msg += "ExitValue: " + exitValue;
result = new InterpreterResult(code, msg);
} catch (IOException e) {
LOGGER.error("Can not run script in paragraph {}", paragraphId, e);
result = new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
} finally {
FileUtils.deleteQuietly(scriptFile);
stopProcess(paragraphId);
}
return result;
}

@Override
public void cancel(InterpreterContext context) {}
public void cancel(InterpreterContext context) {
stopProcess(context.getParagraphId());
FileUtils.deleteQuietly(new File(getScriptFileName(context.getParagraphId())));
}

@Override
public FormType getFormType() {
@@ -143,30 +145,25 @@ public Scheduler getScheduler() {
HbaseInterpreter.class.getName() + this.hashCode());
}

@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
return null;
private String getScriptFileName(String paragraphId) {
return String.format("%s%s.txt", getScriptDir(), paragraphId);
}

private static String getSystemDefault(
String envName,
String propertyName,
String defaultValue) {

if (envName != null && !envName.isEmpty()) {
String envValue = System.getenv().get(envName);
if (envValue != null) {
return envValue;
}
private String getScriptDir() {
String tmpProperty = System.getProperty("java.io.tmpdir");
if (!tmpProperty.endsWith(File.separator)) {
tmpProperty += File.separator;
}

if (propertyName != null && !propertyName.isEmpty()) {
String propValue = System.getProperty(propertyName);
if (propValue != null) {
return propValue;
}
return tmpProperty + "zeppelin-hbase-scripts" + File.separator;
}

private void stopProcess(String paragraphId) {
if (runningProcesses.containsKey(paragraphId)) {
final Executor executor = runningProcesses.get(paragraphId);
final ExecuteWatchdog watchdog = executor.getWatchdog();
watchdog.destroyProcess();
runningProcesses.remove(paragraphId);
}
return defaultValue;
}
}
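Taken together, the rewrite replaces the embedded JRuby/`hirb.rb` approach with a plain process launch: each paragraph is written to a temporary script file (with an `exit` appended), run through `$HBASE_HOME/bin/hbase shell` via Apache Commons Exec under a watchdog, and the executor is tracked per paragraph so `cancel()` can terminate it. The following standalone sketch illustrates that core mechanism outside of Zeppelin; the class and method names are illustrative and are not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;

/** Illustrative sketch only: run a block of HBase shell commands the way the new interpreter does. */
public class HBaseShellRunner {

  public static String run(String hbaseHome, String script, long timeoutMs) throws IOException {
    // 1. Write the commands to a temporary script and make the shell exit when it is done.
    File scriptFile = Files.createTempFile("zeppelin-hbase-", ".txt").toFile();
    Files.write(scriptFile.toPath(), (script + "\nexit").getBytes(StandardCharsets.UTF_8));

    // 2. Run "$HBASE_HOME/bin/hbase shell <scriptFile>" and capture its output.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DefaultExecutor executor = new DefaultExecutor();
    executor.setStreamHandler(new PumpStreamHandler(out));
    executor.setWatchdog(new ExecuteWatchdog(timeoutMs)); // kills the shell if it hangs

    CommandLine cmd = new CommandLine(Paths.get(hbaseHome, "bin", "hbase").toString());
    cmd.addArgument("shell", false);
    cmd.addArgument(scriptFile.getAbsolutePath(), false);

    try {
      executor.execute(cmd); // blocks; a non-zero exit code raises ExecuteException (an IOException)
    } finally {
      scriptFile.delete();
    }
    return out.toString(StandardCharsets.UTF_8.name());
  }

  public static void main(String[] args) throws IOException {
    // Assumes HBase is installed locally and the shell can reach a cluster.
    System.out.println(run("/usr/lib/hbase", "list", 60_000));
  }
}
```

One consequence of this design is that the interpreter no longer needs the HBase Ruby sources or an embedded JRuby runtime, which is why `hbase.ruby.sources`, `zeppelin.hbase.test.mode`, and the `jruby-complete` dependency are removed elsewhere in this diff.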
12 changes: 0 additions & 12 deletions hbase/src/main/resources/interpreter-setting.json
@@ -10,18 +10,6 @@
"defaultValue": "/usr/lib/hbase/",
"description": "Installation directory of HBase",
"type": "string"
},
"hbase.ruby.sources": {
"propertyName": "hbase.ruby.sources",
"defaultValue": "lib/ruby",
"description": "Path to Ruby scripts relative to 'hbase.home'",
"type": "string"
},
"zeppelin.hbase.test.mode": {
"propertyName": "zeppelin.hbase.test.mode",
"defaultValue": false,
"description": "Disable checks for unit and manual tests",
"type": "checkbox"
}
},
"editor": {
hbase/src/test/java/org/apache/zeppelin/hbase/HbaseInterpreterTest.java
@@ -14,16 +14,14 @@

package org.apache.zeppelin.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.Properties;

import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
* Tests for HBase Interpreter.
*/
@@ -34,8 +32,6 @@ public class HbaseInterpreterTest {
public static void setUp() throws NullPointerException, InterpreterException {
Properties properties = new Properties();
properties.put("hbase.home", "");
properties.put("hbase.ruby.sources", "");
properties.put("zeppelin.hbase.test.mode", "true");

hbaseInterpreter = new HbaseInterpreter(properties);
hbaseInterpreter.open();
@@ -45,28 +41,4 @@ public static void setUp() throws NullPointerException, InterpreterException {
void newObject() {
assertNotNull(hbaseInterpreter);
}

@Test
void putsTest() {
InterpreterResult result = hbaseInterpreter.interpret("puts \"Hello World\"", null);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals("Hello World\n", result.message().get(0).getData());
}

public void putsLoadPath() {
InterpreterResult result = hbaseInterpreter.interpret(
"require 'two_power'; puts twoToThePowerOf(4)", null);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals("16\n", result.message().get(0).getData());
}

@Test
void testException() {
InterpreterResult result = hbaseInterpreter.interpret("plot practical joke", null);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals("(NameError) undefined local variable or method `joke' for main:Object",
result.message().get(0).getData());
}
}