Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic authentication plugin #1087

Merged
merged 8 commits into from
Feb 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>logs/**</exclude>
<exclude>**/*.versionsBackup</exclude>
<exclude>**/circe/**</exclude>
<exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
<exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.authentication;

import org.apache.commons.codec.digest.Crypt;
import org.apache.commons.codec.digest.Md5Crypt;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;

import javax.naming.AuthenticationException;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;

public class AuthenticationProviderBasic implements AuthenticationProvider {
private final static String HTTP_HEADER_NAME = "Authorization";
private final static String CONF_SYSTEM_PROPERTY_KEY = "pulsar.auth.basic.conf";
private Map<String, String> users;

@Override
public void close() throws IOException {
// noop
}

@Override
public void initialize(ServiceConfiguration config) throws IOException {
File confFile = new File(System.getProperty(CONF_SYSTEM_PROPERTY_KEY));
if (!confFile.exists()) {
throw new IOException("The password auth conf file does not exist");
} else if (!confFile.isFile()) {
throw new IOException("The path is not a file");
}
BufferedReader reader = new BufferedReader(new FileReader(confFile));
users = new HashMap<>();
for (String line : reader.lines().toArray(s -> new String[s])) {
List<String> splitLine = Arrays.asList(line.split(":"));
if (splitLine.size() != 2) {
throw new IOException("The format of the password auth conf file is invalid");
}
users.put(splitLine.get(0), splitLine.get(1));
}
reader.close();
}

@Override
public String getAuthMethodName() {
return "basic";
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
AuthParams authParams = new AuthParams(authData);
String userId = authParams.getUserId();
String password = authParams.getPassword();
String msg = "Unknown user or invalid password";

if (users.get(userId) == null) {
throw new AuthenticationException(msg);
}

String encryptedPassword = users.get(userId);

// For md5 algorithm
if ((users.get(userId).startsWith("$apr1"))) {
List<String> splitEncryptedPassword = Arrays.asList(encryptedPassword.split("\\$"));
if (splitEncryptedPassword.size() != 4 || !encryptedPassword
.equals(Md5Crypt.apr1Crypt(password.getBytes(), splitEncryptedPassword.get(2)))) {
throw new AuthenticationException(msg);
}
// For crypt algorithm
} else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(), encryptedPassword.substring(0, 2)))) {
throw new AuthenticationException(msg);
}

return userId;
}

private class AuthParams {
private String userId;
private String password;

public AuthParams(AuthenticationDataSource authData) throws AuthenticationException {
String authParams;
if (authData.hasDataFromCommand()) {
authParams = authData.getCommandData();
} else if (authData.hasDataFromHttp()) {
String rawAuthToken = authData.getHttpHeader(HTTP_HEADER_NAME);
// parsing and validation
if (StringUtils.isBlank(rawAuthToken) || !rawAuthToken.toUpperCase().startsWith("BASIC ")) {
throw new AuthenticationException("Authentication token has to be started with \"Basic \"");
}
String[] splitRawAuthToken = rawAuthToken.split(" ");
if (splitRawAuthToken.length != 2) {
throw new AuthenticationException("Base64 encoded token is not found");
}

try {
authParams = new String(Base64.getDecoder().decode(splitRawAuthToken[1]));
} catch (Exception e) {
throw new AuthenticationException("Base64 decoding is failure: " + e.getMessage());
}
} else {
throw new AuthenticationException("Authentication data source does not have data");
}

String[] parsedAuthParams = authParams.split(":");
if (parsedAuthParams.length != 2) {
throw new AuthenticationException("Base64 decoded params are invalid");
}

userId = parsedAuthParams[0];
password = parsedAuthParams[1];
}

public String getUserId() {
return userId;
}

public String getPassword() {
return password;
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing new line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,17 @@
*/
package org.apache.pulsar.client.api;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.*;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.InternalServerErrorException;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -52,8 +37,13 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import javax.ws.rs.InternalServerErrorException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);
Expand All @@ -64,6 +54,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";

private final String BASIC_CONF_FILE_PATH = "./src/test/resources/authentication/basic/.htpasswd";

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand All @@ -83,6 +75,7 @@ protected void setup() throws Exception {
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("localhost");
superUserRoles.add("superUser");
superUserRoles.add("superUser2");
conf.setSuperUserRoles(superUserRoles);

conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
Expand All @@ -91,6 +84,8 @@ protected void setup() throws Exception {

Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
providers.add(AuthenticationProviderBasic.class.getName());
System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
conf.setAuthenticationProviders(providers);

conf.setClusterName("use");
Expand All @@ -107,7 +102,13 @@ protected final void internalSetup(Authentication auth) throws Exception {
clientConf.setUseTls(true);

admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
String lookupUrl;
// For http basic authentication test
if (methodName.equals("testBasicCryptSyncProducerAndConsumer")) {
lookupUrl = new URI("https://localhost:" + BROKER_WEBSERVICE_PORT_TLS).toString();
} else {
lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
}
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}

Expand Down Expand Up @@ -167,8 +168,6 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
Expand All @@ -178,6 +177,39 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "batch")
public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
AuthenticationBasic authPassword = new AuthenticationBasic();
authPassword.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}");
internalSetup(authPassword);

admin.properties()
.createProperty("my-property", new PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");

testSyncProducerAndConsumer(batchMessageDelayMs);

log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "batch")
public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
AuthenticationBasic authPassword = new AuthenticationBasic();
authPassword.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}");
internalSetup(authPassword);

admin.properties()
.createProperty("my-property", new PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");

testSyncProducerAndConsumer(batchMessageDelayMs);

log.info("-- Exiting {} test --", methodName);
}


@Test(dataProvider = "batch")
public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down Expand Up @@ -223,7 +255,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws

/**
* Verifies: on 500 server error, broker invalidates session and client receives 500 correctly.
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -254,7 +286,7 @@ public void testAuthenticationFilterNegative() throws Exception {
/**
* verifies that topicLookup/PartitionMetadataLookup gives InternalServerError(500) instead 401(auth_failed) on
* unknown-exception failure
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -292,4 +324,4 @@ public void testInternalServerExceptionOnLookup() throws Exception {

}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
superUser:mQQQIsyvvKRtU
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure but should we add this file under license exclude list ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it.

superUser2:$apr1$foobarmq$kuSZlLgOITksCkRgl57ie/
6 changes: 6 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Loading