Python Manual (13): Hive/HBase/HDFS

1. Hive

1.1. Basic information

  • Reference: dropbox/PyHive
  • PyHive talks to the hiveserver2 service; the default port is 10000.
  • Installation on Linux:
    • conda install thrift sasl pyhive
    • Note: installing directly with pip tends not to work, because sasl fails to build.
  • Installation on Windows:

1.2. Basic usage

  • Hive can be accessed through either the DB-API or SQLAlchemy.

  • Basic usage is just standard DB-API / SQLAlchemy usage; see their respective documentation for details.

  • DB-API example

    from pyhive import hive

    # Connect to HiveServer2 on the default port
    conn = hive.Connection(host='10.8.13.120', port=10000, username='hdfs', database='default')
    cursor = conn.cursor()
    cursor.execute('show tables')

    for result in cursor.fetchall():
        print(result)
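  • The PyHive connection is a plain DB-API object, so a query result can be pulled straight into pandas. A minimal sketch, assuming pandas is installed; `conn` is reused from the example above and `my_table` is a placeholder table name:

    import pandas as pd

    # pandas.read_sql accepts any DB-API connection, including PyHive's
    df = pd.read_sql('SELECT * FROM my_table LIMIT 10', conn)
    print(df.head())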
  • SQLAlchemy example

    from sqlalchemy import *
    from sqlalchemy.engine import create_engine
    from sqlalchemy.schema import *

    # Presto
    engine = create_engine('presto://localhost:8080/hive/default')
    # Hive
    engine = create_engine('hive://localhost:10000/default')

    logs = Table('my_awesome_data', MetaData(bind=engine), autoload=True)
    print(select([func.count('*')], from_obj=logs).scalar())
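  • The snippet above uses the legacy SQLAlchemy 1.x style (`MetaData(bind=...)`, list-style `select([...])`), which SQLAlchemy 1.4 deprecates and 2.0 removes. A rough 1.4-style equivalent, assuming the same engine and table:

    from sqlalchemy import MetaData, Table, create_engine, func, select

    engine = create_engine('hive://localhost:10000/default')
    metadata = MetaData()
    # Reflect the table definition from Hive instead of binding the metadata
    logs = Table('my_awesome_data', metadata, autoload_with=engine)

    with engine.connect() as conn:
        print(conn.execute(select(func.count()).select_from(logs)).scalar())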

2. HBase

2.1. Basic information

  • Reference: happybase documentation (http://happybase.readthedocs.io)

  • After activating the appropriate environment, install happybase and thrift:

    pip install happybase
    pip install thrift
  • Error handling:

    • The error: thriftpy.parser.exc.ThriftParserError: ThriftPy does not support generating module with path in protocol 'd'
    • This only happens on Windows: the drive letter in an absolute path (e.g. d:\...) is parsed by urlparse as the URL scheme.
    • The fix below follows the workaround described in the referenced article.
    • Fix: edit the code in Lib\site-packages\thriftpy\parser\parser.py:
      # Before
      url_scheme = urlparse(path).scheme
      if url_scheme == '':
          with open(path) as fh:
              data = fh.read()
      elif url_scheme in ('http', 'https'):
          data = urlopen(path).read()
      else:
          raise ThriftParserError('ThriftPy does not support generating module '
                                  'with path in protocol \'{}\''.format(
                                      url_scheme))

      # After
      url_scheme = urlparse(path).scheme
      if url_scheme == '':
          with open(path) as fh:
              data = fh.read()
      elif url_scheme in ('c', 'd', 'e', 'f'):  # Windows drive letters c:, d:, e:, f:
          with open(path) as fh:
              data = fh.read()
      elif url_scheme in ('http', 'https'):
          data = urlopen(path).read()
      else:
          raise ThriftParserError('ThriftPy does not support generating module '
                                  'with path in protocol \'{}\''.format(
                                      url_scheme))

2.2. Basic usage

  • Establish a connection

    import happybase
    connection = happybase.Connection(HOST_IP)
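  • The HBase Thrift server listens on port 9090 by default, and `happybase.Connection` takes a few useful keyword options. A sketch using real happybase parameters (the host IP is just the example address reused from the Hive section):

    import happybase

    connection = happybase.Connection(
        host='10.8.13.120',         # HBase Thrift server address (example value)
        port=9090,                  # default Thrift server port
        table_prefix='myproject',   # optional prefix prepended to all table names
        autoconnect=True,           # open the socket immediately
    )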
  • List the available tables

    print(connection.tables())
  • Create a table

    # DOCS: http://happybase.readthedocs.io/en/latest/api.html#happybase.Connection.create_table
    # create_table(name, families)
    # name (str) – The table name
    # families (dict) – The name and options for each column family
    families = {
        'cf1': dict(max_versions=10),
        'cf2': dict(max_versions=1, block_cache_enabled=False),
        'cf3': dict(),  # use defaults
    }
    connection.create_table('mytable', families)
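  • The connection object also carries the matching admin calls; these are real happybase methods, continuing the `mytable` example:

    # Disable and drop the table in one call
    connection.delete_table('mytable', disable=True)

    # Or manage the lifecycle explicitly
    connection.disable_table('mytable')
    connection.enable_table('mytable')
    print(connection.is_table_enabled('mytable'))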
  • Get table and row objects

    # Table names do not need to be byte-encoded
    table = connection.table('table_name')

    # Row keys do need to be byte-encoded
    # The returned value is a dict, e.g. {b'cf:col1': b'value1'}
    row = table.row(b'row_key')
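  • `table.row` also accepts a column filter, and `table.rows` fetches several rows in one round trip; both are real happybase methods:

    # Fetch only selected columns of a single row
    row = table.row(b'row_key', columns=[b'cf1:col1'])

    # Multi-get: returns a list of (row_key, data_dict) pairs
    for key, data in table.rows([b'row_key_1', b'row_key_2']):
        print(key, data)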
  • Basic operations

    # Read data (keys must be byte-encoded)
    print(row[b'cf1:col1'])

    # Store data (keys and values must be byte-encoded)
    # DOCS: http://happybase.readthedocs.io/en/latest/api.html#happybase.Table.put
    table.put(b'row-key', {b'cf:col1': b'value1', b'cf:col2': b'value2'}, timestamp=123456789)
    table.put(b'row-key', {b'cf:col1': b'value1'})

    # Delete data (keys must be byte-encoded)
    table.delete(b'row-key')
    table.delete(b'row-key', columns=[b'cf1:col1', b'cf1:col2'])
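  • Besides single-row reads, `table.scan` iterates over a key range; `row_prefix`, `row_start`/`row_stop`, and `columns` are its actual keyword arguments:

    # All rows whose key starts with b'row'
    for key, data in table.scan(row_prefix=b'row'):
        print(key, data)

    # An explicit key range, fetching only one column family
    for key, data in table.scan(row_start=b'row-key-1', row_stop=b'row-key-9', columns=[b'cf1']):
        print(key, data)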
  • Batch operations

    # DOCS: http://happybase.readthedocs.io/en/latest/api.html#batch
    b = table.batch()
    b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
    b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
    b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
    b.delete(b'row-key-4')
    b.send()
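  • A batch can also be used as a context manager, in which case `send()` is issued automatically when the block exits; `batch_size` additionally flushes every N mutations. Both behaviors are documented happybase features:

    # send() happens automatically at the end of the with-block
    with table.batch(batch_size=1000) as b:
        for i in range(1200):
            b.put('row-{}'.format(i).encode(), {b'cf:col1': b'value'})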
  • Connection pool

    # DOCS: http://happybase.readthedocs.io/en/latest/api.html#connection-pool
    pool = happybase.ConnectionPool(size=3, host='...')

    # Return the connection to the pool as soon as possible:
    # fetch data inside the with-block, process it outside
    with pool.connection() as connection:
        table = connection.table('table-name')
        row = table.row(b'row-key')

    process_data(row)
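  • The pool is designed for multi-threaded use: each thread gets its own connection, and nested `pool.connection()` calls inside one thread return the same connection. A minimal sketch with a hypothetical `get_row` helper:

    import threading

    def get_row(key):
        # Hypothetical helper: each worker borrows a connection briefly
        with pool.connection() as connection:
            return connection.table('table-name').row(key)

    threads = [threading.Thread(target=get_row, args=(b'row-key',)) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()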

3. HDFS

3.1. Basic information

  • The examples below use the hdfs package (HdfsCLI), which talks to HDFS over the WebHDFS REST API; 50070 is the default NameNode web port in Hadoop 2.x.
  • Install with: pip install hdfs

3.2. Basic usage

  • Create a client object

    from hdfs.client import Client
    client = Client("http://hdfs:50070/", root="/")
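  • The plain `Client` sends no user name; on a cluster without Kerberos you usually want `InsecureClient`, which adds `user=...` to every WebHDFS request. This is a real HdfsCLI class; the URL is a placeholder:

    from hdfs import InsecureClient

    # Act as the HDFS user 'hdfs' (host and port are placeholders)
    client = InsecureClient('http://hdfs:50070', user='hdfs')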
  • Other basic operations

    # Create a directory
    client.makedirs("/test", permission=777)

    # List the files under a directory
    # status: when True, also returns each entry's status information; defaults to False
    client.list(hdfs_path, status=False)

    # Rename / move a file
    client.rename(hdfs_src_path, hdfs_dst_path)

    # Write a file
    # Appending vs. overwriting is controlled by the overwrite/append options
    client.write(hdfs_path, data, overwrite=True, append=False)

    # Download a file from HDFS to the local machine
    client.download(hdfs_path, local_path, overwrite=False)

    # Upload a local file to HDFS
    client.upload(hdfs_path, local_path, cleanup=True)

    # Delete a file from HDFS
    client.delete(hdfs_path)

    # Read a file
    with client.read('foo') as reader:
        content = reader.read()
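  • `client.read` also takes `encoding` (decode the stream as text) and `chunk_size` (stream in chunks) arguments; both are real HdfsCLI parameters. Since the reader is file-like, it can be handed to pandas directly (a sketch assuming pandas is installed and `foo.csv` exists):

    import pandas as pd

    # Decode as UTF-8 text instead of raw bytes
    with client.read('foo.csv', encoding='utf-8') as reader:
        df = pd.read_csv(reader)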