`
greemranqq
  • 浏览: 966815 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

hive udf 唯一bigInt 生成器

阅读更多

一、背景

        mysql数据由于自增的bigint 主键,会插入更快,因为能持续往文件末尾插入嘛,因此需要这个东西。

        然后呢,服务端有专门生产id的接口,但是数据中心批量插入,肯定会拉暴他们,不让我们一起玩,只能自己玩。

 

二、方案

        1.redis 获取数据段,程序内部自增。

        问题:要用外部redis麻烦,而且要持久化

 

        2.python 服务注册的,这个自己网上搜索。

        问题:要机器,要服务端和客户端,而且python 我们版本不够高。。。尴尬,不想随便升级整个集群

 

 

        3.twitter 的 snowflake 算法

         参考: https://www.jianshu.com/p/54a87a7c3622

 

         参数:workId  datacenterId 唯一

  

         问题:

          1.两个ID 我都没法唯一。MAP 阶段可能是在同一个机器上,同时执行,参数不好搞

          

          解决方案:

          1.我们场景是只需要同一个任务的主键不重复就行。因此workId 我选取map 的ID,毕竟同一个任务,每个MAP的ID 肯定不同。 datacenterId   可以先默认0.

 

三.上代码:

      

public class MagicSnowFlake {

    //其实时间戳   2017-01-01 00:00:00
    private final static long twepoch = 1483200000000l;

 // 改到16 65535,认为MAP的最大数量限制

    private final static long ipIdMax = 65535;

    // 默认1位,我们小,没那么多数据中心,意思一下
    private final static long dataCenterIdBits = 1L;

  // 9+1 (10 )

    private final static long mapIdBits = 9L;

    private final static long dataCenterIdMax = ~ (-1L << dataCenterIdBits);

    //序列在id中占的位数 12bit
    private final static long seqBits = 12L;

    //序列最大值 4095 即2的12次方减一。
    private final static long seqMax = ~(-1L << seqBits);

    // 64位的数字:首位0  随后41位表示时间戳 MAP_ID 最后12位序列号
    private final static long dataCenterIdLeftShift = seqBits;
    private final static long mapIdLeftShift = seqBits + dataCenterIdBits;
    private final static long timeLeftShift = seqBits  + dataCenterIdBits + mapIdLeftShift;

    //IP标识(0~255)
    private long ipId;

    // 数据中心ID(0~3)
    private long dataCenterId;

    // 毫秒内序列(0~4095)
    private long seq = 0L;

    // 上次生成ID的时间截
    private long lastTime = -1L;

    public MagicSnowFlake(long ipId, long dataCenterId) {
        if(ipId < 0 || ipId > ipIdMax) {
            System.out.println(" ---------- ipId不在正常范围内(0~"+ipIdMax +") " + ipId);
            System.exit(0);
        }

        if(dataCenterId < 0 || dataCenterId > dataCenterIdMax) {
            System.out.println(" ---------- dataCenterId不在正常范围内(0~"+dataCenterIdMax +") " + dataCenterId);
            System.exit(0);
        }

        this.ipId = ipId;
        this.dataCenterId = dataCenterId;
    }

    public synchronized long nextId() {
        long nowTime = System.currentTimeMillis();

        if(nowTime < lastTime) {
            System.out.println(" ---------- 当前时间前于上次操作时间,当前时间有误: " + nowTime);
            System.exit(0);
        }

        if(nowTime == lastTime) {
            seq = (seq + 1) & seqMax;
            if(seq == 0) {
                nowTime = getNextTimeStamp();
            }
        } else {
            seq = 0L;
        }

        lastTime = nowTime;


        return ((nowTime - twepoch) << timeLeftShift)
                | (ipId << mapIdLeftShift)
                | (dataCenterId << dataCenterIdLeftShift)
                | seq;
    }

    private long getNextTimeStamp() {
        long nowTime;
        do {
            nowTime = System.currentTimeMillis();
        } while(nowTime <= lastTime);
        return nowTime;
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);
        MagicSnowFlake msf = new MagicSnowFlake(1, 1);
        msf.nextId();
        System.out.println(~ (-1L << 15));
    }
}

 

       

 

    UDF 部分

   

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 18/3/8
 */
@UDFType(deterministic = false, stateful = true)
public class LongIdUDF extends GenericUDF {
    private static final char SEPARATOR = '_';
    private static final String ATTEMPT = "attempt";
    private long mapTaskId = 0l;
    private int increment = 0;

    private MagicSnowFlake snowFlake;



    @Override
    public void configure(MapredContext context) {
        increment = context.getJobConf().getNumMapTasks();
        if(increment == 0) {
            throw new IllegalArgumentException("mapred.map.tasks is zero");
        }

        mapTaskId = getInitId(context.getJobConf().get("mapred.task.id"),increment);
        if(mapTaskId == 0l) {
            throw new IllegalArgumentException("mapred.task.id");
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
    }

    @Override
    public Long evaluate(DeferredObject[] arguments) throws HiveException {
        if(snowFlake == null){
            int dataCenterId = Integer.parseInt(arguments[0].get().toString());
            snowFlake = new MagicSnowFlake(getMapTaskId(),dataCenterId);
        }
        return snowFlake.nextId();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "getLongId(0)";
    }


    private synchronized long getMapTaskId() {
        return mapTaskId;
    }

    //attempt_1478926768563_0537_m_000004_0 // return 0+1
    private long getInitId (String taskAttemptIDstr,int numTasks)
            throws IllegalArgumentException {
        try {
            String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR));
            if(parts.length == 6) {
                if(parts[0].equals(ATTEMPT)) {
                    if(!parts[3].equals("m") && !parts[3].equals("r")) {
                        throw new Exception();
                    }
                    long result = Long.parseLong(parts[4]);
                    if(result >= numTasks) { //if taskid >= numtasks
                        throw new Exception("TaskAttemptId string : " + taskAttemptIDstr  + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] ..");
                    }
                    return result + 1;
                }
            }
        } catch (Exception e) {}
        throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr + " is  not properly formed");
    }


    public static void main(String[] args) {
        String s = "attempt_1478926768563_0537_m_000004_4";
        System.out.println(new LongIdUDF().getInitId(s,5));
    }

}

 

 

   小结:

          1.copy的代码自己改造的,忘记位置了,总的来说是出自twitter 嘛。

          2.用了30亿的表进行测试,没重复

          3.有问题 请及时提出,

 

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics