Spark 1.x Cluster Connectivity

Spark 1.x clusters expose JDBC/ODBC connections via the binary protocol, so connecting a BI tool to a Spark 1.x cluster differs from connecting to a Spark 2.x cluster. For Spark 2.x, see Connecting BI Tools.

Step 1: Get the public hostname of the Spark driver

You can run the following commands in a notebook attached to the cluster:

# Query the EC2 instance metadata service for the driver's public hostname
import urllib2
public_hostname = urllib2.urlopen('http://169.254.169.254/latest/meta-data/public-hostname').read()
public_hostname

Or you can discover the Spark driver IP address of a running Spark cluster from the Spark UI:

Screenshot Of Spark UI

Step 2: Open a port for the IP address that the BI tool is connecting from

Get the IP address that the BI tool is connecting from.

For Looker, for example, all potential source IP addresses are listed in the Looker documentation. As of September 2015, they are:

United States (default)

  • 54.208.10.167
  • 54.209.116.191
  • 52.1.5.228
  • 52.1.157.156

Asia

  • 52.68.85.40
  • 52.68.108.109

Europe

  • 52.16.163.151
  • 52.16.174.170

These IP addresses are included in the configuration samples below. If you are hosting Looker on-premises, you must determine the source IP address yourself and modify the IP_ADDRESSES_TO_ADD variable in these examples to specify that address.

# Set up the access keys and configuration variables
ACCESS_KEY = "XXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXX" # shouldn't have to encode or escape
REGION = "us-west-1"  # "us-west-1", not "us-west-1c"
IP_ADDRESSES_TO_ADD = ["54.208.10.167/32", "54.209.116.191/32", "52.1.5.228/32", "52.1.157.156/32", "52.68.85.40/32", "52.68.108.109/32", "52.16.163.151/32", "52.16.174.170/32"]
PORTS_TO_ADD = ["10000"]
PROTOCOLS_TO_ADD = ["tcp"]
# Find current security group
import boto.ec2

# this will only find the first matching security group
def get_databricks_security_group():
  conn = boto.ec2.connect_to_region(REGION,
                                    aws_access_key_id=ACCESS_KEY,
                                    aws_secret_access_key=SECRET_KEY)
  rs = conn.get_all_security_groups()
  for r in rs:
    if (r.name.find('dbc') == 0 or r.name.find('databricks') == 0) and (r.name.find('-ExternalServices') > 0 or r.name.find('-worker-unmanaged') > 0):
      return r

databricks_security_group = get_databricks_security_group()
print "Found Security Group: " + databricks_security_group.name
# Update the security group
from collections import defaultdict

def sg_rule_dict(security_group):
  rule_dict = defaultdict(list)
  rules = map(lambda x: ("%s:%s-%s" %(x.ip_protocol, x.from_port, x.to_port), x.grants), security_group.rules)
  for rule in rules:
    for grant in rule[1]:
      rule_dict[rule[0]].append(grant.cidr_ip)
  return rule_dict

existing_rules = sg_rule_dict(databricks_security_group)

for ip_address in IP_ADDRESSES_TO_ADD:
  for port in PORTS_TO_ADD:
    for protocol in PROTOCOLS_TO_ADD:
      key = "%s:%s-%s" % (protocol, port, port)
      if ip_address not in existing_rules[key]:
        databricks_security_group.authorize(ip_protocol=protocol,
                                            from_port=port,
                                            to_port=port,
                                            cidr_ip=ip_address)
# Verify updated security groups
updated_databricks_security_group = get_databricks_security_group()

# Given a rule, check that the port and protocol match
def check_port_and_proto(rule, port, protocol):
  if (port == rule.to_port == rule.from_port) and protocol == rule.ip_protocol:
    return True
  return False

# Loop over the IP addresses to verify that a matching rule exists in the updated security group
for ip in IP_ADDRESSES_TO_ADD:
  # Flag to determine if we find a match for the current IP address
  found_ip = False
  for rule in updated_databricks_security_group.rules:
    # rule.grants contains a list of IP address objects; convert to strings to check inclusion easily
    list_of_ips = [str(grant) for grant in rule.grants]
    if ip in list_of_ips:
      found_ip = True
      for port in PORTS_TO_ADD:
        for proto in PROTOCOLS_TO_ADD:
          if not check_port_and_proto(rule, port, proto):
            print "Missing rule. Ip: " + ip + "\tPort: " + port + "\tProtocol: " + proto
          else:
            print "Matching rule. Ip: " + ip + "\tPort: " + port + "\tProtocol: " + proto
  if not found_ip:
    print "Cannot find rule with ip: " + ip

Alternatively, you can modify the rules of the security group for the instances that host Spark by using the AWS console. You must modify the security group “account-xxx-Worker”, where “xxx” is a string specific to your deployment. You can also use Whitelist IP Addresses to open up access to this port.

Once you have the driver’s hostname, you can run the following netcat test from a terminal to check for connectivity to port 10000.

nc -vz hostname.sparkcluster.ec2.amazon.com 10000
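
If netcat is not installed, a minimal Python check accomplishes the same thing. This is a sketch; the hostname below is a placeholder that you should replace with your driver’s public DNS:

# Check connectivity to the Thrift Server port from Python (sketch; replace the placeholder hostname)
import socket

try:
  socket.create_connection(("hostname.sparkcluster.ec2.amazon.com", 10000), timeout=5)
  print "Port 10000 is reachable"
except (socket.error, socket.timeout) as e:
  print "Cannot reach port 10000: %s" % e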

Step 3: Connect BI tools

The Thrift Server runs on port 10000 on the Spark driver. To connect to a Spark 1.x cluster using JDBC/ODBC, you must open the firewall for port 10000 and discover the public IP address or DNS name of your Spark driver, as described in the steps above.

The JDBC connection string looks like this:

jdbc:hive2://${Driver Public DNS}:10000/default;ssl=false;transportMode=binary;httpPath=cliservice
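
As a quick sanity check before configuring a BI tool, you can also query the Thrift Server directly from Python. This is a minimal sketch, assuming the third-party PyHive package (and its SASL/Thrift dependencies) is installed; the hostname is a placeholder:

# Sketch: query the Spark 1.x Thrift Server with PyHive (assumes `pip install pyhive` plus sasl and thrift_sasl)
from pyhive import hive

conn = hive.Connection(host="hostname.sparkcluster.ec2.amazon.com", port=10000, database="default")
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
print cursor.fetchall()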